diff options
-rw-r--r-- | src/peer.rs | 46 | ||||
-rw-r--r-- | src/protocol.rs | 36 |
2 files changed, 41 insertions, 41 deletions
diff --git a/src/peer.rs b/src/peer.rs index 0b88a65..f88e8fc 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -9,19 +9,19 @@ use sodiumoxide::crypto::stream::Key; use protobuf::parse_from_bytes; use make_discovery_key; -/// Wraps a low-level DatConnection with extra state about mutually active registers, bitfields the -/// remote has declare they Have, etc. +/// Wraps a low-level DatConnection with extra state about active feeds, bitfields that the +/// remote has declared they Have, etc. pub struct DatPeer { - registers: Vec<Key>, + feeds: Vec<Key>, conn: DatConnection, remote_has: Vec<Vec<Bitfield>>, } impl DatPeer { - /// Has the remote peer indicated they have the given chunk in the given register? - pub fn has(self, register: u64, index: u64) -> Result<bool> { - for bitfield in self.remote_has[register as usize].iter() { + /// Has the remote peer indicated they have the given chunk in the given feed? + pub fn has(self, feed: u64, index: u64) -> Result<bool> { + for bitfield in self.remote_has[feed as usize].iter() { if bitfield.get(index)? { return Ok(true) } @@ -29,19 +29,19 @@ impl DatPeer { Ok(false) } - pub fn add_register(&mut self, key: &[u8]) -> Result<()> { + pub fn add_feed(&mut self, key: &[u8]) -> Result<()> { let key_bytes = key; let key = Key::from_slice(key_bytes).unwrap(); - for k in self.registers.iter() { + for k in self.feeds.iter() { if *k == key { - warn!("tried to add register an existing key on a DatPeer connection"); + warn!("tried to add existing feed/key on a DatPeer connection"); return Ok(()) } } - let index = self.registers.len(); + let index = self.feeds.len(); assert!(index < 256); let discovery_key = make_discovery_key(key_bytes);; @@ -50,31 +50,31 @@ impl DatPeer { feed_msg.set_discoveryKey(discovery_key.to_vec()); self.conn.send_msg(&DatNetMessage::Feed(feed_msg), index as u8)?; - self.registers.push(key.clone()); + self.feeds.push(key.clone()); self.remote_has.push(vec![]); Ok(()) } - /// hyperdrive-specific helper for discovering the public key for the "content" hyperregister - /// from the "metadata" register, and sending a Feed message to initialize on this connection. + /// hyperdrive-specific helper for discovering the public key for the "content" feed + /// from the "metadata" feed , and sending a Feed message to initialize on this connection. pub fn init_data_feed(&mut self) -> Result<()> { - if self.registers.len() > 1 { + if self.feeds.len() > 1 { return Ok(()); } let data_key = self.get_drive_data_key()?; - self.add_register(&data_key[0..32]) + self.add_feed(&data_key[0..32]) } - /// hyperdrive-specific helper for returning the "data" hyperregister public key (aka, index=1) + /// hyperdrive-specific helper for returning the "data" feed public key (aka, index=1) pub fn get_drive_data_key(&mut self) -> Result<Key> { - if self.registers.len() > 1 { + if self.feeds.len() > 1 { // we already have the key - let key = self.registers[1].clone(); + let key = self.feeds[1].clone(); return Ok(key); } @@ -103,8 +103,8 @@ impl DatPeer { // listen for Have loop { - let (msg, reg_index) = self.conn.recv_msg()?; - if reg_index == 1 { + let (msg, feed_index) = self.conn.recv_msg()?; + if feed_index == 1 { continue; } if let DatNetMessage::Have(_) = msg { @@ -121,8 +121,8 @@ impl DatPeer { self.conn.send_msg(&DatNetMessage::Request(rm), 0)?; loop { - let (msg, reg_index) = self.conn.recv_msg()?; - if reg_index == 1 { + let (msg, feed_index) = self.conn.recv_msg()?; + if feed_index == 1 { info!("Expected other message channel"); continue; } @@ -154,7 +154,7 @@ impl From<DatConnection> for DatPeer { fn from(dc: DatConnection) -> DatPeer { let key = dc.key.clone(); DatPeer { - registers: vec![key], + feeds: vec![key], conn: dc, remote_has: vec![vec![]], } diff --git a/src/protocol.rs b/src/protocol.rs index 27a7648..79e1f1c 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -228,8 +228,8 @@ impl DatConnection { dc.send_msg(&DatNetMessage::Handshake(handshake_msg), 0)?; // read handshake - let (msg, reg_index) = dc.recv_msg()?; - if reg_index == 1 { + let (msg, feed_index) = dc.recv_msg()?; + if feed_index == 1 { bail!("Expected metadata msg, not content"); } if let DatNetMessage::Handshake(handshake) = msg { @@ -247,17 +247,17 @@ impl DatConnection { Ok(dc) } - /// For hyperdrive connections, `reg_index` is equivalent to a `is_content` boolean flag. - pub fn send_msg(&mut self, dnm: &DatNetMessage, reg_index: u8) -> Result<()> { - let header_int: u8 = (reg_index as u8) << 4 | (msg_code(dnm) & 0x0F); + /// For hyperdrive connections, `feed_index` is equivalent to a `is_content` boolean flag. + pub fn send_msg(&mut self, dnm: &DatNetMessage, feed_index: u8) -> Result<()> { + let header_int: u8 = (feed_index as u8) << 4 | (msg_code(dnm) & 0x0F); let msg: &Message = msg_sugar(dnm); let total_message_size = (msg.compute_size() as usize) + 1; trace!( - "SEND total_len={} header={} reg_index={} type={:?}", + "SEND total_len={} header={} feed_index={} type={:?}", total_message_size, header_int, - reg_index, + feed_index, &dnm ); @@ -285,13 +285,13 @@ impl DatConnection { let total_len: u64 = self.read_varint()?; let header: u8 = self.read_varint()?; - let reg_index = (header >> 4) & 0xFF; + let feed_index = (header >> 4) & 0xFF; trace!( - "RECV total_len={} header={} reg_index={}", + "RECV total_len={} header={} feed_index={}", total_len, header, - reg_index, + feed_index, ); if header > 0x1F { @@ -316,7 +316,7 @@ impl DatConnection { other => bail!("Unimplemented message type received: {}", other), }; trace!("\twas: {:?}", &dnm); - Ok((dnm, reg_index)) + Ok((dnm, feed_index)) } /// Special unencrypted variant of `send_msg()`, used only during initial connection @@ -364,38 +364,38 @@ impl DatConnection { } /// This is a debug/dev helper, will be deleted - pub fn receive_some(&mut self, reg_index: u8, length: u64) -> Result<()> { + pub fn receive_some(&mut self, feed_index: u8, length: u64) -> Result<()> { // Info: downloading, not uploading let mut im = Info::new(); im.set_uploading(false); im.set_downloading(true); - self.send_msg(&DatNetMessage::Info(im), reg_index)?; + self.send_msg(&DatNetMessage::Info(im), feed_index)?; // Have: nothing (so far) let mut hm = Have::new(); hm.set_start(0); hm.set_length(0); - self.send_msg(&DatNetMessage::Have(hm), reg_index)?; + self.send_msg(&DatNetMessage::Have(hm), feed_index)?; // UnHave: still nothing let mut uhm = Unhave::new(); uhm.set_start(0); - self.send_msg(&DatNetMessage::Unhave(uhm), reg_index)?; + self.send_msg(&DatNetMessage::Unhave(uhm), feed_index)?; // Want: everything let mut wm = Want::new(); wm.set_start(0); - self.send_msg(&DatNetMessage::Want(wm), reg_index)?; + self.send_msg(&DatNetMessage::Want(wm), feed_index)?; // Request / Data loop for i in 0..length { let mut rm = Request::new(); rm.set_index(i); - self.send_msg(&DatNetMessage::Request(rm), reg_index)?; + self.send_msg(&DatNetMessage::Request(rm), feed_index)?; loop { let (msg, rx_index) = self.recv_msg()?; - if rx_index != reg_index { + if rx_index != feed_index { info!("Expected other message channel"); continue; } |