From 177d639fab67b790f43bc0573d785271d8afb858 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 26 Oct 2017 21:10:19 -0700 Subject: refactor file/module names I had some early confusion around whether SLEEP refered to individual files or the collection of files making a register (it seems to mean the whole register). Will probably need to refactor again. --- src/sync.rs | 507 ------------------------------------------------------------ 1 file changed, 507 deletions(-) delete mode 100644 src/sync.rs (limited to 'src/sync.rs') diff --git a/src/sync.rs b/src/sync.rs deleted file mode 100644 index efa045a..0000000 --- a/src/sync.rs +++ /dev/null @@ -1,507 +0,0 @@ - -use std::net::TcpStream; -use std::time::Duration; -use std::io::{Read, Write}; -use std::cmp; -use crypto::digest::Digest; -use crypto::blake2b::Blake2b; -use sodiumoxide::crypto::stream::*; -use rand::{OsRng, Rng}; -use protobuf::Message; -use protobuf::parse_from_bytes; -use integer_encoding::{VarIntReader, VarIntWriter}; - -use errors::*; -use network_proto::*; -use drive_proto::Index; - -#[derive(Debug)] -pub enum DatNetMessage { - Register(Feed), - Handshake(Handshake), - Status(Info), - Have(Have), - Unhave(Unhave), - Want(Want), - Unwant(Unwant), - Request(Request), - Cancel(Cancel), - Data(Data), -} - -fn msg_code(msg: &DatNetMessage) -> u8 { - match msg { - &DatNetMessage::Register(_) => 0, - &DatNetMessage::Handshake(_) => 1, - &DatNetMessage::Status(_) => 2, - &DatNetMessage::Have(_) => 3, - &DatNetMessage::Unhave(_) => 4, - &DatNetMessage::Want(_) => 5, - &DatNetMessage::Unwant(_) => 6, - &DatNetMessage::Request(_) => 7, - &DatNetMessage::Cancel(_) => 8, - &DatNetMessage::Data(_) => 9, - } -} - -fn msg_sugar(msg: &DatNetMessage) -> &Message { - match msg { - &DatNetMessage::Register(ref m) => m, - &DatNetMessage::Handshake(ref m) => m, - &DatNetMessage::Status(ref m) => m, - &DatNetMessage::Have(ref m) => m, - &DatNetMessage::Unhave(ref m) => m, - &DatNetMessage::Want(ref m) => m, - &DatNetMessage::Unwant(ref m) => m, - &DatNetMessage::Request(ref m) => m, - &DatNetMessage::Cancel(ref m) => m, - &DatNetMessage::Data(ref m) => m, - } -} - -/// This helper is pretty slow/inefficient; lots of copying memory -fn bytewise_stream_xor_ic_inplace(buf: &mut [u8], byte_offset: u64, nonce: &Nonce, key: &Key) { - - let mut offset = byte_offset; - - // We may have a partial-64-byte-block to finish encrypting first - let partial_offset: usize = (offset % 64) as usize; - let partial_len: usize = cmp::min(64 - partial_offset, buf.len()); - if partial_len != 0 { - let mut partial = vec![0; 64]; - for i in 0..partial_len { - partial[partial_offset+i] = buf[i]; - } - let partial_enc = stream_xor_ic(&partial, offset / 64, &nonce, &key); - offset += partial_len as u64; - for i in 0..partial_len { - buf[i] = partial_enc[partial_offset+i]; - } - } - if buf.len() > partial_len { - let main_enc = stream_xor_ic(&buf[partial_len..], offset / 64, &nonce, &key); - //offset += main_enc.len() as u64; - for i in 0..main_enc.len() { - buf[partial_len+i] = main_enc[i]; - } - } -} - -#[test] -fn test_bsxii_short() { - - let nonce = gen_nonce(); - let key = gen_key(); - - for size in [10, 100, 1234].iter() { - let mut a = vec![7; *size]; - let mut b = vec![7; *size]; - let c = vec![7; *size]; - - assert_eq!(a, b); - bytewise_stream_xor_ic_inplace(&mut a, 0, &nonce, &key); - bytewise_stream_xor_ic_inplace(&mut b, 0, &nonce, &key); - assert_eq!(a, b); - assert_ne!(a, c); - bytewise_stream_xor_ic_inplace(&mut a, 0, &nonce, &key); - assert_eq!(a, c); - } -} - -#[test] -fn test_bsxii_continued() { - - let nonce = gen_nonce(); - let key = gen_key(); - - let mut a = vec![7; 1234]; - let mut b = vec![7; 1234]; - let c = vec![7; 1234]; - - assert_eq!(a, b); - bytewise_stream_xor_ic_inplace(&mut a[0..10], 0, &nonce, &key); - bytewise_stream_xor_ic_inplace(&mut a[10..20], 10, &nonce, &key); - bytewise_stream_xor_ic_inplace(&mut b[0..20], 0, &nonce, &key); - assert_eq!(a, b); - bytewise_stream_xor_ic_inplace(&mut a[20..50], 20, &nonce, &key); - bytewise_stream_xor_ic_inplace(&mut a[50..500], 50, &nonce, &key); - bytewise_stream_xor_ic_inplace(&mut b[20..500], 20, &nonce, &key); - assert_ne!(a, c); - assert_eq!(a, b); - bytewise_stream_xor_ic_inplace(&mut a[500..1234], 500, &nonce, &key); - bytewise_stream_xor_ic_inplace(&mut a, 0, &nonce, &key); - assert_eq!(a, c); -} - -fn make_discovery_key(key: &[u8]) -> Vec { - // calculate discovery key - let mut discovery_key = [0; 32]; - let mut hash = Blake2b::new_keyed(32, key); - hash.input(&"hypercore".as_bytes()); - hash.result(&mut discovery_key); - discovery_key.to_vec() -} - -/// Represents a bi-directional connection to a network peer -/// -/// Spec says nonce is 32 bytes, by dat implementation (hypercore-protocol) is 24 bytes. -pub struct DatConnection { - id: [u8; 32], - remote_id: [u8; 32], - tcp: TcpStream, - live: bool, - key: Key, - discovery_key: [u8; 32], - data_key: [u8; 32], - data_discovery_key: [u8; 32], - tx_nonce: Nonce, - tx_offset: u64, - rx_nonce: Nonce, - rx_offset: u64, -} - -impl Read for DatConnection { - - /// Encrypted TCP read (after connection initialized). Uses XOR of an XSalsa20 stream, using - /// block offsets. - fn read(&mut self, buf: &mut [u8]) -> ::std::io::Result { - let len = self.tcp.read(buf)?; - bytewise_stream_xor_ic_inplace(&mut buf[0..len], self.rx_offset, &self.rx_nonce, &self.key); - self.rx_offset += len as u64; - - Ok(len) - } -} - -impl Write for DatConnection { - - /// Encrypted write to complement `read()`. - fn write(&mut self, buf: &[u8]) -> ::std::io::Result { - - // Don't mutate what we've been passed - let mut enc = vec![0; buf.len()]; - enc.copy_from_slice(buf); - - bytewise_stream_xor_ic_inplace(&mut enc, self.tx_offset, &self.tx_nonce, &self.key); - self.tx_offset += enc.len() as u64; - - self.tcp.write(&enc) - } - - fn flush(&mut self) -> ::std::io::Result<()> { - self.tcp.flush() - } -} - -impl DatConnection { - - pub fn connect(host_port: &str, key: &[u8], live: bool) -> Result { - - let timeout = Duration::new(7, 0); - let tx_nonce = gen_nonce(); - let mut local_id = [0; 32]; - let mut rng = OsRng::new()?; - rng.fill_bytes(&mut local_id); - - let mut dk = [0; 32]; - dk.copy_from_slice(&make_discovery_key(key)[0..32]); - - // Connect to server - info!("Connecting to {}", host_port); - // TODO: timeout on connect (socketaddr dance) - let tcp = TcpStream::connect(host_port)?; - tcp.set_read_timeout(Some(timeout))?; - tcp.set_write_timeout(Some(timeout))?; - - let mut dc = DatConnection { - id: local_id, - tcp, - live, - remote_id: [0; 32], - key: Key::from_slice(key).unwrap(), // TODO: - discovery_key: dk, - data_key: [0; 32], - data_discovery_key: [0; 32], - tx_nonce: tx_nonce, - tx_offset: 0, - rx_nonce: gen_nonce(), // dummy - rx_offset: 0, - }; - - // Exchange register/feed - dc.tcp.set_nodelay(true)?; // Faster handshake - // send register - let mut register_msg = Feed::new(); - register_msg.set_discoveryKey(dc.discovery_key.to_vec()); - register_msg.set_nonce((tx_nonce[0..24]).to_vec()); - dc.send_register(®ister_msg)?; - - // read register - let registration = dc.recv_register()?; - if registration.get_discoveryKey()[0..32] != dk[..] { - bail!("Remote peer not sharing same discovery key"); - } - let rn = registration.get_nonce(); - dc.rx_nonce = Nonce::from_slice(&rn).unwrap(); - - // send handshake - let mut handshake_msg = Handshake::new(); - handshake_msg.set_live(dc.live); - handshake_msg.set_id(dc.id.to_vec()); - dc.send_msg(&DatNetMessage::Handshake(handshake_msg), false)?; - - // read handshake - let (was_content, msg) = dc.recv_msg()?; - if was_content { - bail!("Expected metadata msg, not content"); - } - if let DatNetMessage::Handshake(handshake) = msg { - // TODO: more satisfying way to do this copy - let hid = handshake.get_id(); - for i in 0..32 { - dc.remote_id[i] = hid[i]; - } - } else { - bail!("Expected Handshake message, got something else"); - } - - // TODO: read data feed register here? - - // Fetch and configure key for data feed - dc.get_data_key()?; - - // Send (encrypted) Register/Feed message for data feed - let mut register_msg = Feed::new(); - register_msg.set_discoveryKey(dc.data_discovery_key.to_vec()); - dc.send_msg(&DatNetMessage::Register(register_msg), true)?; - - dc.tcp.set_nodelay(false)?; // Back to normal - - Ok(dc) - } - - fn send_msg(&mut self, dnm: &DatNetMessage, is_content: bool) -> Result<()> { - - let header_int: u8 = (is_content as u8) << 4 | (msg_code(dnm) & 0x0F); - let msg: &Message = msg_sugar(dnm); - let total_message_size = (msg.compute_size() as usize) + 1; - - trace!("SEND total_len={} header={} is_content={} type={:?}", - total_message_size, header_int, is_content, &dnm); - - // send both header varints, and data - self.write_varint(total_message_size as u64)?; - self.write_varint(header_int as u32)?; - - match dnm { - &DatNetMessage::Register(ref m) => m.write_to_writer(self)?, - &DatNetMessage::Handshake(ref m) => m.write_to_writer(self)?, - &DatNetMessage::Status(ref m) => m.write_to_writer(self)?, - &DatNetMessage::Have(ref m) => m.write_to_writer(self)?, - &DatNetMessage::Unhave(ref m) => m.write_to_writer(self)?, - &DatNetMessage::Want(ref m) => m.write_to_writer(self)?, - &DatNetMessage::Unwant(ref m) => m.write_to_writer(self)?, - &DatNetMessage::Request(ref m) => m.write_to_writer(self)?, - &DatNetMessage::Cancel(ref m) => m.write_to_writer(self)?, - &DatNetMessage::Data(ref m) => m.write_to_writer(self)?, - } - Ok(()) - } - - fn recv_msg(&mut self) -> Result<(bool, DatNetMessage)> { - - let total_len: u64 = self.read_varint()?; - let header: u8 = self.read_varint()?; - - let is_content = (header & (1 << 4)) != 0; - - trace!("RECV total_len={} header={} is_content={}", total_len, header, is_content); - - if header > 0x1F { - bail!("Invalid header received: {}", header); - } - - let msg_len = (total_len - 1) as usize; - let mut buf = vec![0; msg_len]; - self.read_exact(&mut buf[0..msg_len])?; - - let dnm = match header & 0x0F { - 0 => DatNetMessage::Register(parse_from_bytes::(&mut buf)?), - 1 => DatNetMessage::Handshake(parse_from_bytes::(&mut buf)?), - 2 => DatNetMessage::Status(parse_from_bytes::(&mut buf)?), - 3 => DatNetMessage::Have(parse_from_bytes::(&mut buf)?), - 4 => DatNetMessage::Unhave(parse_from_bytes::(&mut buf)?), - 5 => DatNetMessage::Want(parse_from_bytes::(&mut buf)?), - 6 => DatNetMessage::Unwant(parse_from_bytes::(&mut buf)?), - 7 => DatNetMessage::Request(parse_from_bytes::(&mut buf)?), - 8 => DatNetMessage::Cancel(parse_from_bytes::(&mut buf)?), - 9 => DatNetMessage::Data(parse_from_bytes::(&mut buf)?), - other => bail!("Unimplemented message type received: {}", other), - }; - trace!("\twas: {:?}", &dnm); - Ok((is_content, dnm)) - } - - /// Special unencrypted variant of `send_msg()`, used only during initial connection - /// establishment (eg, to check metadata discovery key and exchange nonces). After the - /// connection is initialized, send Register (aka Feed) messages as normal to add extra feeds. - fn send_register(&mut self, reg: &Feed) -> Result<()> { - // TODO: refactor this to take discovery key and nonce directly - - let header_int: u8 = 0; - let total_message_size = (reg.compute_size() as usize) + 1; - - trace!("SEND total_len={} header={} msg={:?}", - total_message_size, header_int, reg); - - self.tcp.write_varint(total_message_size as u64)?; - self.tcp.write_varint(header_int as u32)?; - reg.write_to_writer(&mut self.tcp)?; - Ok(()) - } - - /// Receive complement to `send_register()`. - fn recv_register(&mut self) -> Result { - // TODO: refactor this to return discovery key and nonce directly - - let total_len: u64 = self.tcp.read_varint()?; - let header: u8 = self.tcp.read_varint()?; - - if header != 0 { - bail!("Invalid register header received"); - } - - trace!("RECV total_len={} header={}", total_len, header); - - let msg_len = (total_len - 1) as usize; - let mut buf = vec![0; msg_len]; - self.tcp.read_exact(&mut buf[0..msg_len])?; - - let reg = parse_from_bytes::(&mut buf)?; - trace!("\twas: {:?}", reg); - Ok(reg) - } - - pub fn get_data_key(&mut self) -> Result<()> { - - // Status: downloading, not uploading - let mut sm = Info::new(); - sm.set_uploading(false); - sm.set_downloading(true); - self.send_msg(&DatNetMessage::Status(sm), false)?; - - // Have: nothing (so far) - let mut hm = Have::new(); - hm.set_start(0); - hm.set_length(0); - self.send_msg(&DatNetMessage::Have(hm), false)?; - - // UnHave: still nothing - let mut uhm = Unhave::new(); - uhm.set_start(0); - self.send_msg(&DatNetMessage::Unhave(uhm), false)?; - - // Want: just the first element - let mut wm = Want::new(); - wm.set_start(0); - wm.set_length(1); - self.send_msg(&DatNetMessage::Want(wm), false)?; - - // listen for Have - loop { - let (was_content, msg) = self.recv_msg()?; - if was_content { - continue; - } - if let DatNetMessage::Have(_) = msg { - break; - } else { - info!("Expected Have message, got: {:?}", &msg); - continue; - } - }; - - // Request - let mut rm = Request::new(); - rm.set_index(0); - self.send_msg(&DatNetMessage::Request(rm), false)?; - - loop { - let (was_content, msg) = self.recv_msg()?; - if was_content { - info!("Expected other message channel"); - continue; - } - if let DatNetMessage::Data(dm) = msg { - info!("Got metadata: {}", dm.get_index()); - if dm.get_index() == 0 { - let index_msg = parse_from_bytes::(&mut dm.get_value())?; - if index_msg.get_field_type() == "hyperdrive" { - let data_key = index_msg.get_content(); - if data_key.len() != 32 { - bail!("Received data key had wrong length: {}", data_key.len()); - } - info!("Got data discovery key"); - self.data_key.copy_from_slice(&data_key[0..32]); - self.data_discovery_key.copy_from_slice(&make_discovery_key(data_key)[0..32]); - return Ok(()); - } else { - unimplemented!("non-hyperdrive Index type: {}", index_msg.get_field_type()); - } - } - } else { - info!("Expected Data message, got: {:?}", &msg); - continue; - } - } - } - - pub fn receive_all(&mut self, is_content: bool, length: u64) -> Result<()> { - - // Status: downloading, not uploading - let mut sm = Info::new(); - sm.set_uploading(false); - sm.set_downloading(true); - self.send_msg(&DatNetMessage::Status(sm), is_content)?; - - // Have: nothing (so far) - let mut hm = Have::new(); - hm.set_start(0); - hm.set_length(0); - self.send_msg(&DatNetMessage::Have(hm), is_content)?; - - // UnHave: still nothing - let mut uhm = Unhave::new(); - uhm.set_start(0); - self.send_msg(&DatNetMessage::Unhave(uhm), is_content)?; - - // Want: everything - let mut wm = Want::new(); - wm.set_start(0); - self.send_msg(&DatNetMessage::Want(wm), is_content)?; - - // Request / Data loop - for i in 0..length { - let mut rm = Request::new(); - rm.set_index(i); - self.send_msg(&DatNetMessage::Request(rm), is_content)?; - - loop { - let (was_content, msg) = self.recv_msg()?; - if was_content != is_content{ - info!("Expected other message channel"); - continue; - } - if let DatNetMessage::Data(dm) = msg { - info!("Got content: {}", dm.get_index()); - break; - } else { - info!("Expected Data message, got: {:?}", &msg); - continue; - } - } - } - - Ok(()) - } -} -- cgit v1.2.3