diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/sync.rs | 164 |
1 files changed, 114 insertions, 50 deletions
diff --git a/src/sync.rs b/src/sync.rs index 0e27503..2869075 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,10 +1,11 @@ use std::net::TcpStream; use std::time::Duration; -use std::io::Read; +use std::io::{Read, Write}; use crypto::digest::Digest; use crypto::blake2b::Blake2b; -use rand::{Rng, OsRng}; +use sodiumoxide::crypto::stream::*; +use rand::{OsRng, Rng}; use protobuf::Message; use protobuf::parse_from_bytes; use integer_encoding::{VarIntReader, VarIntWriter}; @@ -56,16 +57,53 @@ fn msg_sugar(msg: &DatNetMessage) -> &Message { } } +pub struct TcpSodiumReader<'a> { + dc: &'a mut DatConnection, +} + +impl<'a> Read for TcpSodiumReader<'a> { + + fn read(&mut self, buf: &mut [u8]) -> ::std::io::Result<usize> { + let len = self.dc.tcp.read(buf)?; + if len > 0 { + stream_xor_inplace(&mut buf[0..len], &self.dc.rx_nonce, &self.dc.key); + } + self.dc.rx_offset += len as u64; + Ok(len) + } +} + +pub struct TcpSodiumWriter<'a> { + dc: &'a mut DatConnection, +} + +impl<'a> Write for TcpSodiumWriter<'a> { + + fn write(&mut self, buf: &[u8]) -> ::std::io::Result<usize> { + //let enc = stream_xor_ic(buf, self.dc.tx_offset, &self.dc.tx_nonce, &self.dc.key); + let enc = stream_xor(buf, &self.dc.tx_nonce, &self.dc.key); + self.dc.tx_offset += buf.len() as u64; + self.dc.tcp.write(&enc) + } + + fn flush(&mut self) -> ::std::io::Result<()> { + self.dc.tcp.flush() + } +} + /// Represents a bi-directional connection to a network peer /// /// Spec says nonce is 32 bytes, by dat implementation (hypercore-protocol) is 24 bytes. pub struct DatConnection { - nonce: [u8; 24], - remote_nonce: [u8; 24], id: [u8; 32], remote_id: [u8; 32], tcp: TcpStream, live: bool, + key: Key, + tx_nonce: Nonce, + tx_offset: u64, + rx_nonce: Nonce, + rx_offset: u64, } impl DatConnection { @@ -73,10 +111,9 @@ impl DatConnection { pub fn connect(host_port: &str, key: &[u8], live: bool) -> Result<DatConnection> { let timeout = Duration::new(7, 0); - let mut rng = OsRng::new()?; - let mut nonce = [0; 24]; - rng.fill_bytes(&mut nonce); + let tx_nonce = gen_nonce(); let mut local_id = [0; 32]; + let mut rng = OsRng::new()?; rng.fill_bytes(&mut local_id); // Connect to server @@ -87,12 +124,15 @@ impl DatConnection { tcp.set_write_timeout(Some(timeout))?; let mut dc = DatConnection { - nonce: nonce, - remote_nonce: [0; 24], id: local_id, tcp, live, - remote_id: [0; 32] + remote_id: [0; 32], + key: Key::from_slice(key).unwrap(), // TODO: + tx_nonce: tx_nonce, + tx_offset: 0, + rx_nonce: gen_nonce(), // dummy + rx_offset: 0, }; // Exchange register/feed @@ -105,32 +145,22 @@ impl DatConnection { // send register let mut register_msg = Feed::new(); register_msg.set_discoveryKey(discovery_key.to_vec()); - register_msg.set_nonce(nonce.to_vec()); - dc.send_msg(false, &DatNetMessage::Register(register_msg))?; + register_msg.set_nonce((tx_nonce[0..24]).to_vec()); + dc.send_register(®ister_msg)?; // read register - let (was_content, msg) = dc.recv_msg()?; - if was_content { - bail!("Expected metadata msg, not content"); - } - if let DatNetMessage::Register(registration) = msg { - if registration.get_discoveryKey()[0..32] != discovery_key[..] { - bail!("Remote peer not sharing same discovery key"); - } - // TODO: more satisfying way to do this copy - let rn = registration.get_nonce(); - for i in 0..24 { - dc.remote_nonce[i] = rn[i]; - } - } else { - bail!("Expected Registration message, got something else"); + let registration = dc.recv_register()?; + if registration.get_discoveryKey()[0..32] != discovery_key[..] { + bail!("Remote peer not sharing same discovery key"); } + let rn = registration.get_nonce(); + dc.rx_nonce = Nonce::from_slice(&rn).unwrap(); // send handshake let mut handshake_msg = Handshake::new(); handshake_msg.set_live(live); handshake_msg.set_id(local_id.to_vec()); - dc.send_msg(false, &DatNetMessage::Handshake(handshake_msg))?; + dc.send_msg(&DatNetMessage::Handshake(handshake_msg), false)?; // read handshake let (was_content, msg) = dc.recv_msg()?; @@ -152,8 +182,9 @@ impl DatConnection { Ok(dc) } - fn send_msg(&mut self, is_content: bool, dnm: &DatNetMessage) -> Result<()> { + fn send_msg(&mut self, dnm: &DatNetMessage, is_content: bool) -> Result<()> { + let mut tx_stream = TcpSodiumWriter { dc: self }; let header_int: u8 = (is_content as u8) << 4 | (msg_code(dnm) & 0x0F); let msg: &Message = msg_sugar(dnm); let total_message_size = (msg.compute_size() as usize) + 1; @@ -161,40 +192,41 @@ impl DatConnection { println!("SEND total_len={} header={} is_content={}", total_message_size, header_int, is_content); // send both header varints, and data - self.tcp.write_varint(total_message_size as u64)?; - self.tcp.write_varint(header_int as u32)?; + tx_stream.write_varint(total_message_size as u64)?; + tx_stream.write_varint(header_int as u32)?; match dnm { - &DatNetMessage::Register(ref m) => m.write_to_writer(&mut self.tcp)?, - &DatNetMessage::Handshake(ref m) => m.write_to_writer(&mut self.tcp)?, - &DatNetMessage::Status(ref m) => m.write_to_writer(&mut self.tcp)?, - &DatNetMessage::Have(ref m) => m.write_to_writer(&mut self.tcp)?, - &DatNetMessage::Unhave(ref m) => m.write_to_writer(&mut self.tcp)?, - &DatNetMessage::Want(ref m) => m.write_to_writer(&mut self.tcp)?, - &DatNetMessage::Unwant(ref m) => m.write_to_writer(&mut self.tcp)?, - &DatNetMessage::Request(ref m) => m.write_to_writer(&mut self.tcp)?, - &DatNetMessage::Cancel(ref m) => m.write_to_writer(&mut self.tcp)?, - &DatNetMessage::Data(ref m) => m.write_to_writer(&mut self.tcp)?, + &DatNetMessage::Register(ref m) => m.write_to_writer(&mut tx_stream)?, + &DatNetMessage::Handshake(ref m) => m.write_to_writer(&mut tx_stream)?, + &DatNetMessage::Status(ref m) => m.write_to_writer(&mut tx_stream)?, + &DatNetMessage::Have(ref m) => m.write_to_writer(&mut tx_stream)?, + &DatNetMessage::Unhave(ref m) => m.write_to_writer(&mut tx_stream)?, + &DatNetMessage::Want(ref m) => m.write_to_writer(&mut tx_stream)?, + &DatNetMessage::Unwant(ref m) => m.write_to_writer(&mut tx_stream)?, + &DatNetMessage::Request(ref m) => m.write_to_writer(&mut tx_stream)?, + &DatNetMessage::Cancel(ref m) => m.write_to_writer(&mut tx_stream)?, + &DatNetMessage::Data(ref m) => m.write_to_writer(&mut tx_stream)?, } Ok(()) } fn recv_msg(&mut self) -> Result<(bool, DatNetMessage)> { - let total_len: u64 = self.tcp.read_varint()?; - let header: u8 = self.tcp.read_varint()?; + let mut rx_stream = TcpSodiumReader { dc: self }; + let total_len: u64 = rx_stream.read_varint()?; + let header: u8 = rx_stream.read_varint()?; let is_content = (header & (1 << 4)) != 0; + println!("RECV total_len={} header={} is_content={}", total_len, header, is_content); + if header > 0x1F { - bail!("Invalid header received"); + bail!("Invalid header received: {}", header); } - println!("RECV total_len={} header={} is_content={}", total_len, header, is_content); - let msg_len = (total_len - 1) as usize; let mut buf = vec![0; msg_len]; - self.tcp.read_exact(&mut buf[0..msg_len])?; + rx_stream.read_exact(&mut buf[0..msg_len])?; let dnm = match header & 0x0F { 0 => DatNetMessage::Register(parse_from_bytes::<Feed>(&mut buf)?), @@ -212,18 +244,50 @@ impl DatConnection { Ok((is_content, dnm)) } + fn send_register(&mut self, reg: &Feed) -> Result<()> { + + let header_int: u8 = 0; + let total_message_size = (reg.compute_size() as usize) + 1; + + println!("SEND total_len={} header={}", total_message_size, header_int); + + self.tcp.write_varint(total_message_size as u64)?; + self.tcp.write_varint(header_int as u32)?; + reg.write_to_writer(&mut self.tcp)?; + Ok(()) + } + + fn recv_register(&mut self) -> Result<Feed> { + + let total_len: u64 = self.tcp.read_varint()?; + let header: u8 = self.tcp.read_varint()?; + + if header != 0 { + bail!("Invalid register header received"); + } + + println!("RECV total_len={} header={}", total_len, header); + + let msg_len = (total_len - 1) as usize; + let mut buf = vec![0; msg_len]; + self.tcp.read_exact(&mut buf[0..msg_len])?; + + let reg = parse_from_bytes::<Feed>(&mut buf)?; + Ok(reg) + } + pub fn receive_all(&mut self, is_content: bool) -> Result<()> { // Status: downloading, not uploading let mut sm = Info::new(); sm.set_uploading(false); sm.set_downloading(true); - self.send_msg(is_content, &DatNetMessage::Status(sm))?; + self.send_msg(&DatNetMessage::Status(sm), is_content)?; // Want: everything let mut wm = Want::new(); wm.set_start(0); - self.send_msg(is_content, &DatNetMessage::Want(wm))?; + self.send_msg(&DatNetMessage::Want(wm), is_content)?; // listen for Have let length; @@ -245,7 +309,7 @@ impl DatConnection { for i in 0..length { let mut rm = Request::new(); rm.set_index(i); - self.send_msg(is_content, &DatNetMessage::Request(rm))?; + self.send_msg(&DatNetMessage::Request(rm), is_content)?; let (was_content, msg) = self.recv_msg()?; if was_content != is_content{ |