From e8e2bb6b41e5e19f8a433dcff597369af2218a92 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 26 Oct 2017 00:17:01 -0700 Subject: work in progress on network connection --- src/bin/geniza-net.rs | 7 ++- src/bin/geniza-register.rs | 3 ++ src/bin/geniza-sleep.rs | 3 ++ src/sync.rs | 129 ++++++++++++++++++++++++++++++++++++++++----- 4 files changed, 127 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/bin/geniza-net.rs b/src/bin/geniza-net.rs index de97e88..1f369ab 100644 --- a/src/bin/geniza-net.rs +++ b/src/bin/geniza-net.rs @@ -2,6 +2,7 @@ #[macro_use] extern crate error_chain; extern crate clap; +extern crate env_logger; extern crate geniza; // TODO: more careful import @@ -9,6 +10,8 @@ use geniza::*; use clap::{App, SubCommand}; fn run() -> Result<()> { + + env_logger::init().unwrap(); let matches = App::new("geniza-net") .version(env!("CARGO_PKG_VERSION")) @@ -16,7 +19,7 @@ fn run() -> Result<()> { .about("Connects to a peer and exchanges handshake") .arg_from_usage(" 'peer host:port to connect to'") .arg_from_usage(" 'dat key (public key) to register with'")) - .subcommand(SubCommand::with_name("clone") + .subcommand(SubCommand::with_name("receive-all") .about("Connects to a peer, pulls all metadata and content") .arg_from_usage(" 'peer host:port to connect to'") .arg_from_usage(" 'dat key (public key) to register with'")) @@ -44,7 +47,7 @@ fn run() -> Result<()> { false)?; println!("Done!"); }, - ("clone", Some(subm)) => { + ("receive-all", Some(subm)) => { let host_port = subm.value_of("host_port").unwrap(); let dat_key = subm.value_of("dat_key").unwrap(); if dat_key.len() != 32*2 { diff --git a/src/bin/geniza-register.rs b/src/bin/geniza-register.rs index d0f48d8..a328bbc 100644 --- a/src/bin/geniza-register.rs +++ b/src/bin/geniza-register.rs @@ -2,6 +2,7 @@ #[macro_use] extern crate error_chain; extern crate clap; +extern crate env_logger; extern crate geniza; // TODO: more careful import @@ -10,6 +11,8 @@ use std::path::Path; use clap::{App, SubCommand}; fn run() -> Result<()> { + + env_logger::init().unwrap(); let matches = App::new("geniza-register") .version(env!("CARGO_PKG_VERSION")) diff --git a/src/bin/geniza-sleep.rs b/src/bin/geniza-sleep.rs index f8921c7..10fc16b 100644 --- a/src/bin/geniza-sleep.rs +++ b/src/bin/geniza-sleep.rs @@ -3,6 +3,7 @@ extern crate error_chain; #[macro_use] extern crate clap; +extern crate env_logger; extern crate geniza; // TODO: more careful import @@ -11,6 +12,8 @@ use std::path::Path; use clap::{App, SubCommand}; fn run() -> Result<()> { + + env_logger::init().unwrap(); let matches = App::new("geniza-sleep") .version(env!("CARGO_PKG_VERSION")) diff --git a/src/sync.rs b/src/sync.rs index 2869075..d3f54a5 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -2,6 +2,7 @@ use std::net::TcpStream; use std::time::Duration; use std::io::{Read, Write}; +use std::cmp; use crypto::digest::Digest; use crypto::blake2b::Blake2b; use sodiumoxide::crypto::stream::*; @@ -57,6 +58,80 @@ fn msg_sugar(msg: &DatNetMessage) -> &Message { } } +/// This helper is pretty slow/inefficient; lots of copying memory +fn bytewise_stream_xor_ic_inplace(buf: &mut [u8], byte_offset: u64, nonce: &Nonce, key: &Key) { + + let mut offset = byte_offset; + + // We may have a partial-64-byte-block to finish encrypting first + let partial_offset: usize = (offset % 64) as usize; + let partial_len: usize = cmp::min(64 - partial_offset, buf.len()); + if partial_len != 0 { + let mut partial = vec![0; 64]; + for i in 0..partial_len { + partial[partial_offset+i] = buf[i]; + } + let partial_enc = stream_xor_ic(&partial, offset / 64, &nonce, &key); + offset += partial_len as u64; + for i in 0..partial_len { + buf[i] = partial_enc[partial_offset+i]; + } + } + if buf.len() > partial_len { + let main_enc = stream_xor_ic(&buf[partial_len..], offset / 64, &nonce, &key); + //offset += main_enc.len() as u64; + for i in 0..main_enc.len() { + buf[partial_len+i] = main_enc[i]; + } + } +} + +#[test] +fn test_bsxii_short() { + + let nonce = gen_nonce(); + let key = gen_key(); + + for size in [10, 100, 1234].iter() { + let mut a = vec![7; *size]; + let mut b = vec![7; *size]; + let c = vec![7; *size]; + + assert_eq!(a, b); + bytewise_stream_xor_ic_inplace(&mut a, 0, &nonce, &key); + bytewise_stream_xor_ic_inplace(&mut b, 0, &nonce, &key); + assert_eq!(a, b); + assert_ne!(a, c); + bytewise_stream_xor_ic_inplace(&mut a, 0, &nonce, &key); + assert_eq!(a, c); + } +} + +#[test] +fn test_bsxii_continued() { + + let nonce = gen_nonce(); + let key = gen_key(); + + let mut a = vec![7; 1234]; + let mut b = vec![7; 1234]; + let c = vec![7; 1234]; + + assert_eq!(a, b); + bytewise_stream_xor_ic_inplace(&mut a[0..10], 0, &nonce, &key); + bytewise_stream_xor_ic_inplace(&mut a[10..20], 10, &nonce, &key); + bytewise_stream_xor_ic_inplace(&mut b[0..20], 0, &nonce, &key); + assert_eq!(a, b); + bytewise_stream_xor_ic_inplace(&mut a[20..50], 20, &nonce, &key); + bytewise_stream_xor_ic_inplace(&mut a[50..500], 50, &nonce, &key); + bytewise_stream_xor_ic_inplace(&mut b[20..500], 20, &nonce, &key); + assert_ne!(a, c); + assert_eq!(a, b); + bytewise_stream_xor_ic_inplace(&mut a[500..1234], 500, &nonce, &key); + bytewise_stream_xor_ic_inplace(&mut a, 0, &nonce, &key); + assert_eq!(a, c); +} + pub struct TcpSodiumReader<'a> { dc: &'a mut DatConnection, } @@ -65,10 +140,9 @@ impl<'a> Read for TcpSodiumReader<'a> { fn read(&mut self, buf: &mut [u8]) -> ::std::io::Result { let len = self.dc.tcp.read(buf)?; - if len > 0 { - stream_xor_inplace(&mut buf[0..len], &self.dc.rx_nonce, &self.dc.key); - } + bytewise_stream_xor_ic_inplace(&mut buf[0..len], self.dc.rx_offset, &self.dc.rx_nonce, &self.dc.key); self.dc.rx_offset += len as u64; + Ok(len) } } @@ -80,9 +154,14 @@ pub struct TcpSodiumWriter<'a> { impl<'a> Write for TcpSodiumWriter<'a> { fn write(&mut self, buf: &[u8]) -> ::std::io::Result { - //let enc = stream_xor_ic(buf, self.dc.tx_offset, &self.dc.tx_nonce, &self.dc.key); - let enc = stream_xor(buf, &self.dc.tx_nonce, &self.dc.key); - self.dc.tx_offset += buf.len() as u64; + + // Don't mutate what we've been passed + let mut enc = vec![0; buf.len()]; + enc.copy_from_slice(buf); + + bytewise_stream_xor_ic_inplace(&mut enc, self.dc.tx_offset, &self.dc.tx_nonce, &self.dc.key); + self.dc.tx_offset += enc.len() as u64; + self.dc.tcp.write(&enc) } @@ -117,7 +196,7 @@ impl DatConnection { rng.fill_bytes(&mut local_id); // Connect to server - println!("Connecting to {}", host_port); + info!("Connecting to {}", host_port); // TODO: timeout on connect (socketaddr dance) let tcp = TcpStream::connect(host_port)?; tcp.set_read_timeout(Some(timeout))?; @@ -189,7 +268,8 @@ impl DatConnection { let msg: &Message = msg_sugar(dnm); let total_message_size = (msg.compute_size() as usize) + 1; - println!("SEND total_len={} header={} is_content={}", total_message_size, header_int, is_content); + trace!("SEND total_len={} header={} is_content={} type={:?}", + total_message_size, header_int, is_content, &dnm); // send both header varints, and data tx_stream.write_varint(total_message_size as u64)?; @@ -218,7 +298,7 @@ impl DatConnection { let is_content = (header & (1 << 4)) != 0; - println!("RECV total_len={} header={} is_content={}", total_len, header, is_content); + trace!("RECV total_len={} header={} is_content={}", total_len, header, is_content); if header > 0x1F { bail!("Invalid header received: {}", header); @@ -241,15 +321,21 @@ impl DatConnection { 9 => DatNetMessage::Data(parse_from_bytes::(&mut buf)?), other => bail!("Unimplemented message type received: {}", other), }; + trace!("\twas: {:?}", &dnm); Ok((is_content, dnm)) } + /// Special unencrypted variant of `send_msg()`, used only during initial connection + /// establishment (eg, to check metadata discovery key and exchange nonces). After the + /// connection is initialized, send Register (aka Feed) messages as normal to add extra feeds. fn send_register(&mut self, reg: &Feed) -> Result<()> { + // TODO: refactor this to take discovery key and nonce directly let header_int: u8 = 0; let total_message_size = (reg.compute_size() as usize) + 1; - println!("SEND total_len={} header={}", total_message_size, header_int); + trace!("SEND total_len={} header={} msg={:?}", + total_message_size, header_int, reg); self.tcp.write_varint(total_message_size as u64)?; self.tcp.write_varint(header_int as u32)?; @@ -257,7 +343,9 @@ impl DatConnection { Ok(()) } + /// Receive complement to `send_register()`. fn recv_register(&mut self) -> Result { + // TODO: refactor this to return discovery key and nonce directly let total_len: u64 = self.tcp.read_varint()?; let header: u8 = self.tcp.read_varint()?; @@ -266,13 +354,14 @@ impl DatConnection { bail!("Invalid register header received"); } - println!("RECV total_len={} header={}", total_len, header); + trace!("RECV total_len={} header={}", total_len, header); let msg_len = (total_len - 1) as usize; let mut buf = vec![0; msg_len]; self.tcp.read_exact(&mut buf[0..msg_len])?; let reg = parse_from_bytes::(&mut buf)?; + trace!("\twas: {:?}", reg); Ok(reg) } @@ -284,6 +373,17 @@ impl DatConnection { sm.set_downloading(true); self.send_msg(&DatNetMessage::Status(sm), is_content)?; + // Have: nothing (so far) + let mut hm = Have::new(); + hm.set_start(0); + hm.set_length(0); + self.send_msg(&DatNetMessage::Have(hm), is_content)?; + + // UnHave: still nothing + let mut uhm = Unhave::new(); + uhm.set_start(0); + self.send_msg(&DatNetMessage::Unhave(uhm), is_content)?; + // Want: everything let mut wm = Want::new(); wm.set_start(0); @@ -300,7 +400,7 @@ impl DatConnection { length = have.get_length(); break; } else { - info!("Expected Want message, got: {:?}", &msg); + info!("Expected Have message, got: {:?}", &msg); continue; } }; @@ -316,7 +416,10 @@ impl DatConnection { info!("Expected other message channel"); } if let DatNetMessage::Data(dm) = msg { - println!("Got metadata: {}", dm.get_index()); + info!("Got metadata: {}", dm.get_index()); + // TODO: if this is index 0, and we're receive-all, and this is metadata feed, then + // this is the special Index data entry; we should parse, check type, and store as + // register key (?) } else { info!("Expected Data message, got: {:?}", &msg); continue; -- cgit v1.2.3