diff options
authorBryan Newbold <bnewbold@robocracy.org>2018-01-22 01:12:07 -0800
committerBryan Newbold <bnewbold@robocracy.org>2018-01-22 01:12:09 -0800
commite547fa64ce684211f8eb76738f133adfad296eaa (patch)
parent375836cf6322bc31828c5e2be47e5f0aa5f99099 (diff)
WIP refactoring DatPeer
- use Key in some more spots (instead of [u8]) - threaded Peer implementation, with chan for communication
4 files changed, 138 insertions, 132 deletions
diff --git a/src/bin/geniza-net.rs b/src/bin/geniza-net.rs
index 4b3fa30..b3a55ba 100644
--- a/src/bin/geniza-net.rs
+++ b/src/bin/geniza-net.rs
@@ -6,11 +6,13 @@ extern crate env_logger;
extern crate error_chain;
extern crate geniza;
+extern crate sodiumoxide;
// TODO: more careful import
use geniza::*;
use std::path::Path;
use clap::{App, SubCommand, Arg};
+use sodiumoxide::crypto::stream::Key;
fn run() -> Result<()> {
@@ -59,7 +61,8 @@ fn run() -> Result<()> {
let host_port = subm.value_of("host_port").unwrap();
let dat_key = subm.value_of("dat_key").unwrap();
let key_bytes = parse_dat_address(&dat_key)?;
- DatConnection::connect(host_port, &key_bytes, false)?;
+ let key = Key::from_slice(&key_bytes).unwrap();
+ DatConnection::connect(host_port, &key, false)?;
("discovery-key", Some(subm)) => {
diff --git a/src/lib.rs b/src/lib.rs
index e3ad197..8a4825b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -26,6 +26,8 @@ extern crate sodiumoxide;
extern crate bit_field;
extern crate resolve;
extern crate data_encoding;
+extern crate chan;
extern crate tempdir;
diff --git a/src/peer.rs b/src/peer.rs
index f88e8fc..3310a13 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -1,32 +1,120 @@
use errors::*;
-use std::convert::From;
+use std::thread;
+use std::net::ToSocketAddrs;
+use std::fmt::Display;
+use std::time::Duration;
use protocol::{DatConnection, DatNetMessage};
use network_msgs::*;
-use metadata_msgs::Index;
-use bitfield::Bitfield;
use sodiumoxide::crypto::stream::Key;
-use protobuf::parse_from_bytes;
use make_discovery_key;
+use chan;
+/// Wraps a low-level DatConnection in a thread (or two). Contains very little context about
+/// itself.
+pub struct DatPeerThread {
+ handle: u64,
+ feeds: Vec<(u8, Key)>,
+ outbound_chan: chan::Sender<(DatNetMessage, u8)>,
-/// Wraps a low-level DatConnection with extra state about active feeds, bitfields that the
-/// remote has declared they Have, etc.
-pub struct DatPeer {
- feeds: Vec<Key>,
- conn: DatConnection,
- remote_has: Vec<Vec<Bitfield>>,
+pub struct PeerMsg {
+ peer_handle: u64,
+ feed_index: u8,
+ msg: DatNetMessage,
-impl DatPeer {
+/// This is what the "receive" loop does: simply blocking reads on the TCP socket, passing any
+/// received messages into a channel back to the worker thread.
+fn receiver_loop(mut dc: DatConnection, peer_rx: chan::Sender<Result<(DatNetMessage, u8)>>) {
+ loop {
+ match dc.recv_msg() {
+ Ok((msg, feed_index)) => {
+ peer_rx.send(Ok((msg, feed_index)));
+ },
+ Err(e) => {
+ peer_rx.send(Err(e));
+ return
+ },
+ }
+ }
- /// Has the remote peer indicated they have the given chunk in the given feed?
- pub fn has(self, feed: u64, index: u64) -> Result<bool> {
- for bitfield in self.remote_has[feed as usize].iter() {
- if bitfield.get(index)? {
- return Ok(true)
+/// Worker thread. After initializing the connection, loops endlessly. Looks for outgoing PeerMsg
+/// on the command channel, and sends these directly (blocking). Also looks for raw received
+/// messages (via a spawned receiver thread), and enhances these with extra context then passes
+/// upwards on the unified peer message channel.
+fn worker_thread(mut dc: DatConnection, handle: u64, outbound_chan: chan::Receiver<(DatNetMessage, u8)>, unified_chan: chan::Sender<Result<PeerMsg>>) {
+ dc.tcp.set_write_timeout(Some(Duration::new(2, 0)));
+ let rx_dc = dc.clone();
+ let (receiver_chan, raw_peer_rx) = chan::async();
+ thread::spawn(move || {
+ receiver_loop(rx_dc, receiver_chan);
+ });
+ loop {
+ chan_select!{
+ outbound_chan.recv() -> val => {
+ if let Some((msg, feed_index)) = val {
+ match dc.send_msg(&msg, feed_index) {
+ Ok(_) => {},
+ Err(e) => {
+ // TODO: error chain!
+ unified_chan.send(Err(e));
+ return
+ }
+ }
+ };
+ },
+ raw_peer_rx.recv() -> val => {
+ if let Some(Ok((msg, feed_index))) = val {
+ // do mapping between feed index and feed pubkey here
+ let pm = PeerMsg {
+ peer_handle: handle,
+ feed_index: feed_index,
+ msg,
+ };
+ unified_chan.send(Ok(pm));
+ };
- }
- Ok(false)
+ };
+ }
+impl DatPeerThread {
+ pub fn connect<A: ToSocketAddrs + Display>(addr: A, feed_key: Key, handle: u64, is_live: bool, unified_chan: chan::Sender<Result<PeerMsg>>) -> Result<DatPeerThread> {
+ let addr = addr.to_socket_addrs().unwrap().nth(0).unwrap();
+ let (outbound_chan, tx_chan) = chan::async();
+ let feed_key2 = feed_key.clone();
+ thread::spawn(move || {
+ let dc = match DatConnection::connect(addr, &feed_key, is_live) {
+ Ok(c) => c,
+ Err(e) => {
+ // TODO: error chain!
+ unified_chan.send(Err(e));
+ return;
+ },
+ };
+ worker_thread(dc, handle, tx_chan, unified_chan);
+ });
+ let dp = DatPeerThread {
+ handle,
+ outbound_chan,
+ feeds: vec![(0, feed_key2)],
+ };
+ Ok(dp)
+ }
+ pub fn send(&mut self, net_msg: DatNetMessage, feed_index: u8) -> Result<()> {
+ self.outbound_chan.send((net_msg, feed_index));
+ Ok(())
pub fn add_feed(&mut self, key: &[u8]) -> Result<()> {
@@ -35,8 +123,8 @@ impl DatPeer {
let key = Key::from_slice(key_bytes).unwrap();
for k in self.feeds.iter() {
- if *k == key {
- warn!("tried to add existing feed/key on a DatPeer connection");
+ if (*k).1 == key {
+ warn!("tried to add existing feed/key on a DatPeerThread connection");
return Ok(())
@@ -48,115 +136,10 @@ impl DatPeer {
// Send (encrypted) Feed message for data feed
let mut feed_msg = Feed::new();
- self.conn.send_msg(&DatNetMessage::Feed(feed_msg), index as u8)?;
+ self.outbound_chan.send((DatNetMessage::Feed(feed_msg), index as u8));
- self.feeds.push(key.clone());
- self.remote_has.push(vec![]);
+ self.feeds.push((index as u8, key.clone()));
- }
- /// hyperdrive-specific helper for discovering the public key for the "content" feed
- /// from the "metadata" feed , and sending a Feed message to initialize on this connection.
- pub fn init_data_feed(&mut self) -> Result<()> {
- if self.feeds.len() > 1 {
- return Ok(());
- }
- let data_key = self.get_drive_data_key()?;
- self.add_feed(&data_key[0..32])
- }
- /// hyperdrive-specific helper for returning the "data" feed public key (aka, index=1)
- pub fn get_drive_data_key(&mut self) -> Result<Key> {
- if self.feeds.len() > 1 {
- // we already have the key
- let key = self.feeds[1].clone();
- return Ok(key);
- }
- // Info: downloading, not uploading
- let mut im = Info::new();
- im.set_uploading(false);
- im.set_downloading(true);
- self.conn.send_msg(&DatNetMessage::Info(im), 0)?;
- // Have: nothing (so far)
- let mut hm = Have::new();
- hm.set_start(0);
- hm.set_length(0);
- self.conn.send_msg(&DatNetMessage::Have(hm), 0)?;
- // UnHave: still nothing
- let mut uhm = Unhave::new();
- uhm.set_start(0);
- self.conn.send_msg(&DatNetMessage::Unhave(uhm), 0)?;
- // Want: just the first element
- let mut wm = Want::new();
- wm.set_start(0);
- wm.set_length(1);
- self.conn.send_msg(&DatNetMessage::Want(wm), 0)?;
- // listen for Have
- loop {
- let (msg, feed_index) = self.conn.recv_msg()?;
- if feed_index == 1 {
- continue;
- }
- if let DatNetMessage::Have(_) = msg {
- break;
- } else {
- info!("Expected Have message, got: {:?}", &msg);
- continue;
- }
- }
- // Request
- let mut rm = Request::new();
- rm.set_index(0);
- self.conn.send_msg(&DatNetMessage::Request(rm), 0)?;
- loop {
- let (msg, feed_index) = self.conn.recv_msg()?;
- if feed_index == 1 {
- info!("Expected other message channel");
- continue;
- }
- if let DatNetMessage::Data(dm) = msg {
- info!("Got metadata: {}", dm.get_index());
- if dm.get_index() == 0 {
- let index_msg = parse_from_bytes::<Index>(&mut dm.get_value())?;
- if index_msg.get_field_type() == "hyperdrive" {
- let data_key = index_msg.get_content();
- if data_key.len() != 32 {
- bail!("Received data key had wrong length: {}", data_key.len());
- }
- // TODO: ok_or(), but what?
- return Ok(Key::from_slice(&data_key[0..32]).unwrap());
- } else {
- bail!("non-hyperdrive Index type: {}", index_msg.get_field_type());
- }
- }
- } else {
- info!("Expected Data message, got: {:?}", &msg);
- continue;
- }
- }
-impl From<DatConnection> for DatPeer {
- fn from(dc: DatConnection) -> DatPeer {
- let key = dc.key.clone();
- DatPeer {
- feeds: vec![key],
- conn: dc,
- remote_has: vec![vec![]],
- }
- }
diff --git a/src/protocol.rs b/src/protocol.rs
index ee72b4d..84a5793 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -176,8 +176,26 @@ impl Write for DatConnection {
+impl Clone for DatConnection {
+ fn clone(&self) -> DatConnection {
+ DatConnection {
+ id: self.id.clone(),
+ remote_id: self.remote_id.clone(),
+ tcp: self.tcp.try_clone().unwrap(),
+ live: self.live,
+ key: self.key.clone(),
+ discovery_key: self.discovery_key.clone(),
+ tx_nonce: self.tx_nonce.clone(),
+ tx_offset: self.tx_offset,
+ rx_nonce: self.rx_nonce.clone(),
+ rx_offset: self.rx_offset,
+ }
+ }
impl DatConnection {
- pub fn connect<A: ToSocketAddrs + Display>(addr: A, key: &[u8], live: bool) -> Result<DatConnection> {
+ pub fn connect<A: ToSocketAddrs + Display>(addr: A, key: &Key, live: bool) -> Result<DatConnection> {
// Connect to server
info!("Connecting to {}", addr);
@@ -188,7 +206,7 @@ impl DatConnection {
// It's sort of a hack, but this should be usable from an accept() as well as a connect()
- pub fn from_tcp(tcp: TcpStream, key: &[u8], live: bool) -> Result<DatConnection> {
+ pub fn from_tcp(tcp: TcpStream, key: &Key, live: bool) -> Result<DatConnection> {
let tx_nonce = gen_nonce();
let mut local_id = [0; 32];
@@ -196,7 +214,7 @@ impl DatConnection {
rng.fill_bytes(&mut local_id);
let mut dk = [0; 32];
- dk.copy_from_slice(&make_discovery_key(key)[0..32]);
+ dk.copy_from_slice(&make_discovery_key(&key[0..32])[0..32]);
let timeout = Duration::new(7, 0);
@@ -206,7 +224,7 @@ impl DatConnection {
remote_id: [0; 32],
- key: Key::from_slice(key).unwrap(), // TODO:
+ key: key.clone(),
discovery_key: dk,
tx_nonce: tx_nonce,
tx_offset: 0,