aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/peer.rs159
-rw-r--r--src/synchronizer.rs20
2 files changed, 155 insertions, 24 deletions
diff --git a/src/peer.rs b/src/peer.rs
index 29b90a2..0b88a65 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -1,31 +1,162 @@
use errors::*;
-use sleep_register::{HyperRegister, SleepDirRegister};
-use protocol::DatConnection;
+use std::convert::From;
+use protocol::{DatConnection, DatNetMessage};
+use network_msgs::*;
+use metadata_msgs::Index;
use bitfield::Bitfield;
+use sodiumoxide::crypto::stream::Key;
+use protobuf::parse_from_bytes;
+use make_discovery_key;
+/// Wraps a low-level DatConnection with extra state about mutually active registers, bitfields the
+/// remote has declare they Have, etc.
pub struct DatPeer {
- registers: Vec<SleepDirRegister>,
- connection: DatConnection,
- have_log: Vec<Vec<Bitfield>>,
+ registers: Vec<Key>,
+ conn: DatConnection,
+ remote_has: Vec<Vec<Bitfield>>,
}
impl DatPeer {
- pub fn new(connection: DatConnection, registers: Vec<SleepDirRegister>) -> DatPeer {
- DatPeer {
- registers,
- connection,
- have_log: vec![],
- }
- }
-
+ /// Has the remote peer indicated they have the given chunk in the given register?
pub fn has(self, register: u64, index: u64) -> Result<bool> {
- for bitfield in self.have_log[register as usize].iter() {
+ for bitfield in self.remote_has[register as usize].iter() {
if bitfield.get(index)? {
return Ok(true)
}
}
Ok(false)
}
+
+ pub fn add_register(&mut self, key: &[u8]) -> Result<()> {
+
+ let key_bytes = key;
+ let key = Key::from_slice(key_bytes).unwrap();
+
+ for k in self.registers.iter() {
+ if *k == key {
+ warn!("tried to add register an existing key on a DatPeer connection");
+ return Ok(())
+ }
+ }
+
+ let index = self.registers.len();
+ assert!(index < 256);
+ let discovery_key = make_discovery_key(key_bytes);;
+
+ // Send (encrypted) Feed message for data feed
+ let mut feed_msg = Feed::new();
+ feed_msg.set_discoveryKey(discovery_key.to_vec());
+ self.conn.send_msg(&DatNetMessage::Feed(feed_msg), index as u8)?;
+
+ self.registers.push(key.clone());
+ self.remote_has.push(vec![]);
+ Ok(())
+
+ }
+
+ /// hyperdrive-specific helper for discovering the public key for the "content" hyperregister
+ /// from the "metadata" register, and sending a Feed message to initialize on this connection.
+ pub fn init_data_feed(&mut self) -> Result<()> {
+
+ if self.registers.len() > 1 {
+ return Ok(());
+ }
+
+ let data_key = self.get_drive_data_key()?;
+ self.add_register(&data_key[0..32])
+ }
+
+
+ /// hyperdrive-specific helper for returning the "data" hyperregister public key (aka, index=1)
+ pub fn get_drive_data_key(&mut self) -> Result<Key> {
+
+ if self.registers.len() > 1 {
+ // we already have the key
+ let key = self.registers[1].clone();
+ return Ok(key);
+ }
+
+ // Info: downloading, not uploading
+ let mut im = Info::new();
+ im.set_uploading(false);
+ im.set_downloading(true);
+ self.conn.send_msg(&DatNetMessage::Info(im), 0)?;
+
+ // Have: nothing (so far)
+ let mut hm = Have::new();
+ hm.set_start(0);
+ hm.set_length(0);
+ self.conn.send_msg(&DatNetMessage::Have(hm), 0)?;
+
+ // UnHave: still nothing
+ let mut uhm = Unhave::new();
+ uhm.set_start(0);
+ self.conn.send_msg(&DatNetMessage::Unhave(uhm), 0)?;
+
+ // Want: just the first element
+ let mut wm = Want::new();
+ wm.set_start(0);
+ wm.set_length(1);
+ self.conn.send_msg(&DatNetMessage::Want(wm), 0)?;
+
+ // listen for Have
+ loop {
+ let (msg, reg_index) = self.conn.recv_msg()?;
+ if reg_index == 1 {
+ continue;
+ }
+ if let DatNetMessage::Have(_) = msg {
+ break;
+ } else {
+ info!("Expected Have message, got: {:?}", &msg);
+ continue;
+ }
+ }
+
+ // Request
+ let mut rm = Request::new();
+ rm.set_index(0);
+ self.conn.send_msg(&DatNetMessage::Request(rm), 0)?;
+
+ loop {
+ let (msg, reg_index) = self.conn.recv_msg()?;
+ if reg_index == 1 {
+ info!("Expected other message channel");
+ continue;
+ }
+ if let DatNetMessage::Data(dm) = msg {
+ info!("Got metadata: {}", dm.get_index());
+ if dm.get_index() == 0 {
+ let index_msg = parse_from_bytes::<Index>(&mut dm.get_value())?;
+ if index_msg.get_field_type() == "hyperdrive" {
+ let data_key = index_msg.get_content();
+ if data_key.len() != 32 {
+ bail!("Received data key had wrong length: {}", data_key.len());
+ }
+ // TODO: ok_or(), but what?
+ return Ok(Key::from_slice(&data_key[0..32]).unwrap());
+ } else {
+ bail!("non-hyperdrive Index type: {}", index_msg.get_field_type());
+ }
+ }
+ } else {
+ info!("Expected Data message, got: {:?}", &msg);
+ continue;
+ }
+ }
+ }
+}
+
+impl From<DatConnection> for DatPeer {
+
+ fn from(dc: DatConnection) -> DatPeer {
+ let key = dc.key.clone();
+ DatPeer {
+ registers: vec![key],
+ conn: dc,
+ remote_has: vec![vec![]],
+ }
+ }
}
diff --git a/src/synchronizer.rs b/src/synchronizer.rs
index 4e803fb..4e664f5 100644
--- a/src/synchronizer.rs
+++ b/src/synchronizer.rs
@@ -66,7 +66,7 @@ fn test_max_index() {
}
/// Tries to connect to a single peer, pull register, and close.
-pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegister, is_content: bool) -> Result<()> {
+pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegister, reg_index: u8) -> Result<()> {
if register.len()? > 0 {
bail!("Register already had data in it (expected empty for naive clone)");
@@ -79,35 +79,35 @@ pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegist
im.set_uploading(false);
im.set_downloading(true);
let im = DatNetMessage::Info(im);
- dc.send_msg(&im, is_content)?;
+ dc.send_msg(&im, reg_index)?;
// Have: nothing (so far)
let mut hm = Have::new();
hm.set_start(0);
hm.set_length(0);
let hm = DatNetMessage::Have(hm);
- dc.send_msg(&hm, is_content)?;
+ dc.send_msg(&hm, reg_index)?;
// UnHave: still nothing
let mut uhm = Unhave::new();
uhm.set_start(0);
let uhm = DatNetMessage::Unhave(uhm);
- dc.send_msg(&uhm, is_content)?;
+ dc.send_msg(&uhm, reg_index)?;
// Want: everything
let mut wm = Want::new();
wm.set_start(0);
let wm = DatNetMessage::Want(wm);
- dc.send_msg(&wm, is_content)?;
+ dc.send_msg(&wm, reg_index)?;
let last_entry: u64;
// Receive Have messages to determine lengths
loop {
- let (was_content, msg) = dc.recv_msg()?;
+ let (msg, got_reg_index) = dc.recv_msg()?;
match msg {
DatNetMessage::Have(dh) => {
- info!("is_content={}; {:?}; bitfield={:?}", was_content, dh, dh.get_bitfield());
+ info!("reg_index={}; {:?}; bitfield={:?}", got_reg_index, dh, dh.get_bitfield());
last_entry = max_index(&dh)?;
break;
},
@@ -124,9 +124,9 @@ pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegist
let mut rm = Request::new();
rm.set_index(i);
info!("Sending request: {:?}", rm);
- dc.send_msg(&DatNetMessage::Request(rm), false)?;
- let (was_content, msg) = dc.recv_msg()?;
- assert!(!was_content);
+ dc.send_msg(&DatNetMessage::Request(rm), 0)?;
+ let (msg, got_reg_index) = dc.recv_msg()?;
+ assert!(got_reg_index == 0);
match msg {
DatNetMessage::Data(dm) => {
info!("Got data: index={}", dm.get_index());