aboutsummaryrefslogtreecommitdiffstats
path: root/src/protocol.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol.rs')
-rw-r--r--src/protocol.rs88
1 files changed, 45 insertions, 43 deletions
diff --git a/src/protocol.rs b/src/protocol.rs
index 7a64f88..aaad914 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -31,37 +31,36 @@ pub enum DatNetMessage {
fn msg_code(msg: &DatNetMessage) -> u8 {
match msg {
- &DatNetMessage::Register(_) => 0,
+ &DatNetMessage::Register(_) => 0,
&DatNetMessage::Handshake(_) => 1,
- &DatNetMessage::Status(_) => 2,
- &DatNetMessage::Have(_) => 3,
- &DatNetMessage::Unhave(_) => 4,
- &DatNetMessage::Want(_) => 5,
- &DatNetMessage::Unwant(_) => 6,
- &DatNetMessage::Request(_) => 7,
- &DatNetMessage::Cancel(_) => 8,
- &DatNetMessage::Data(_) => 9,
+ &DatNetMessage::Status(_) => 2,
+ &DatNetMessage::Have(_) => 3,
+ &DatNetMessage::Unhave(_) => 4,
+ &DatNetMessage::Want(_) => 5,
+ &DatNetMessage::Unwant(_) => 6,
+ &DatNetMessage::Request(_) => 7,
+ &DatNetMessage::Cancel(_) => 8,
+ &DatNetMessage::Data(_) => 9,
}
}
fn msg_sugar(msg: &DatNetMessage) -> &Message {
match msg {
- &DatNetMessage::Register(ref m) => m,
+ &DatNetMessage::Register(ref m) => m,
&DatNetMessage::Handshake(ref m) => m,
- &DatNetMessage::Status(ref m) => m,
- &DatNetMessage::Have(ref m) => m,
- &DatNetMessage::Unhave(ref m) => m,
- &DatNetMessage::Want(ref m) => m,
- &DatNetMessage::Unwant(ref m) => m,
- &DatNetMessage::Request(ref m) => m,
- &DatNetMessage::Cancel(ref m) => m,
- &DatNetMessage::Data(ref m) => m,
+ &DatNetMessage::Status(ref m) => m,
+ &DatNetMessage::Have(ref m) => m,
+ &DatNetMessage::Unhave(ref m) => m,
+ &DatNetMessage::Want(ref m) => m,
+ &DatNetMessage::Unwant(ref m) => m,
+ &DatNetMessage::Request(ref m) => m,
+ &DatNetMessage::Cancel(ref m) => m,
+ &DatNetMessage::Data(ref m) => m,
}
}
/// This helper is pretty slow/inefficient; lots of copying memory
fn bytewise_stream_xor_ic_inplace(buf: &mut [u8], byte_offset: u64, nonce: &Nonce, key: &Key) {
-
let mut offset = byte_offset;
// We may have a partial-64-byte-block to finish encrypting first
@@ -70,26 +69,25 @@ fn bytewise_stream_xor_ic_inplace(buf: &mut [u8], byte_offset: u64, nonce: &Nonc
if partial_len != 0 {
let mut partial = vec![0; 64];
for i in 0..partial_len {
- partial[partial_offset+i] = buf[i];
+ partial[partial_offset + i] = buf[i];
}
let partial_enc = stream_xor_ic(&partial, offset / 64, &nonce, &key);
offset += partial_len as u64;
for i in 0..partial_len {
- buf[i] = partial_enc[partial_offset+i];
+ buf[i] = partial_enc[partial_offset + i];
}
}
if buf.len() > partial_len {
let main_enc = stream_xor_ic(&buf[partial_len..], offset / 64, &nonce, &key);
//offset += main_enc.len() as u64;
for i in 0..main_enc.len() {
- buf[partial_len+i] = main_enc[i];
+ buf[partial_len + i] = main_enc[i];
}
}
}
#[test]
fn test_bsxii_short() {
-
let nonce = gen_nonce();
let key = gen_key();
@@ -110,7 +108,6 @@ fn test_bsxii_short() {
#[test]
fn test_bsxii_continued() {
-
let nonce = gen_nonce();
let key = gen_key();
@@ -161,7 +158,6 @@ pub struct DatConnection {
}
impl Read for DatConnection {
-
/// Encrypted TCP read (after connection initialized). Uses XOR of an XSalsa20 stream, using
/// block offsets.
fn read(&mut self, buf: &mut [u8]) -> ::std::io::Result<usize> {
@@ -174,10 +170,8 @@ impl Read for DatConnection {
}
impl Write for DatConnection {
-
/// Encrypted write to complement `read()`.
fn write(&mut self, buf: &[u8]) -> ::std::io::Result<usize> {
-
// Don't mutate what we've been passed
let mut enc = vec![0; buf.len()];
enc.copy_from_slice(buf);
@@ -194,9 +188,7 @@ impl Write for DatConnection {
}
impl DatConnection {
-
pub fn connect(host_port: &str, key: &[u8], live: bool) -> Result<DatConnection> {
-
let timeout = Duration::new(7, 0);
let tx_nonce = gen_nonce();
let mut local_id = [0; 32];
@@ -224,13 +216,12 @@ impl DatConnection {
data_discovery_key: [0; 32],
tx_nonce: tx_nonce,
tx_offset: 0,
- rx_nonce: gen_nonce(), // dummy
+ rx_nonce: gen_nonce(), // dummy
rx_offset: 0,
};
// Exchange register/feed
dc.tcp.set_nodelay(true)?; // Faster handshake
- // send register
let mut register_msg = Feed::new();
register_msg.set_discoveryKey(dc.discovery_key.to_vec());
register_msg.set_nonce((tx_nonce[0..24]).to_vec());
@@ -281,13 +272,17 @@ impl DatConnection {
}
fn send_msg(&mut self, dnm: &DatNetMessage, is_content: bool) -> Result<()> {
-
let header_int: u8 = (is_content as u8) << 4 | (msg_code(dnm) & 0x0F);
let msg: &Message = msg_sugar(dnm);
let total_message_size = (msg.compute_size() as usize) + 1;
- trace!("SEND total_len={} header={} is_content={} type={:?}",
- total_message_size, header_int, is_content, &dnm);
+ trace!(
+ "SEND total_len={} header={} is_content={} type={:?}",
+ total_message_size,
+ header_int,
+ is_content,
+ &dnm
+ );
// send both header varints, and data
self.write_varint(total_message_size as u64)?;
@@ -309,13 +304,17 @@ impl DatConnection {
}
fn recv_msg(&mut self) -> Result<(bool, DatNetMessage)> {
-
let total_len: u64 = self.read_varint()?;
let header: u8 = self.read_varint()?;
let is_content = (header & (1 << 4)) != 0;
- trace!("RECV total_len={} header={} is_content={}", total_len, header, is_content);
+ trace!(
+ "RECV total_len={} header={} is_content={}",
+ total_len,
+ header,
+ is_content
+ );
if header > 0x1F {
bail!("Invalid header received: {}", header);
@@ -351,8 +350,12 @@ impl DatConnection {
let header_int: u8 = 0;
let total_message_size = (reg.compute_size() as usize) + 1;
- trace!("SEND total_len={} header={} msg={:?}",
- total_message_size, header_int, reg);
+ trace!(
+ "SEND total_len={} header={} msg={:?}",
+ total_message_size,
+ header_int,
+ reg
+ );
self.tcp.write_varint(total_message_size as u64)?;
self.tcp.write_varint(header_int as u32)?;
@@ -383,7 +386,6 @@ impl DatConnection {
}
pub fn get_data_key(&mut self) -> Result<()> {
-
// Status: downloading, not uploading
let mut sm = Info::new();
sm.set_uploading(false);
@@ -419,7 +421,7 @@ impl DatConnection {
info!("Expected Have message, got: {:?}", &msg);
continue;
}
- };
+ }
// Request
let mut rm = Request::new();
@@ -443,7 +445,8 @@ impl DatConnection {
}
info!("Got data discovery key");
self.data_key.copy_from_slice(&data_key[0..32]);
- self.data_discovery_key.copy_from_slice(&make_discovery_key(data_key)[0..32]);
+ self.data_discovery_key
+ .copy_from_slice(&make_discovery_key(data_key)[0..32]);
return Ok(());
} else {
unimplemented!("non-hyperdrive Index type: {}", index_msg.get_field_type());
@@ -457,7 +460,6 @@ impl DatConnection {
}
pub fn receive_all(&mut self, is_content: bool, length: u64) -> Result<()> {
-
// Status: downloading, not uploading
let mut sm = Info::new();
sm.set_uploading(false);
@@ -488,7 +490,7 @@ impl DatConnection {
loop {
let (was_content, msg) = self.recv_msg()?;
- if was_content != is_content{
+ if was_content != is_content {
info!("Expected other message channel");
continue;
}