From e547fa64ce684211f8eb76738f133adfad296eaa Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 22 Jan 2018 01:12:07 -0800 Subject: WIP refactoring DatPeer - use Key in some more spots (instead of [u8]) - threaded Peer implementation, with chan for communication --- src/bin/geniza-net.rs | 5 +- src/lib.rs | 2 + src/peer.rs | 237 +++++++++++++++++++++++--------------------------- src/protocol.rs | 26 +++++- 4 files changed, 138 insertions(+), 132 deletions(-) diff --git a/src/bin/geniza-net.rs b/src/bin/geniza-net.rs index 4b3fa30..b3a55ba 100644 --- a/src/bin/geniza-net.rs +++ b/src/bin/geniza-net.rs @@ -6,11 +6,13 @@ extern crate env_logger; #[macro_use] extern crate error_chain; extern crate geniza; +extern crate sodiumoxide; // TODO: more careful import use geniza::*; use std::path::Path; use clap::{App, SubCommand, Arg}; +use sodiumoxide::crypto::stream::Key; fn run() -> Result<()> { env_logger::init().unwrap(); @@ -59,7 +61,8 @@ fn run() -> Result<()> { let host_port = subm.value_of("host_port").unwrap(); let dat_key = subm.value_of("dat_key").unwrap(); let key_bytes = parse_dat_address(&dat_key)?; - DatConnection::connect(host_port, &key_bytes, false)?; + let key = Key::from_slice(&key_bytes).unwrap(); + DatConnection::connect(host_port, &key, false)?; println!("Done!"); } ("discovery-key", Some(subm)) => { diff --git a/src/lib.rs b/src/lib.rs index e3ad197..8a4825b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,8 @@ extern crate sodiumoxide; extern crate bit_field; extern crate resolve; extern crate data_encoding; +#[macro_use] +extern crate chan; #[cfg(test)] extern crate tempdir; diff --git a/src/peer.rs b/src/peer.rs index f88e8fc..3310a13 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -1,32 +1,120 @@ use errors::*; -use std::convert::From; +use std::thread; +use std::net::ToSocketAddrs; +use std::fmt::Display; +use std::time::Duration; use protocol::{DatConnection, DatNetMessage}; use network_msgs::*; -use metadata_msgs::Index; -use bitfield::Bitfield; use sodiumoxide::crypto::stream::Key; -use protobuf::parse_from_bytes; use make_discovery_key; +use chan; + +/// Wraps a low-level DatConnection in a thread (or two). Contains very little context about +/// itself. +pub struct DatPeerThread { + handle: u64, + feeds: Vec<(u8, Key)>, + outbound_chan: chan::Sender<(DatNetMessage, u8)>, +} -/// Wraps a low-level DatConnection with extra state about active feeds, bitfields that the -/// remote has declared they Have, etc. -pub struct DatPeer { - feeds: Vec, - conn: DatConnection, - remote_has: Vec>, +pub struct PeerMsg { + peer_handle: u64, + feed_index: u8, + msg: DatNetMessage, } -impl DatPeer { +/// This is what the "receive" loop does: simply blocking reads on the TCP socket, passing any +/// received messages into a channel back to the worker thread. +fn receiver_loop(mut dc: DatConnection, peer_rx: chan::Sender>) { + loop { + match dc.recv_msg() { + Ok((msg, feed_index)) => { + peer_rx.send(Ok((msg, feed_index))); + }, + Err(e) => { + peer_rx.send(Err(e)); + return + }, + } + } +} - /// Has the remote peer indicated they have the given chunk in the given feed? - pub fn has(self, feed: u64, index: u64) -> Result { - for bitfield in self.remote_has[feed as usize].iter() { - if bitfield.get(index)? { - return Ok(true) +/// Worker thread. After initializing the connection, loops endlessly. Looks for outgoing PeerMsg +/// on the command channel, and sends these directly (blocking). Also looks for raw received +/// messages (via a spawned receiver thread), and enhances these with extra context then passes +/// upwards on the unified peer message channel. +fn worker_thread(mut dc: DatConnection, handle: u64, outbound_chan: chan::Receiver<(DatNetMessage, u8)>, unified_chan: chan::Sender>) { + + dc.tcp.set_write_timeout(Some(Duration::new(2, 0))); + + let rx_dc = dc.clone(); + let (receiver_chan, raw_peer_rx) = chan::async(); + thread::spawn(move || { + receiver_loop(rx_dc, receiver_chan); + }); + + loop { + chan_select!{ + outbound_chan.recv() -> val => { + if let Some((msg, feed_index)) = val { + match dc.send_msg(&msg, feed_index) { + Ok(_) => {}, + Err(e) => { + // TODO: error chain! + unified_chan.send(Err(e)); + return + } + } + }; + }, + raw_peer_rx.recv() -> val => { + if let Some(Ok((msg, feed_index))) = val { + // do mapping between feed index and feed pubkey here + let pm = PeerMsg { + peer_handle: handle, + feed_index: feed_index, + msg, + }; + unified_chan.send(Ok(pm)); + }; } - } - Ok(false) + }; + } +} + +impl DatPeerThread { + + pub fn connect(addr: A, feed_key: Key, handle: u64, is_live: bool, unified_chan: chan::Sender>) -> Result { + + let addr = addr.to_socket_addrs().unwrap().nth(0).unwrap(); + let (outbound_chan, tx_chan) = chan::async(); + let feed_key2 = feed_key.clone(); + thread::spawn(move || { + + let dc = match DatConnection::connect(addr, &feed_key, is_live) { + Ok(c) => c, + Err(e) => { + // TODO: error chain! + unified_chan.send(Err(e)); + return; + }, + }; + + worker_thread(dc, handle, tx_chan, unified_chan); + }); + + let dp = DatPeerThread { + handle, + outbound_chan, + feeds: vec![(0, feed_key2)], + }; + Ok(dp) + } + + pub fn send(&mut self, net_msg: DatNetMessage, feed_index: u8) -> Result<()> { + self.outbound_chan.send((net_msg, feed_index)); + Ok(()) } pub fn add_feed(&mut self, key: &[u8]) -> Result<()> { @@ -35,8 +123,8 @@ impl DatPeer { let key = Key::from_slice(key_bytes).unwrap(); for k in self.feeds.iter() { - if *k == key { - warn!("tried to add existing feed/key on a DatPeer connection"); + if (*k).1 == key { + warn!("tried to add existing feed/key on a DatPeerThread connection"); return Ok(()) } } @@ -48,115 +136,10 @@ impl DatPeer { // Send (encrypted) Feed message for data feed let mut feed_msg = Feed::new(); feed_msg.set_discoveryKey(discovery_key.to_vec()); - self.conn.send_msg(&DatNetMessage::Feed(feed_msg), index as u8)?; + self.outbound_chan.send((DatNetMessage::Feed(feed_msg), index as u8)); - self.feeds.push(key.clone()); - self.remote_has.push(vec![]); + self.feeds.push((index as u8, key.clone())); Ok(()) - - } - - /// hyperdrive-specific helper for discovering the public key for the "content" feed - /// from the "metadata" feed , and sending a Feed message to initialize on this connection. - pub fn init_data_feed(&mut self) -> Result<()> { - - if self.feeds.len() > 1 { - return Ok(()); - } - - let data_key = self.get_drive_data_key()?; - self.add_feed(&data_key[0..32]) - } - - - /// hyperdrive-specific helper for returning the "data" feed public key (aka, index=1) - pub fn get_drive_data_key(&mut self) -> Result { - - if self.feeds.len() > 1 { - // we already have the key - let key = self.feeds[1].clone(); - return Ok(key); - } - - // Info: downloading, not uploading - let mut im = Info::new(); - im.set_uploading(false); - im.set_downloading(true); - self.conn.send_msg(&DatNetMessage::Info(im), 0)?; - - // Have: nothing (so far) - let mut hm = Have::new(); - hm.set_start(0); - hm.set_length(0); - self.conn.send_msg(&DatNetMessage::Have(hm), 0)?; - - // UnHave: still nothing - let mut uhm = Unhave::new(); - uhm.set_start(0); - self.conn.send_msg(&DatNetMessage::Unhave(uhm), 0)?; - - // Want: just the first element - let mut wm = Want::new(); - wm.set_start(0); - wm.set_length(1); - self.conn.send_msg(&DatNetMessage::Want(wm), 0)?; - - // listen for Have - loop { - let (msg, feed_index) = self.conn.recv_msg()?; - if feed_index == 1 { - continue; - } - if let DatNetMessage::Have(_) = msg { - break; - } else { - info!("Expected Have message, got: {:?}", &msg); - continue; - } - } - - // Request - let mut rm = Request::new(); - rm.set_index(0); - self.conn.send_msg(&DatNetMessage::Request(rm), 0)?; - - loop { - let (msg, feed_index) = self.conn.recv_msg()?; - if feed_index == 1 { - info!("Expected other message channel"); - continue; - } - if let DatNetMessage::Data(dm) = msg { - info!("Got metadata: {}", dm.get_index()); - if dm.get_index() == 0 { - let index_msg = parse_from_bytes::(&mut dm.get_value())?; - if index_msg.get_field_type() == "hyperdrive" { - let data_key = index_msg.get_content(); - if data_key.len() != 32 { - bail!("Received data key had wrong length: {}", data_key.len()); - } - // TODO: ok_or(), but what? - return Ok(Key::from_slice(&data_key[0..32]).unwrap()); - } else { - bail!("non-hyperdrive Index type: {}", index_msg.get_field_type()); - } - } - } else { - info!("Expected Data message, got: {:?}", &msg); - continue; - } - } } } -impl From for DatPeer { - - fn from(dc: DatConnection) -> DatPeer { - let key = dc.key.clone(); - DatPeer { - feeds: vec![key], - conn: dc, - remote_has: vec![vec![]], - } - } -} diff --git a/src/protocol.rs b/src/protocol.rs index ee72b4d..84a5793 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -176,8 +176,26 @@ impl Write for DatConnection { } } +impl Clone for DatConnection { + fn clone(&self) -> DatConnection { + DatConnection { + id: self.id.clone(), + remote_id: self.remote_id.clone(), + tcp: self.tcp.try_clone().unwrap(), + live: self.live, + key: self.key.clone(), + discovery_key: self.discovery_key.clone(), + tx_nonce: self.tx_nonce.clone(), + tx_offset: self.tx_offset, + rx_nonce: self.rx_nonce.clone(), + rx_offset: self.rx_offset, + + } + } +} + impl DatConnection { - pub fn connect(addr: A, key: &[u8], live: bool) -> Result { + pub fn connect(addr: A, key: &Key, live: bool) -> Result { // Connect to server info!("Connecting to {}", addr); @@ -188,7 +206,7 @@ impl DatConnection { } // It's sort of a hack, but this should be usable from an accept() as well as a connect() - pub fn from_tcp(tcp: TcpStream, key: &[u8], live: bool) -> Result { + pub fn from_tcp(tcp: TcpStream, key: &Key, live: bool) -> Result { let tx_nonce = gen_nonce(); let mut local_id = [0; 32]; @@ -196,7 +214,7 @@ impl DatConnection { rng.fill_bytes(&mut local_id); let mut dk = [0; 32]; - dk.copy_from_slice(&make_discovery_key(key)[0..32]); + dk.copy_from_slice(&make_discovery_key(&key[0..32])[0..32]); let timeout = Duration::new(7, 0); tcp.set_write_timeout(Some(timeout))?; @@ -206,7 +224,7 @@ impl DatConnection { tcp, live, remote_id: [0; 32], - key: Key::from_slice(key).unwrap(), // TODO: + key: key.clone(), discovery_key: dk, tx_nonce: tx_nonce, tx_offset: 0, -- cgit v1.2.3