diff options
-rw-r--r-- | src/peer.rs | 159 | ||||
-rw-r--r-- | src/synchronizer.rs | 20 |
2 files changed, 155 insertions, 24 deletions
diff --git a/src/peer.rs b/src/peer.rs index 29b90a2..0b88a65 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -1,31 +1,162 @@ use errors::*; -use sleep_register::{HyperRegister, SleepDirRegister}; -use protocol::DatConnection; +use std::convert::From; +use protocol::{DatConnection, DatNetMessage}; +use network_msgs::*; +use metadata_msgs::Index; use bitfield::Bitfield; +use sodiumoxide::crypto::stream::Key; +use protobuf::parse_from_bytes; +use make_discovery_key; +/// Wraps a low-level DatConnection with extra state about mutually active registers, bitfields the +/// remote has declare they Have, etc. pub struct DatPeer { - registers: Vec<SleepDirRegister>, - connection: DatConnection, - have_log: Vec<Vec<Bitfield>>, + registers: Vec<Key>, + conn: DatConnection, + remote_has: Vec<Vec<Bitfield>>, } impl DatPeer { - pub fn new(connection: DatConnection, registers: Vec<SleepDirRegister>) -> DatPeer { - DatPeer { - registers, - connection, - have_log: vec![], - } - } - + /// Has the remote peer indicated they have the given chunk in the given register? pub fn has(self, register: u64, index: u64) -> Result<bool> { - for bitfield in self.have_log[register as usize].iter() { + for bitfield in self.remote_has[register as usize].iter() { if bitfield.get(index)? { return Ok(true) } } Ok(false) } + + pub fn add_register(&mut self, key: &[u8]) -> Result<()> { + + let key_bytes = key; + let key = Key::from_slice(key_bytes).unwrap(); + + for k in self.registers.iter() { + if *k == key { + warn!("tried to add register an existing key on a DatPeer connection"); + return Ok(()) + } + } + + let index = self.registers.len(); + assert!(index < 256); + let discovery_key = make_discovery_key(key_bytes);; + + // Send (encrypted) Feed message for data feed + let mut feed_msg = Feed::new(); + feed_msg.set_discoveryKey(discovery_key.to_vec()); + self.conn.send_msg(&DatNetMessage::Feed(feed_msg), index as u8)?; + + self.registers.push(key.clone()); + self.remote_has.push(vec![]); + Ok(()) + + } + + /// hyperdrive-specific helper for discovering the public key for the "content" hyperregister + /// from the "metadata" register, and sending a Feed message to initialize on this connection. + pub fn init_data_feed(&mut self) -> Result<()> { + + if self.registers.len() > 1 { + return Ok(()); + } + + let data_key = self.get_drive_data_key()?; + self.add_register(&data_key[0..32]) + } + + + /// hyperdrive-specific helper for returning the "data" hyperregister public key (aka, index=1) + pub fn get_drive_data_key(&mut self) -> Result<Key> { + + if self.registers.len() > 1 { + // we already have the key + let key = self.registers[1].clone(); + return Ok(key); + } + + // Info: downloading, not uploading + let mut im = Info::new(); + im.set_uploading(false); + im.set_downloading(true); + self.conn.send_msg(&DatNetMessage::Info(im), 0)?; + + // Have: nothing (so far) + let mut hm = Have::new(); + hm.set_start(0); + hm.set_length(0); + self.conn.send_msg(&DatNetMessage::Have(hm), 0)?; + + // UnHave: still nothing + let mut uhm = Unhave::new(); + uhm.set_start(0); + self.conn.send_msg(&DatNetMessage::Unhave(uhm), 0)?; + + // Want: just the first element + let mut wm = Want::new(); + wm.set_start(0); + wm.set_length(1); + self.conn.send_msg(&DatNetMessage::Want(wm), 0)?; + + // listen for Have + loop { + let (msg, reg_index) = self.conn.recv_msg()?; + if reg_index == 1 { + continue; + } + if let DatNetMessage::Have(_) = msg { + break; + } else { + info!("Expected Have message, got: {:?}", &msg); + continue; + } + } + + // Request + let mut rm = Request::new(); + rm.set_index(0); + self.conn.send_msg(&DatNetMessage::Request(rm), 0)?; + + loop { + let (msg, reg_index) = self.conn.recv_msg()?; + if reg_index == 1 { + info!("Expected other message channel"); + continue; + } + if let DatNetMessage::Data(dm) = msg { + info!("Got metadata: {}", dm.get_index()); + if dm.get_index() == 0 { + let index_msg = parse_from_bytes::<Index>(&mut dm.get_value())?; + if index_msg.get_field_type() == "hyperdrive" { + let data_key = index_msg.get_content(); + if data_key.len() != 32 { + bail!("Received data key had wrong length: {}", data_key.len()); + } + // TODO: ok_or(), but what? + return Ok(Key::from_slice(&data_key[0..32]).unwrap()); + } else { + bail!("non-hyperdrive Index type: {}", index_msg.get_field_type()); + } + } + } else { + info!("Expected Data message, got: {:?}", &msg); + continue; + } + } + } +} + +impl From<DatConnection> for DatPeer { + + fn from(dc: DatConnection) -> DatPeer { + let key = dc.key.clone(); + DatPeer { + registers: vec![key], + conn: dc, + remote_has: vec![vec![]], + } + } } diff --git a/src/synchronizer.rs b/src/synchronizer.rs index 4e803fb..4e664f5 100644 --- a/src/synchronizer.rs +++ b/src/synchronizer.rs @@ -66,7 +66,7 @@ fn test_max_index() { } /// Tries to connect to a single peer, pull register, and close. -pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegister, is_content: bool) -> Result<()> { +pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegister, reg_index: u8) -> Result<()> { if register.len()? > 0 { bail!("Register already had data in it (expected empty for naive clone)"); @@ -79,35 +79,35 @@ pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegist im.set_uploading(false); im.set_downloading(true); let im = DatNetMessage::Info(im); - dc.send_msg(&im, is_content)?; + dc.send_msg(&im, reg_index)?; // Have: nothing (so far) let mut hm = Have::new(); hm.set_start(0); hm.set_length(0); let hm = DatNetMessage::Have(hm); - dc.send_msg(&hm, is_content)?; + dc.send_msg(&hm, reg_index)?; // UnHave: still nothing let mut uhm = Unhave::new(); uhm.set_start(0); let uhm = DatNetMessage::Unhave(uhm); - dc.send_msg(&uhm, is_content)?; + dc.send_msg(&uhm, reg_index)?; // Want: everything let mut wm = Want::new(); wm.set_start(0); let wm = DatNetMessage::Want(wm); - dc.send_msg(&wm, is_content)?; + dc.send_msg(&wm, reg_index)?; let last_entry: u64; // Receive Have messages to determine lengths loop { - let (was_content, msg) = dc.recv_msg()?; + let (msg, got_reg_index) = dc.recv_msg()?; match msg { DatNetMessage::Have(dh) => { - info!("is_content={}; {:?}; bitfield={:?}", was_content, dh, dh.get_bitfield()); + info!("reg_index={}; {:?}; bitfield={:?}", got_reg_index, dh, dh.get_bitfield()); last_entry = max_index(&dh)?; break; }, @@ -124,9 +124,9 @@ pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegist let mut rm = Request::new(); rm.set_index(i); info!("Sending request: {:?}", rm); - dc.send_msg(&DatNetMessage::Request(rm), false)?; - let (was_content, msg) = dc.recv_msg()?; - assert!(!was_content); + dc.send_msg(&DatNetMessage::Request(rm), 0)?; + let (msg, got_reg_index) = dc.recv_msg()?; + assert!(got_reg_index == 0); match msg { DatNetMessage::Data(dm) => { info!("Got data: index={}", dm.get_index()); |