aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2018-01-11 00:08:15 -0800
committerBryan Newbold <bnewbold@robocracy.org>2018-01-11 00:08:19 -0800
commit88ac722c1d60b01a98ae7e0cb4dc644aebaf4a9d (patch)
tree0fe88e1d0e0e660a127baa89971a872d6d500dd6
parent6ffd1666992d54ffaec757cd2e42e1fd6052f349 (diff)
downloadgeniza-88ac722c1d60b01a98ae7e0cb4dc644aebaf4a9d.tar.gz
geniza-88ac722c1d60b01a98ae7e0cb4dc644aebaf4a9d.zip
make DatConnection drive-agnostic
DatConnection should work with an arbitrary number of mux'd registers (hypercore feeds) on a single connection, including just one. This removes the drive-centric metadata/content ("is_content") flag for an index number. Future refactor should turn this from a u8 to a u32 or more.
-rw-r--r--src/bin/geniza-net.rs6
-rw-r--r--src/protocol.rs144
2 files changed, 32 insertions, 118 deletions
diff --git a/src/bin/geniza-net.rs b/src/bin/geniza-net.rs
index 0c82e6d..99ebdb5 100644
--- a/src/bin/geniza-net.rs
+++ b/src/bin/geniza-net.rs
@@ -75,8 +75,8 @@ fn run() -> Result<()> {
let count: u64 = subm.value_of("count").unwrap().parse().unwrap();
let key_bytes = parse_dat_address(&dat_key)?;
let mut dc = DatConnection::connect(host_port, &key_bytes, false)?;
- dc.receive_some(false, count)?;
- dc.receive_some(true, count)?;
+ dc.receive_some(0, count)?;
+ dc.receive_some(1, count)?;
println!("Done!");
}
("discovery-key", Some(subm)) => {
@@ -115,7 +115,7 @@ fn run() -> Result<()> {
let key_bytes = parse_dat_address(&dat_key)?;
let dir = Path::new(subm.value_of("dat-dir").unwrap());
let mut metadata = SleepDirRegister::create(&dir, "metadata")?;
- node_simple_clone(host_port, &key_bytes, &mut metadata, false)?;
+ node_simple_clone(host_port, &key_bytes, &mut metadata, 0)?;
// TODO: read out content key from metadata register
//let content = SleepDirRegister::create(&dir, "content")?;
//node_simple_clone(host_port, &key_bytes, &mut content, true)?;
diff --git a/src/protocol.rs b/src/protocol.rs
index 5940840..27a7648 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -11,7 +11,6 @@ use integer_encoding::{VarIntReader, VarIntWriter};
use errors::*;
use network_msgs::*;
-use metadata_msgs::Index;
use make_discovery_key;
#[derive(Debug)]
@@ -134,14 +133,12 @@ fn test_bsxii_continued() {
///
/// Spec says nonce is 32 bytes, by dat implementation (hypercore-protocol) is 24 bytes.
pub struct DatConnection {
- id: [u8; 32],
+ pub id: [u8; 32],
remote_id: [u8; 32],
tcp: TcpStream,
- live: bool,
- key: Key,
- discovery_key: [u8; 32],
- data_key: [u8; 32],
- data_discovery_key: [u8; 32],
+ pub live: bool,
+ pub key: Key,
+ pub discovery_key: [u8; 32],
tx_nonce: Nonce,
tx_offset: u64,
rx_nonce: Nonce,
@@ -203,8 +200,6 @@ impl DatConnection {
remote_id: [0; 32],
key: Key::from_slice(key).unwrap(), // TODO:
discovery_key: dk,
- data_key: [0; 32],
- data_discovery_key: [0; 32],
tx_nonce: tx_nonce,
tx_offset: 0,
rx_nonce: gen_nonce(), // dummy
@@ -230,11 +225,11 @@ impl DatConnection {
let mut handshake_msg = Handshake::new();
handshake_msg.set_live(dc.live);
handshake_msg.set_id(dc.id.to_vec());
- dc.send_msg(&DatNetMessage::Handshake(handshake_msg), false)?;
+ dc.send_msg(&DatNetMessage::Handshake(handshake_msg), 0)?;
// read handshake
- let (was_content, msg) = dc.recv_msg()?;
- if was_content {
+ let (msg, reg_index) = dc.recv_msg()?;
+ if reg_index == 1 {
bail!("Expected metadata msg, not content");
}
if let DatNetMessage::Handshake(handshake) = msg {
@@ -247,31 +242,22 @@ impl DatConnection {
bail!("Expected Handshake message, got something else");
}
- // TODO: read data feed here?
-
- // Fetch and configure key for data feed
- dc.get_data_key()?;
-
- // Send (encrypted) Feed message for data feed
- let mut feed_msg = Feed::new();
- feed_msg.set_discoveryKey(dc.data_discovery_key.to_vec());
- dc.send_msg(&DatNetMessage::Feed(feed_msg), true)?;
-
dc.tcp.set_nodelay(false)?; // Back to normal
Ok(dc)
}
- pub fn send_msg(&mut self, dnm: &DatNetMessage, is_content: bool) -> Result<()> {
- let header_int: u8 = (is_content as u8) << 4 | (msg_code(dnm) & 0x0F);
+ /// For hyperdrive connections, `reg_index` is equivalent to a `is_content` boolean flag.
+ pub fn send_msg(&mut self, dnm: &DatNetMessage, reg_index: u8) -> Result<()> {
+ let header_int: u8 = (reg_index as u8) << 4 | (msg_code(dnm) & 0x0F);
let msg: &Message = msg_sugar(dnm);
let total_message_size = (msg.compute_size() as usize) + 1;
trace!(
- "SEND total_len={} header={} is_content={} type={:?}",
+ "SEND total_len={} header={} reg_index={} type={:?}",
total_message_size,
header_int,
- is_content,
+ reg_index,
&dnm
);
@@ -294,17 +280,18 @@ impl DatConnection {
Ok(())
}
- pub fn recv_msg(&mut self) -> Result<(bool, DatNetMessage)> {
+ /// Returns a tuple of the received message and the register index it corresponds to.
+ pub fn recv_msg(&mut self) -> Result<(DatNetMessage, u8)> {
let total_len: u64 = self.read_varint()?;
let header: u8 = self.read_varint()?;
- let is_content = (header & (1 << 4)) != 0;
+ let reg_index = (header >> 4) & 0xFF;
trace!(
- "RECV total_len={} header={} is_content={}",
+ "RECV total_len={} header={} reg_index={}",
total_len,
header,
- is_content
+ reg_index,
);
if header > 0x1F {
@@ -329,7 +316,7 @@ impl DatConnection {
other => bail!("Unimplemented message type received: {}", other),
};
trace!("\twas: {:?}", &dnm);
- Ok((is_content, dnm))
+ Ok((dnm, reg_index))
}
/// Special unencrypted variant of `send_msg()`, used only during initial connection
@@ -354,9 +341,9 @@ impl DatConnection {
Ok(())
}
- /// Receive complement to `send_feed()`.
+ /// Receive complement to `send_feed()` (un-encrypted, used only during initial connection
+ /// establishment).
fn recv_feed(&mut self) -> Result<Feed> {
- // TODO: refactor this to return discovery key and nonce directly
let total_len: u64 = self.tcp.read_varint()?;
let header: u8 = self.tcp.read_varint()?;
@@ -376,112 +363,39 @@ impl DatConnection {
Ok(reg)
}
- pub fn get_data_key(&mut self) -> Result<()> {
- // Info: downloading, not uploading
- let mut im = Info::new();
- im.set_uploading(false);
- im.set_downloading(true);
- self.send_msg(&DatNetMessage::Info(im), false)?;
-
- // Have: nothing (so far)
- let mut hm = Have::new();
- hm.set_start(0);
- hm.set_length(0);
- self.send_msg(&DatNetMessage::Have(hm), false)?;
-
- // UnHave: still nothing
- let mut uhm = Unhave::new();
- uhm.set_start(0);
- self.send_msg(&DatNetMessage::Unhave(uhm), false)?;
-
- // Want: just the first element
- let mut wm = Want::new();
- wm.set_start(0);
- wm.set_length(1);
- self.send_msg(&DatNetMessage::Want(wm), false)?;
-
- // listen for Have
- loop {
- let (was_content, msg) = self.recv_msg()?;
- if was_content {
- continue;
- }
- if let DatNetMessage::Have(_) = msg {
- break;
- } else {
- info!("Expected Have message, got: {:?}", &msg);
- continue;
- }
- }
-
- // Request
- let mut rm = Request::new();
- rm.set_index(0);
- self.send_msg(&DatNetMessage::Request(rm), false)?;
-
- loop {
- let (was_content, msg) = self.recv_msg()?;
- if was_content {
- info!("Expected other message channel");
- continue;
- }
- if let DatNetMessage::Data(dm) = msg {
- info!("Got metadata: {}", dm.get_index());
- if dm.get_index() == 0 {
- let index_msg = parse_from_bytes::<Index>(&mut dm.get_value())?;
- if index_msg.get_field_type() == "hyperdrive" {
- let data_key = index_msg.get_content();
- if data_key.len() != 32 {
- bail!("Received data key had wrong length: {}", data_key.len());
- }
- info!("Got data discovery key");
- self.data_key.copy_from_slice(&data_key[0..32]);
- self.data_discovery_key
- .copy_from_slice(&make_discovery_key(data_key)[0..32]);
- return Ok(());
- } else {
- bail!("non-hyperdrive Index type: {}", index_msg.get_field_type());
- }
- }
- } else {
- info!("Expected Data message, got: {:?}", &msg);
- continue;
- }
- }
- }
-
- pub fn receive_some(&mut self, is_content: bool, length: u64) -> Result<()> {
+ /// This is a debug/dev helper, will be deleted
+ pub fn receive_some(&mut self, reg_index: u8, length: u64) -> Result<()> {
// Info: downloading, not uploading
let mut im = Info::new();
im.set_uploading(false);
im.set_downloading(true);
- self.send_msg(&DatNetMessage::Info(im), is_content)?;
+ self.send_msg(&DatNetMessage::Info(im), reg_index)?;
// Have: nothing (so far)
let mut hm = Have::new();
hm.set_start(0);
hm.set_length(0);
- self.send_msg(&DatNetMessage::Have(hm), is_content)?;
+ self.send_msg(&DatNetMessage::Have(hm), reg_index)?;
// UnHave: still nothing
let mut uhm = Unhave::new();
uhm.set_start(0);
- self.send_msg(&DatNetMessage::Unhave(uhm), is_content)?;
+ self.send_msg(&DatNetMessage::Unhave(uhm), reg_index)?;
// Want: everything
let mut wm = Want::new();
wm.set_start(0);
- self.send_msg(&DatNetMessage::Want(wm), is_content)?;
+ self.send_msg(&DatNetMessage::Want(wm), reg_index)?;
// Request / Data loop
for i in 0..length {
let mut rm = Request::new();
rm.set_index(i);
- self.send_msg(&DatNetMessage::Request(rm), is_content)?;
+ self.send_msg(&DatNetMessage::Request(rm), reg_index)?;
loop {
- let (was_content, msg) = self.recv_msg()?;
- if was_content != is_content {
+ let (msg, rx_index) = self.recv_msg()?;
+ if rx_index != reg_index {
info!("Expected other message channel");
continue;
}