aboutsummaryrefslogtreecommitdiffstats
path: root/src/sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync.rs')
-rw-r--r--src/sync.rs129
1 files changed, 116 insertions, 13 deletions
diff --git a/src/sync.rs b/src/sync.rs
index 2869075..d3f54a5 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -2,6 +2,7 @@
use std::net::TcpStream;
use std::time::Duration;
use std::io::{Read, Write};
+use std::cmp;
use crypto::digest::Digest;
use crypto::blake2b::Blake2b;
use sodiumoxide::crypto::stream::*;
@@ -57,6 +58,80 @@ fn msg_sugar(msg: &DatNetMessage) -> &Message {
}
}
+/// This helper is pretty slow/inefficient; lots of copying memory
+fn bytewise_stream_xor_ic_inplace(buf: &mut [u8], byte_offset: u64, nonce: &Nonce, key: &Key) {
+
+ let mut offset = byte_offset;
+
+ // We may have a partial-64-byte-block to finish encrypting first
+ let partial_offset: usize = (offset % 64) as usize;
+ let partial_len: usize = cmp::min(64 - partial_offset, buf.len());
+ if partial_len != 0 {
+ let mut partial = vec![0; 64];
+ for i in 0..partial_len {
+ partial[partial_offset+i] = buf[i];
+ }
+ let partial_enc = stream_xor_ic(&partial, offset / 64, &nonce, &key);
+ offset += partial_len as u64;
+ for i in 0..partial_len {
+ buf[i] = partial_enc[partial_offset+i];
+ }
+ }
+ if buf.len() > partial_len {
+ let main_enc = stream_xor_ic(&buf[partial_len..], offset / 64, &nonce, &key);
+ //offset += main_enc.len() as u64;
+ for i in 0..main_enc.len() {
+ buf[partial_len+i] = main_enc[i];
+ }
+ }
+}
+
+#[test]
+fn test_bsxii_short() {
+
+ let nonce = gen_nonce();
+ let key = gen_key();
+
+ for size in [10, 100, 1234].iter() {
+ let mut a = vec![7; *size];
+ let mut b = vec![7; *size];
+ let c = vec![7; *size];
+
+ assert_eq!(a, b);
+ bytewise_stream_xor_ic_inplace(&mut a, 0, &nonce, &key);
+ bytewise_stream_xor_ic_inplace(&mut b, 0, &nonce, &key);
+ assert_eq!(a, b);
+ assert_ne!(a, c);
+ bytewise_stream_xor_ic_inplace(&mut a, 0, &nonce, &key);
+ assert_eq!(a, c);
+ }
+}
+
+#[test]
+fn test_bsxii_continued() {
+
+ let nonce = gen_nonce();
+ let key = gen_key();
+
+ let mut a = vec![7; 1234];
+ let mut b = vec![7; 1234];
+ let c = vec![7; 1234];
+
+ assert_eq!(a, b);
+ bytewise_stream_xor_ic_inplace(&mut a[0..10], 0, &nonce, &key);
+ bytewise_stream_xor_ic_inplace(&mut a[10..20], 10, &nonce, &key);
+ bytewise_stream_xor_ic_inplace(&mut b[0..20], 0, &nonce, &key);
+ assert_eq!(a, b);
+ bytewise_stream_xor_ic_inplace(&mut a[20..50], 20, &nonce, &key);
+ bytewise_stream_xor_ic_inplace(&mut a[50..500], 50, &nonce, &key);
+ bytewise_stream_xor_ic_inplace(&mut b[20..500], 20, &nonce, &key);
+ assert_ne!(a, c);
+ assert_eq!(a, b);
+ bytewise_stream_xor_ic_inplace(&mut a[500..1234], 500, &nonce, &key);
+ bytewise_stream_xor_ic_inplace(&mut a, 0, &nonce, &key);
+ assert_eq!(a, c);
+}
+
pub struct TcpSodiumReader<'a> {
dc: &'a mut DatConnection,
}
@@ -65,10 +140,9 @@ impl<'a> Read for TcpSodiumReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> ::std::io::Result<usize> {
let len = self.dc.tcp.read(buf)?;
- if len > 0 {
- stream_xor_inplace(&mut buf[0..len], &self.dc.rx_nonce, &self.dc.key);
- }
+ bytewise_stream_xor_ic_inplace(&mut buf[0..len], self.dc.rx_offset, &self.dc.rx_nonce, &self.dc.key);
self.dc.rx_offset += len as u64;
+
Ok(len)
}
}
@@ -80,9 +154,14 @@ pub struct TcpSodiumWriter<'a> {
impl<'a> Write for TcpSodiumWriter<'a> {
fn write(&mut self, buf: &[u8]) -> ::std::io::Result<usize> {
- //let enc = stream_xor_ic(buf, self.dc.tx_offset, &self.dc.tx_nonce, &self.dc.key);
- let enc = stream_xor(buf, &self.dc.tx_nonce, &self.dc.key);
- self.dc.tx_offset += buf.len() as u64;
+
+ // Don't mutate what we've been passed
+ let mut enc = vec![0; buf.len()];
+ enc.copy_from_slice(buf);
+
+ bytewise_stream_xor_ic_inplace(&mut enc, self.dc.tx_offset, &self.dc.tx_nonce, &self.dc.key);
+ self.dc.tx_offset += enc.len() as u64;
+
self.dc.tcp.write(&enc)
}
@@ -117,7 +196,7 @@ impl DatConnection {
rng.fill_bytes(&mut local_id);
// Connect to server
- println!("Connecting to {}", host_port);
+ info!("Connecting to {}", host_port);
// TODO: timeout on connect (socketaddr dance)
let tcp = TcpStream::connect(host_port)?;
tcp.set_read_timeout(Some(timeout))?;
@@ -189,7 +268,8 @@ impl DatConnection {
let msg: &Message = msg_sugar(dnm);
let total_message_size = (msg.compute_size() as usize) + 1;
- println!("SEND total_len={} header={} is_content={}", total_message_size, header_int, is_content);
+ trace!("SEND total_len={} header={} is_content={} type={:?}",
+ total_message_size, header_int, is_content, &dnm);
// send both header varints, and data
tx_stream.write_varint(total_message_size as u64)?;
@@ -218,7 +298,7 @@ impl DatConnection {
let is_content = (header & (1 << 4)) != 0;
- println!("RECV total_len={} header={} is_content={}", total_len, header, is_content);
+ trace!("RECV total_len={} header={} is_content={}", total_len, header, is_content);
if header > 0x1F {
bail!("Invalid header received: {}", header);
@@ -241,15 +321,21 @@ impl DatConnection {
9 => DatNetMessage::Data(parse_from_bytes::<Data>(&mut buf)?),
other => bail!("Unimplemented message type received: {}", other),
};
+ trace!("\twas: {:?}", &dnm);
Ok((is_content, dnm))
}
+ /// Special unencrypted variant of `send_msg()`, used only during initial connection
+ /// establishment (eg, to check metadata discovery key and exchange nonces). After the
+ /// connection is initialized, send Register (aka Feed) messages as normal to add extra feeds.
fn send_register(&mut self, reg: &Feed) -> Result<()> {
+ // TODO: refactor this to take discovery key and nonce directly
let header_int: u8 = 0;
let total_message_size = (reg.compute_size() as usize) + 1;
- println!("SEND total_len={} header={}", total_message_size, header_int);
+ trace!("SEND total_len={} header={} msg={:?}",
+ total_message_size, header_int, reg);
self.tcp.write_varint(total_message_size as u64)?;
self.tcp.write_varint(header_int as u32)?;
@@ -257,7 +343,9 @@ impl DatConnection {
Ok(())
}
+ /// Receive complement to `send_register()`.
fn recv_register(&mut self) -> Result<Feed> {
+ // TODO: refactor this to return discovery key and nonce directly
let total_len: u64 = self.tcp.read_varint()?;
let header: u8 = self.tcp.read_varint()?;
@@ -266,13 +354,14 @@ impl DatConnection {
bail!("Invalid register header received");
}
- println!("RECV total_len={} header={}", total_len, header);
+ trace!("RECV total_len={} header={}", total_len, header);
let msg_len = (total_len - 1) as usize;
let mut buf = vec![0; msg_len];
self.tcp.read_exact(&mut buf[0..msg_len])?;
let reg = parse_from_bytes::<Feed>(&mut buf)?;
+ trace!("\twas: {:?}", reg);
Ok(reg)
}
@@ -284,6 +373,17 @@ impl DatConnection {
sm.set_downloading(true);
self.send_msg(&DatNetMessage::Status(sm), is_content)?;
+ // Have: nothing (so far)
+ let mut hm = Have::new();
+ hm.set_start(0);
+ hm.set_length(0);
+ self.send_msg(&DatNetMessage::Have(hm), is_content)?;
+
+ // UnHave: still nothing
+ let mut uhm = Unhave::new();
+ uhm.set_start(0);
+ self.send_msg(&DatNetMessage::Unhave(uhm), is_content)?;
+
// Want: everything
let mut wm = Want::new();
wm.set_start(0);
@@ -300,7 +400,7 @@ impl DatConnection {
length = have.get_length();
break;
} else {
- info!("Expected Want message, got: {:?}", &msg);
+ info!("Expected Have message, got: {:?}", &msg);
continue;
}
};
@@ -316,7 +416,10 @@ impl DatConnection {
info!("Expected other message channel");
}
if let DatNetMessage::Data(dm) = msg {
- println!("Got metadata: {}", dm.get_index());
+ info!("Got metadata: {}", dm.get_index());
+ // TODO: if this is index 0, and we're receive-all, and this is metadata feed, then
+ // this is the special Index data entry; we should parse, check type, and store as
+ // register key (?)
} else {
info!("Expected Data message, got: {:?}", &msg);
continue;