From 69e3a609a1b3accd1b030cfbafed8e14cfdcda57 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 26 Oct 2017 20:09:02 -0700 Subject: network progress --- src/bin/geniza-net.rs | 5 +- src/sync.rs | 132 ++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 105 insertions(+), 32 deletions(-) (limited to 'src') diff --git a/src/bin/geniza-net.rs b/src/bin/geniza-net.rs index 1f369ab..b9a4872 100644 --- a/src/bin/geniza-net.rs +++ b/src/bin/geniza-net.rs @@ -65,8 +65,9 @@ fn run() -> Result<()> { host_port, &key_bytes, false)?; - dc.receive_all(false)?; - dc.receive_all(true)?; + // XXX: number here totally arbitrary + dc.receive_all(false, 10)?; + dc.receive_all(true, 10)?; println!("Done!"); }, _ => { diff --git a/src/sync.rs b/src/sync.rs index 3a7c6f7..d092422 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -13,6 +13,7 @@ use integer_encoding::{VarIntReader, VarIntWriter}; use errors::*; use network_proto::*; +use drive_proto::Index; #[derive(Debug)] pub enum DatNetMessage { @@ -132,6 +133,15 @@ fn test_bsxii_continued() { assert_eq!(a, c); } +fn make_discovery_key(key: &[u8]) -> Vec { + // calculate discovery key + let mut discovery_key = [0; 32]; + let mut hash = Blake2b::new_keyed(32, key); + hash.input(&"hypercore".as_bytes()); + hash.result(&mut discovery_key); + discovery_key.to_vec() +} + /// Represents a bi-directional connection to a network peer /// /// Spec says nonce is 32 bytes, by dat implementation (hypercore-protocol) is 24 bytes. @@ -183,15 +193,6 @@ impl Write for DatConnection { } } -fn discovery_key(key: &Key) => Vec { - // calculate discovery key - let mut discovery_key = [0; 32]; - let mut hash = Blake2b::new_keyed(32, key); - hash.input(&"hypercore".as_bytes()); - hash.result(&mut discovery_key); - discovery_key.to_vec() -} - impl DatConnection { pub fn connect(host_port: &str, key: &[u8], live: bool) -> Result { @@ -202,8 +203,8 @@ impl DatConnection { let mut rng = OsRng::new()?; rng.fill_bytes(&mut local_id); - let dk = [0; 32]; - dk[0..32] = discovery_key(key)[0..32]; + let mut dk = [0; 32]; + dk.copy_from_slice(&make_discovery_key(key)[0..32]); // Connect to server info!("Connecting to {}", host_port); @@ -218,7 +219,7 @@ impl DatConnection { live, remote_id: [0; 32], key: Key::from_slice(key).unwrap(), // TODO: - discovery_key, + discovery_key: dk, data_key: [0; 32], data_discovery_key: [0; 32], tx_nonce: tx_nonce, @@ -231,13 +232,13 @@ impl DatConnection { dc.tcp.set_nodelay(true)?; // Faster handshake // send register let mut register_msg = Feed::new(); - register_msg.set_discoveryKey(discovery_key.to_vec()); + register_msg.set_discoveryKey(dk.to_vec()); register_msg.set_nonce((tx_nonce[0..24]).to_vec()); dc.send_register(®ister_msg)?; // read register let registration = dc.recv_register()?; - if registration.get_discoveryKey()[0..32] != discovery_key[..] { + if registration.get_discoveryKey()[0..32] != dk[..] { bail!("Remote peer not sharing same discovery key"); } let rn = registration.get_nonce(); @@ -264,6 +265,16 @@ impl DatConnection { bail!("Expected Handshake message, got something else"); } + // TODO: read data feed register here? + + // Fetch and configure key for data feed + dc.get_data_key()?; + + // Send (encrypted) Register/Feed message for data feed + let mut register_msg = Feed::new(); + register_msg.set_discoveryKey(dc.data_discovery_key.to_vec()); + dc.send_msg(&DatNetMessage::Register(register_msg), true)?; + dc.tcp.set_nodelay(false)?; // Back to normal Ok(dc) @@ -371,35 +382,36 @@ impl DatConnection { Ok(reg) } - pub fn receive_all(&mut self, is_content: bool) -> Result<()> { + pub fn get_data_key(&mut self) -> Result<()> { // Status: downloading, not uploading let mut sm = Info::new(); sm.set_uploading(false); sm.set_downloading(true); - self.send_msg(&DatNetMessage::Status(sm), is_content)?; + self.send_msg(&DatNetMessage::Status(sm), false)?; // Have: nothing (so far) let mut hm = Have::new(); hm.set_start(0); hm.set_length(0); - self.send_msg(&DatNetMessage::Have(hm), is_content)?; + self.send_msg(&DatNetMessage::Have(hm), false)?; // UnHave: still nothing let mut uhm = Unhave::new(); uhm.set_start(0); - self.send_msg(&DatNetMessage::Unhave(uhm), is_content)?; + self.send_msg(&DatNetMessage::Unhave(uhm), false)?; - // Want: everything + // Want: just the first element let mut wm = Want::new(); wm.set_start(0); - self.send_msg(&DatNetMessage::Want(wm), is_content)?; + wm.set_length(1); + self.send_msg(&DatNetMessage::Want(wm), false)?; // listen for Have let length; loop { let (was_content, msg) = self.recv_msg()?; - if was_content != is_content{ + if was_content { continue; } if let DatNetMessage::Have(have) = msg { @@ -411,26 +423,86 @@ impl DatConnection { } }; - // Request / Data loop - for i in 0..length { - let mut rm = Request::new(); - rm.set_index(i); - self.send_msg(&DatNetMessage::Request(rm), is_content)?; + // Request + let mut rm = Request::new(); + rm.set_index(0); + self.send_msg(&DatNetMessage::Request(rm), false)?; + loop { let (was_content, msg) = self.recv_msg()?; - if was_content != is_content{ + if was_content { info!("Expected other message channel"); + continue; } if let DatNetMessage::Data(dm) = msg { info!("Got metadata: {}", dm.get_index()); - // TODO: if this is index 0, and we're receive-all, and this is metadata feed, then - // this is the special Index data entry; we should parse, check type, and store as - // register key (?) + if dm.get_index() == 0 { + let index_msg = parse_from_bytes::(&mut dm.get_value())?; + if index_msg.get_field_type() == "hyperdrive" { + let data_key = index_msg.get_content(); + if data_key.len() != 32 { + bail!("Received data key had wrong length: {}", data_key.len()); + } + info!("Got data discovery key"); + self.data_key.copy_from_slice(&data_key[0..32]); + self.data_discovery_key.copy_from_slice(&make_discovery_key(data_key)[0..32]); + return Ok(()); + } else { + unimplemented!("non-hyperdrive Index type: {}", index_msg.get_field_type()); + } + } } else { info!("Expected Data message, got: {:?}", &msg); continue; } } + } + + pub fn receive_all(&mut self, is_content: bool, length: u64) -> Result<()> { + + // Status: downloading, not uploading + let mut sm = Info::new(); + sm.set_uploading(false); + sm.set_downloading(true); + self.send_msg(&DatNetMessage::Status(sm), is_content)?; + + // Have: nothing (so far) + let mut hm = Have::new(); + hm.set_start(0); + hm.set_length(0); + self.send_msg(&DatNetMessage::Have(hm), is_content)?; + + // UnHave: still nothing + let mut uhm = Unhave::new(); + uhm.set_start(0); + self.send_msg(&DatNetMessage::Unhave(uhm), is_content)?; + + // Want: everything + let mut wm = Want::new(); + wm.set_start(0); + self.send_msg(&DatNetMessage::Want(wm), is_content)?; + + // Request / Data loop + for i in 0..length { + let mut rm = Request::new(); + rm.set_index(i); + self.send_msg(&DatNetMessage::Request(rm), is_content)?; + + loop { + let (was_content, msg) = self.recv_msg()?; + if was_content != is_content{ + info!("Expected other message channel"); + continue; + } + if let DatNetMessage::Data(dm) = msg { + info!("Got content: {}", dm.get_index()); + break; + } else { + info!("Expected Data message, got: {:?}", &msg); + continue; + } + } + } Ok(()) } -- cgit v1.2.3