aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2017-10-26 20:09:02 -0700
committerBryan Newbold <bnewbold@robocracy.org>2017-10-26 20:09:02 -0700
commit69e3a609a1b3accd1b030cfbafed8e14cfdcda57 (patch)
treebe5944c0631ea6ef3855bfa80370ba69111489ab
parentffd90794391678a25d794ed10c50c7e376cadb04 (diff)
downloadgeniza-69e3a609a1b3accd1b030cfbafed8e14cfdcda57.tar.gz
geniza-69e3a609a1b3accd1b030cfbafed8e14cfdcda57.zip
network progress
-rw-r--r--src/bin/geniza-net.rs5
-rw-r--r--src/sync.rs132
2 files changed, 105 insertions, 32 deletions
diff --git a/src/bin/geniza-net.rs b/src/bin/geniza-net.rs
index 1f369ab..b9a4872 100644
--- a/src/bin/geniza-net.rs
+++ b/src/bin/geniza-net.rs
@@ -65,8 +65,9 @@ fn run() -> Result<()> {
host_port,
&key_bytes,
false)?;
- dc.receive_all(false)?;
- dc.receive_all(true)?;
+ // XXX: number here totally arbitrary
+ dc.receive_all(false, 10)?;
+ dc.receive_all(true, 10)?;
println!("Done!");
},
_ => {
diff --git a/src/sync.rs b/src/sync.rs
index 3a7c6f7..d092422 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -13,6 +13,7 @@ use integer_encoding::{VarIntReader, VarIntWriter};
use errors::*;
use network_proto::*;
+use drive_proto::Index;
#[derive(Debug)]
pub enum DatNetMessage {
@@ -132,6 +133,15 @@ fn test_bsxii_continued() {
assert_eq!(a, c);
}
+fn make_discovery_key(key: &[u8]) -> Vec<u8> {
+ // calculate discovery key
+ let mut discovery_key = [0; 32];
+ let mut hash = Blake2b::new_keyed(32, key);
+ hash.input(&"hypercore".as_bytes());
+ hash.result(&mut discovery_key);
+ discovery_key.to_vec()
+}
+
/// Represents a bi-directional connection to a network peer
///
/// Spec says nonce is 32 bytes, by dat implementation (hypercore-protocol) is 24 bytes.
@@ -183,15 +193,6 @@ impl Write for DatConnection {
}
}
-fn discovery_key(key: &Key) => Vec<u8> {
- // calculate discovery key
- let mut discovery_key = [0; 32];
- let mut hash = Blake2b::new_keyed(32, key);
- hash.input(&"hypercore".as_bytes());
- hash.result(&mut discovery_key);
- discovery_key.to_vec()
-}
-
impl DatConnection {
pub fn connect(host_port: &str, key: &[u8], live: bool) -> Result<DatConnection> {
@@ -202,8 +203,8 @@ impl DatConnection {
let mut rng = OsRng::new()?;
rng.fill_bytes(&mut local_id);
- let dk = [0; 32];
- dk[0..32] = discovery_key(key)[0..32];
+ let mut dk = [0; 32];
+ dk.copy_from_slice(&make_discovery_key(key)[0..32]);
// Connect to server
info!("Connecting to {}", host_port);
@@ -218,7 +219,7 @@ impl DatConnection {
live,
remote_id: [0; 32],
key: Key::from_slice(key).unwrap(), // TODO:
- discovery_key,
+ discovery_key: dk,
data_key: [0; 32],
data_discovery_key: [0; 32],
tx_nonce: tx_nonce,
@@ -231,13 +232,13 @@ impl DatConnection {
dc.tcp.set_nodelay(true)?; // Faster handshake
// send register
let mut register_msg = Feed::new();
- register_msg.set_discoveryKey(discovery_key.to_vec());
+ register_msg.set_discoveryKey(dk.to_vec());
register_msg.set_nonce((tx_nonce[0..24]).to_vec());
dc.send_register(&register_msg)?;
// read register
let registration = dc.recv_register()?;
- if registration.get_discoveryKey()[0..32] != discovery_key[..] {
+ if registration.get_discoveryKey()[0..32] != dk[..] {
bail!("Remote peer not sharing same discovery key");
}
let rn = registration.get_nonce();
@@ -264,6 +265,16 @@ impl DatConnection {
bail!("Expected Handshake message, got something else");
}
+ // TODO: read data feed register here?
+
+ // Fetch and configure key for data feed
+ dc.get_data_key()?;
+
+ // Send (encrypted) Register/Feed message for data feed
+ let mut register_msg = Feed::new();
+ register_msg.set_discoveryKey(dc.data_discovery_key.to_vec());
+ dc.send_msg(&DatNetMessage::Register(register_msg), true)?;
+
dc.tcp.set_nodelay(false)?; // Back to normal
Ok(dc)
@@ -371,35 +382,36 @@ impl DatConnection {
Ok(reg)
}
- pub fn receive_all(&mut self, is_content: bool) -> Result<()> {
+ pub fn get_data_key(&mut self) -> Result<()> {
// Status: downloading, not uploading
let mut sm = Info::new();
sm.set_uploading(false);
sm.set_downloading(true);
- self.send_msg(&DatNetMessage::Status(sm), is_content)?;
+ self.send_msg(&DatNetMessage::Status(sm), false)?;
// Have: nothing (so far)
let mut hm = Have::new();
hm.set_start(0);
hm.set_length(0);
- self.send_msg(&DatNetMessage::Have(hm), is_content)?;
+ self.send_msg(&DatNetMessage::Have(hm), false)?;
// UnHave: still nothing
let mut uhm = Unhave::new();
uhm.set_start(0);
- self.send_msg(&DatNetMessage::Unhave(uhm), is_content)?;
+ self.send_msg(&DatNetMessage::Unhave(uhm), false)?;
- // Want: everything
+ // Want: just the first element
let mut wm = Want::new();
wm.set_start(0);
- self.send_msg(&DatNetMessage::Want(wm), is_content)?;
+ wm.set_length(1);
+ self.send_msg(&DatNetMessage::Want(wm), false)?;
// listen for Have
let length;
loop {
let (was_content, msg) = self.recv_msg()?;
- if was_content != is_content{
+ if was_content {
continue;
}
if let DatNetMessage::Have(have) = msg {
@@ -411,26 +423,86 @@ impl DatConnection {
}
};
- // Request / Data loop
- for i in 0..length {
- let mut rm = Request::new();
- rm.set_index(i);
- self.send_msg(&DatNetMessage::Request(rm), is_content)?;
+ // Request
+ let mut rm = Request::new();
+ rm.set_index(0);
+ self.send_msg(&DatNetMessage::Request(rm), false)?;
+ loop {
let (was_content, msg) = self.recv_msg()?;
- if was_content != is_content{
+ if was_content {
info!("Expected other message channel");
+ continue;
}
if let DatNetMessage::Data(dm) = msg {
info!("Got metadata: {}", dm.get_index());
- // TODO: if this is index 0, and we're receive-all, and this is metadata feed, then
- // this is the special Index data entry; we should parse, check type, and store as
- // register key (?)
+ if dm.get_index() == 0 {
+ let index_msg = parse_from_bytes::<Index>(&mut dm.get_value())?;
+ if index_msg.get_field_type() == "hyperdrive" {
+ let data_key = index_msg.get_content();
+ if data_key.len() != 32 {
+ bail!("Received data key had wrong length: {}", data_key.len());
+ }
+ info!("Got data discovery key");
+ self.data_key.copy_from_slice(&data_key[0..32]);
+ self.data_discovery_key.copy_from_slice(&make_discovery_key(data_key)[0..32]);
+ return Ok(());
+ } else {
+ unimplemented!("non-hyperdrive Index type: {}", index_msg.get_field_type());
+ }
+ }
} else {
info!("Expected Data message, got: {:?}", &msg);
continue;
}
}
+ }
+
+ pub fn receive_all(&mut self, is_content: bool, length: u64) -> Result<()> {
+
+ // Status: downloading, not uploading
+ let mut sm = Info::new();
+ sm.set_uploading(false);
+ sm.set_downloading(true);
+ self.send_msg(&DatNetMessage::Status(sm), is_content)?;
+
+ // Have: nothing (so far)
+ let mut hm = Have::new();
+ hm.set_start(0);
+ hm.set_length(0);
+ self.send_msg(&DatNetMessage::Have(hm), is_content)?;
+
+ // UnHave: still nothing
+ let mut uhm = Unhave::new();
+ uhm.set_start(0);
+ self.send_msg(&DatNetMessage::Unhave(uhm), is_content)?;
+
+ // Want: everything
+ let mut wm = Want::new();
+ wm.set_start(0);
+ self.send_msg(&DatNetMessage::Want(wm), is_content)?;
+
+ // Request / Data loop
+ for i in 0..length {
+ let mut rm = Request::new();
+ rm.set_index(i);
+ self.send_msg(&DatNetMessage::Request(rm), is_content)?;
+
+ loop {
+ let (was_content, msg) = self.recv_msg()?;
+ if was_content != is_content{
+ info!("Expected other message channel");
+ continue;
+ }
+ if let DatNetMessage::Data(dm) = msg {
+ info!("Got content: {}", dm.get_index());
+ break;
+ } else {
+ info!("Expected Data message, got: {:?}", &msg);
+ continue;
+ }
+ }
+ }
Ok(())
}