From 88ac722c1d60b01a98ae7e0cb4dc644aebaf4a9d Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 11 Jan 2018 00:08:15 -0800 Subject: make DatConnection drive-agnostic DatConnection should work with an arbitrary number of mux'd registers (hypercore feeds) on a single connection, including just one. This removes the drive-centric metadata/content ("is_content") flag for an index number. Future refactor should turn this from a u8 to a u32 or more. --- src/bin/geniza-net.rs | 6 +-- src/protocol.rs | 144 ++++++++++---------------------------------------- 2 files changed, 32 insertions(+), 118 deletions(-) diff --git a/src/bin/geniza-net.rs b/src/bin/geniza-net.rs index 0c82e6d..99ebdb5 100644 --- a/src/bin/geniza-net.rs +++ b/src/bin/geniza-net.rs @@ -75,8 +75,8 @@ fn run() -> Result<()> { let count: u64 = subm.value_of("count").unwrap().parse().unwrap(); let key_bytes = parse_dat_address(&dat_key)?; let mut dc = DatConnection::connect(host_port, &key_bytes, false)?; - dc.receive_some(false, count)?; - dc.receive_some(true, count)?; + dc.receive_some(0, count)?; + dc.receive_some(1, count)?; println!("Done!"); } ("discovery-key", Some(subm)) => { @@ -115,7 +115,7 @@ fn run() -> Result<()> { let key_bytes = parse_dat_address(&dat_key)?; let dir = Path::new(subm.value_of("dat-dir").unwrap()); let mut metadata = SleepDirRegister::create(&dir, "metadata")?; - node_simple_clone(host_port, &key_bytes, &mut metadata, false)?; + node_simple_clone(host_port, &key_bytes, &mut metadata, 0)?; // TODO: read out content key from metadata register //let content = SleepDirRegister::create(&dir, "content")?; //node_simple_clone(host_port, &key_bytes, &mut content, true)?; diff --git a/src/protocol.rs b/src/protocol.rs index 5940840..27a7648 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -11,7 +11,6 @@ use integer_encoding::{VarIntReader, VarIntWriter}; use errors::*; use network_msgs::*; -use metadata_msgs::Index; use make_discovery_key; #[derive(Debug)] @@ -134,14 +133,12 @@ fn test_bsxii_continued() { /// /// Spec says nonce is 32 bytes, by dat implementation (hypercore-protocol) is 24 bytes. pub struct DatConnection { - id: [u8; 32], + pub id: [u8; 32], remote_id: [u8; 32], tcp: TcpStream, - live: bool, - key: Key, - discovery_key: [u8; 32], - data_key: [u8; 32], - data_discovery_key: [u8; 32], + pub live: bool, + pub key: Key, + pub discovery_key: [u8; 32], tx_nonce: Nonce, tx_offset: u64, rx_nonce: Nonce, @@ -203,8 +200,6 @@ impl DatConnection { remote_id: [0; 32], key: Key::from_slice(key).unwrap(), // TODO: discovery_key: dk, - data_key: [0; 32], - data_discovery_key: [0; 32], tx_nonce: tx_nonce, tx_offset: 0, rx_nonce: gen_nonce(), // dummy @@ -230,11 +225,11 @@ impl DatConnection { let mut handshake_msg = Handshake::new(); handshake_msg.set_live(dc.live); handshake_msg.set_id(dc.id.to_vec()); - dc.send_msg(&DatNetMessage::Handshake(handshake_msg), false)?; + dc.send_msg(&DatNetMessage::Handshake(handshake_msg), 0)?; // read handshake - let (was_content, msg) = dc.recv_msg()?; - if was_content { + let (msg, reg_index) = dc.recv_msg()?; + if reg_index == 1 { bail!("Expected metadata msg, not content"); } if let DatNetMessage::Handshake(handshake) = msg { @@ -247,31 +242,22 @@ impl DatConnection { bail!("Expected Handshake message, got something else"); } - // TODO: read data feed here? - - // Fetch and configure key for data feed - dc.get_data_key()?; - - // Send (encrypted) Feed message for data feed - let mut feed_msg = Feed::new(); - feed_msg.set_discoveryKey(dc.data_discovery_key.to_vec()); - dc.send_msg(&DatNetMessage::Feed(feed_msg), true)?; - dc.tcp.set_nodelay(false)?; // Back to normal Ok(dc) } - pub fn send_msg(&mut self, dnm: &DatNetMessage, is_content: bool) -> Result<()> { - let header_int: u8 = (is_content as u8) << 4 | (msg_code(dnm) & 0x0F); + /// For hyperdrive connections, `reg_index` is equivalent to a `is_content` boolean flag. + pub fn send_msg(&mut self, dnm: &DatNetMessage, reg_index: u8) -> Result<()> { + let header_int: u8 = (reg_index as u8) << 4 | (msg_code(dnm) & 0x0F); let msg: &Message = msg_sugar(dnm); let total_message_size = (msg.compute_size() as usize) + 1; trace!( - "SEND total_len={} header={} is_content={} type={:?}", + "SEND total_len={} header={} reg_index={} type={:?}", total_message_size, header_int, - is_content, + reg_index, &dnm ); @@ -294,17 +280,18 @@ impl DatConnection { Ok(()) } - pub fn recv_msg(&mut self) -> Result<(bool, DatNetMessage)> { + /// Returns a tuple of the received message and the register index it corresponds to. + pub fn recv_msg(&mut self) -> Result<(DatNetMessage, u8)> { let total_len: u64 = self.read_varint()?; let header: u8 = self.read_varint()?; - let is_content = (header & (1 << 4)) != 0; + let reg_index = (header >> 4) & 0xFF; trace!( - "RECV total_len={} header={} is_content={}", + "RECV total_len={} header={} reg_index={}", total_len, header, - is_content + reg_index, ); if header > 0x1F { @@ -329,7 +316,7 @@ impl DatConnection { other => bail!("Unimplemented message type received: {}", other), }; trace!("\twas: {:?}", &dnm); - Ok((is_content, dnm)) + Ok((dnm, reg_index)) } /// Special unencrypted variant of `send_msg()`, used only during initial connection @@ -354,9 +341,9 @@ impl DatConnection { Ok(()) } - /// Receive complement to `send_feed()`. + /// Receive complement to `send_feed()` (un-encrypted, used only during initial connection + /// establishment). fn recv_feed(&mut self) -> Result { - // TODO: refactor this to return discovery key and nonce directly let total_len: u64 = self.tcp.read_varint()?; let header: u8 = self.tcp.read_varint()?; @@ -376,112 +363,39 @@ impl DatConnection { Ok(reg) } - pub fn get_data_key(&mut self) -> Result<()> { - // Info: downloading, not uploading - let mut im = Info::new(); - im.set_uploading(false); - im.set_downloading(true); - self.send_msg(&DatNetMessage::Info(im), false)?; - - // Have: nothing (so far) - let mut hm = Have::new(); - hm.set_start(0); - hm.set_length(0); - self.send_msg(&DatNetMessage::Have(hm), false)?; - - // UnHave: still nothing - let mut uhm = Unhave::new(); - uhm.set_start(0); - self.send_msg(&DatNetMessage::Unhave(uhm), false)?; - - // Want: just the first element - let mut wm = Want::new(); - wm.set_start(0); - wm.set_length(1); - self.send_msg(&DatNetMessage::Want(wm), false)?; - - // listen for Have - loop { - let (was_content, msg) = self.recv_msg()?; - if was_content { - continue; - } - if let DatNetMessage::Have(_) = msg { - break; - } else { - info!("Expected Have message, got: {:?}", &msg); - continue; - } - } - - // Request - let mut rm = Request::new(); - rm.set_index(0); - self.send_msg(&DatNetMessage::Request(rm), false)?; - - loop { - let (was_content, msg) = self.recv_msg()?; - if was_content { - info!("Expected other message channel"); - continue; - } - if let DatNetMessage::Data(dm) = msg { - info!("Got metadata: {}", dm.get_index()); - if dm.get_index() == 0 { - let index_msg = parse_from_bytes::(&mut dm.get_value())?; - if index_msg.get_field_type() == "hyperdrive" { - let data_key = index_msg.get_content(); - if data_key.len() != 32 { - bail!("Received data key had wrong length: {}", data_key.len()); - } - info!("Got data discovery key"); - self.data_key.copy_from_slice(&data_key[0..32]); - self.data_discovery_key - .copy_from_slice(&make_discovery_key(data_key)[0..32]); - return Ok(()); - } else { - bail!("non-hyperdrive Index type: {}", index_msg.get_field_type()); - } - } - } else { - info!("Expected Data message, got: {:?}", &msg); - continue; - } - } - } - - pub fn receive_some(&mut self, is_content: bool, length: u64) -> Result<()> { + /// This is a debug/dev helper, will be deleted + pub fn receive_some(&mut self, reg_index: u8, length: u64) -> Result<()> { // Info: downloading, not uploading let mut im = Info::new(); im.set_uploading(false); im.set_downloading(true); - self.send_msg(&DatNetMessage::Info(im), is_content)?; + self.send_msg(&DatNetMessage::Info(im), reg_index)?; // Have: nothing (so far) let mut hm = Have::new(); hm.set_start(0); hm.set_length(0); - self.send_msg(&DatNetMessage::Have(hm), is_content)?; + self.send_msg(&DatNetMessage::Have(hm), reg_index)?; // UnHave: still nothing let mut uhm = Unhave::new(); uhm.set_start(0); - self.send_msg(&DatNetMessage::Unhave(uhm), is_content)?; + self.send_msg(&DatNetMessage::Unhave(uhm), reg_index)?; // Want: everything let mut wm = Want::new(); wm.set_start(0); - self.send_msg(&DatNetMessage::Want(wm), is_content)?; + self.send_msg(&DatNetMessage::Want(wm), reg_index)?; // Request / Data loop for i in 0..length { let mut rm = Request::new(); rm.set_index(i); - self.send_msg(&DatNetMessage::Request(rm), is_content)?; + self.send_msg(&DatNetMessage::Request(rm), reg_index)?; loop { - let (was_content, msg) = self.recv_msg()?; - if was_content != is_content { + let (msg, rx_index) = self.recv_msg()?; + if rx_index != reg_index { info!("Expected other message channel"); continue; } -- cgit v1.2.3