diff options
Diffstat (limited to 'src/sync.rs')
-rw-r--r-- | src/sync.rs | 87 |
1 files changed, 40 insertions, 47 deletions
diff --git a/src/sync.rs b/src/sync.rs index d3f54a5..7dcf8e3 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -132,59 +132,54 @@ fn test_bsxii_continued() { assert_eq!(a, c); } -pub struct TcpSodiumReader<'a> { - dc: &'a mut DatConnection, +/// Represents a bi-directional connection to a network peer +/// +/// Spec says nonce is 32 bytes, by dat implementation (hypercore-protocol) is 24 bytes. +pub struct DatConnection { + id: [u8; 32], + remote_id: [u8; 32], + tcp: TcpStream, + live: bool, + key: Key, + tx_nonce: Nonce, + tx_offset: u64, + rx_nonce: Nonce, + rx_offset: u64, } -impl<'a> Read for TcpSodiumReader<'a> { +impl Read for DatConnection { + /// Encrypted TCP read (after connection initialized). Uses XOR of an XSalsa20 stream, using + /// block offsets. fn read(&mut self, buf: &mut [u8]) -> ::std::io::Result<usize> { - let len = self.dc.tcp.read(buf)?; - bytewise_stream_xor_ic_inplace(&mut buf[0..len], self.dc.rx_offset, &self.dc.rx_nonce, &self.dc.key); - self.dc.rx_offset += len as u64; + let len = self.tcp.read(buf)?; + bytewise_stream_xor_ic_inplace(&mut buf[0..len], self.rx_offset, &self.rx_nonce, &self.key); + self.rx_offset += len as u64; Ok(len) } } -pub struct TcpSodiumWriter<'a> { - dc: &'a mut DatConnection, -} - -impl<'a> Write for TcpSodiumWriter<'a> { +impl Write for DatConnection { + /// Encrypted write to complement `read()`. fn write(&mut self, buf: &[u8]) -> ::std::io::Result<usize> { // Don't mutate what we've been passed let mut enc = vec![0; buf.len()]; enc.copy_from_slice(buf); - bytewise_stream_xor_ic_inplace(&mut enc, self.dc.tx_offset, &self.dc.tx_nonce, &self.dc.key); - self.dc.tx_offset += enc.len() as u64; + bytewise_stream_xor_ic_inplace(&mut enc, self.tx_offset, &self.tx_nonce, &self.key); + self.tx_offset += enc.len() as u64; - self.dc.tcp.write(&enc) + self.tcp.write(&enc) } fn flush(&mut self) -> ::std::io::Result<()> { - self.dc.tcp.flush() + self.tcp.flush() } } -/// Represents a bi-directional connection to a network peer -/// -/// Spec says nonce is 32 bytes, by dat implementation (hypercore-protocol) is 24 bytes. -pub struct DatConnection { - id: [u8; 32], - remote_id: [u8; 32], - tcp: TcpStream, - live: bool, - key: Key, - tx_nonce: Nonce, - tx_offset: u64, - rx_nonce: Nonce, - rx_offset: u64, -} - impl DatConnection { pub fn connect(host_port: &str, key: &[u8], live: bool) -> Result<DatConnection> { @@ -263,7 +258,6 @@ impl DatConnection { fn send_msg(&mut self, dnm: &DatNetMessage, is_content: bool) -> Result<()> { - let mut tx_stream = TcpSodiumWriter { dc: self }; let header_int: u8 = (is_content as u8) << 4 | (msg_code(dnm) & 0x0F); let msg: &Message = msg_sugar(dnm); let total_message_size = (msg.compute_size() as usize) + 1; @@ -272,29 +266,28 @@ impl DatConnection { total_message_size, header_int, is_content, &dnm); // send both header varints, and data - tx_stream.write_varint(total_message_size as u64)?; - tx_stream.write_varint(header_int as u32)?; + self.write_varint(total_message_size as u64)?; + self.write_varint(header_int as u32)?; match dnm { - &DatNetMessage::Register(ref m) => m.write_to_writer(&mut tx_stream)?, - &DatNetMessage::Handshake(ref m) => m.write_to_writer(&mut tx_stream)?, - &DatNetMessage::Status(ref m) => m.write_to_writer(&mut tx_stream)?, - &DatNetMessage::Have(ref m) => m.write_to_writer(&mut tx_stream)?, - &DatNetMessage::Unhave(ref m) => m.write_to_writer(&mut tx_stream)?, - &DatNetMessage::Want(ref m) => m.write_to_writer(&mut tx_stream)?, - &DatNetMessage::Unwant(ref m) => m.write_to_writer(&mut tx_stream)?, - &DatNetMessage::Request(ref m) => m.write_to_writer(&mut tx_stream)?, - &DatNetMessage::Cancel(ref m) => m.write_to_writer(&mut tx_stream)?, - &DatNetMessage::Data(ref m) => m.write_to_writer(&mut tx_stream)?, + &DatNetMessage::Register(ref m) => m.write_to_writer(self)?, + &DatNetMessage::Handshake(ref m) => m.write_to_writer(self)?, + &DatNetMessage::Status(ref m) => m.write_to_writer(self)?, + &DatNetMessage::Have(ref m) => m.write_to_writer(self)?, + &DatNetMessage::Unhave(ref m) => m.write_to_writer(self)?, + &DatNetMessage::Want(ref m) => m.write_to_writer(self)?, + &DatNetMessage::Unwant(ref m) => m.write_to_writer(self)?, + &DatNetMessage::Request(ref m) => m.write_to_writer(self)?, + &DatNetMessage::Cancel(ref m) => m.write_to_writer(self)?, + &DatNetMessage::Data(ref m) => m.write_to_writer(self)?, } Ok(()) } fn recv_msg(&mut self) -> Result<(bool, DatNetMessage)> { - let mut rx_stream = TcpSodiumReader { dc: self }; - let total_len: u64 = rx_stream.read_varint()?; - let header: u8 = rx_stream.read_varint()?; + let total_len: u64 = self.read_varint()?; + let header: u8 = self.read_varint()?; let is_content = (header & (1 << 4)) != 0; @@ -306,7 +299,7 @@ impl DatConnection { let msg_len = (total_len - 1) as usize; let mut buf = vec![0; msg_len]; - rx_stream.read_exact(&mut buf[0..msg_len])?; + self.read_exact(&mut buf[0..msg_len])?; let dnm = match header & 0x0F { 0 => DatNetMessage::Register(parse_from_bytes::<Feed>(&mut buf)?), |