diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2018-01-22 01:12:07 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-01-22 01:12:09 -0800 |
commit | e547fa64ce684211f8eb76738f133adfad296eaa (patch) | |
tree | 93a116b8e91ebb7b9f4555869614c66df2e7b6ff /src/peer.rs | |
parent | 375836cf6322bc31828c5e2be47e5f0aa5f99099 (diff) | |
download | geniza-e547fa64ce684211f8eb76738f133adfad296eaa.tar.gz geniza-e547fa64ce684211f8eb76738f133adfad296eaa.zip |
WIP refactoring DatPeer
- use Key in some more spots (instead of [u8])
- threaded Peer implementation, with chan for communication
Diffstat (limited to 'src/peer.rs')
-rw-r--r-- | src/peer.rs | 237 |
1 files changed, 110 insertions, 127 deletions
diff --git a/src/peer.rs b/src/peer.rs index f88e8fc..3310a13 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -1,32 +1,120 @@ use errors::*; -use std::convert::From; +use std::thread; +use std::net::ToSocketAddrs; +use std::fmt::Display; +use std::time::Duration; use protocol::{DatConnection, DatNetMessage}; use network_msgs::*; -use metadata_msgs::Index; -use bitfield::Bitfield; use sodiumoxide::crypto::stream::Key; -use protobuf::parse_from_bytes; use make_discovery_key; +use chan; + +/// Wraps a low-level DatConnection in a thread (or two). Contains very little context about +/// itself. +pub struct DatPeerThread { + handle: u64, + feeds: Vec<(u8, Key)>, + outbound_chan: chan::Sender<(DatNetMessage, u8)>, +} -/// Wraps a low-level DatConnection with extra state about active feeds, bitfields that the -/// remote has declared they Have, etc. -pub struct DatPeer { - feeds: Vec<Key>, - conn: DatConnection, - remote_has: Vec<Vec<Bitfield>>, +pub struct PeerMsg { + peer_handle: u64, + feed_index: u8, + msg: DatNetMessage, } -impl DatPeer { +/// This is what the "receive" loop does: simply blocking reads on the TCP socket, passing any +/// received messages into a channel back to the worker thread. +fn receiver_loop(mut dc: DatConnection, peer_rx: chan::Sender<Result<(DatNetMessage, u8)>>) { + loop { + match dc.recv_msg() { + Ok((msg, feed_index)) => { + peer_rx.send(Ok((msg, feed_index))); + }, + Err(e) => { + peer_rx.send(Err(e)); + return + }, + } + } +} - /// Has the remote peer indicated they have the given chunk in the given feed? - pub fn has(self, feed: u64, index: u64) -> Result<bool> { - for bitfield in self.remote_has[feed as usize].iter() { - if bitfield.get(index)? { - return Ok(true) +/// Worker thread. After initializing the connection, loops endlessly. Looks for outgoing PeerMsg +/// on the command channel, and sends these directly (blocking). Also looks for raw received +/// messages (via a spawned receiver thread), and enhances these with extra context then passes +/// upwards on the unified peer message channel. +fn worker_thread(mut dc: DatConnection, handle: u64, outbound_chan: chan::Receiver<(DatNetMessage, u8)>, unified_chan: chan::Sender<Result<PeerMsg>>) { + + dc.tcp.set_write_timeout(Some(Duration::new(2, 0))); + + let rx_dc = dc.clone(); + let (receiver_chan, raw_peer_rx) = chan::async(); + thread::spawn(move || { + receiver_loop(rx_dc, receiver_chan); + }); + + loop { + chan_select!{ + outbound_chan.recv() -> val => { + if let Some((msg, feed_index)) = val { + match dc.send_msg(&msg, feed_index) { + Ok(_) => {}, + Err(e) => { + // TODO: error chain! + unified_chan.send(Err(e)); + return + } + } + }; + }, + raw_peer_rx.recv() -> val => { + if let Some(Ok((msg, feed_index))) = val { + // do mapping between feed index and feed pubkey here + let pm = PeerMsg { + peer_handle: handle, + feed_index: feed_index, + msg, + }; + unified_chan.send(Ok(pm)); + }; } - } - Ok(false) + }; + } +} + +impl DatPeerThread { + + pub fn connect<A: ToSocketAddrs + Display>(addr: A, feed_key: Key, handle: u64, is_live: bool, unified_chan: chan::Sender<Result<PeerMsg>>) -> Result<DatPeerThread> { + + let addr = addr.to_socket_addrs().unwrap().nth(0).unwrap(); + let (outbound_chan, tx_chan) = chan::async(); + let feed_key2 = feed_key.clone(); + thread::spawn(move || { + + let dc = match DatConnection::connect(addr, &feed_key, is_live) { + Ok(c) => c, + Err(e) => { + // TODO: error chain! + unified_chan.send(Err(e)); + return; + }, + }; + + worker_thread(dc, handle, tx_chan, unified_chan); + }); + + let dp = DatPeerThread { + handle, + outbound_chan, + feeds: vec![(0, feed_key2)], + }; + Ok(dp) + } + + pub fn send(&mut self, net_msg: DatNetMessage, feed_index: u8) -> Result<()> { + self.outbound_chan.send((net_msg, feed_index)); + Ok(()) } pub fn add_feed(&mut self, key: &[u8]) -> Result<()> { @@ -35,8 +123,8 @@ impl DatPeer { let key = Key::from_slice(key_bytes).unwrap(); for k in self.feeds.iter() { - if *k == key { - warn!("tried to add existing feed/key on a DatPeer connection"); + if (*k).1 == key { + warn!("tried to add existing feed/key on a DatPeerThread connection"); return Ok(()) } } @@ -48,115 +136,10 @@ impl DatPeer { // Send (encrypted) Feed message for data feed let mut feed_msg = Feed::new(); feed_msg.set_discoveryKey(discovery_key.to_vec()); - self.conn.send_msg(&DatNetMessage::Feed(feed_msg), index as u8)?; + self.outbound_chan.send((DatNetMessage::Feed(feed_msg), index as u8)); - self.feeds.push(key.clone()); - self.remote_has.push(vec![]); + self.feeds.push((index as u8, key.clone())); Ok(()) - - } - - /// hyperdrive-specific helper for discovering the public key for the "content" feed - /// from the "metadata" feed , and sending a Feed message to initialize on this connection. - pub fn init_data_feed(&mut self) -> Result<()> { - - if self.feeds.len() > 1 { - return Ok(()); - } - - let data_key = self.get_drive_data_key()?; - self.add_feed(&data_key[0..32]) - } - - - /// hyperdrive-specific helper for returning the "data" feed public key (aka, index=1) - pub fn get_drive_data_key(&mut self) -> Result<Key> { - - if self.feeds.len() > 1 { - // we already have the key - let key = self.feeds[1].clone(); - return Ok(key); - } - - // Info: downloading, not uploading - let mut im = Info::new(); - im.set_uploading(false); - im.set_downloading(true); - self.conn.send_msg(&DatNetMessage::Info(im), 0)?; - - // Have: nothing (so far) - let mut hm = Have::new(); - hm.set_start(0); - hm.set_length(0); - self.conn.send_msg(&DatNetMessage::Have(hm), 0)?; - - // UnHave: still nothing - let mut uhm = Unhave::new(); - uhm.set_start(0); - self.conn.send_msg(&DatNetMessage::Unhave(uhm), 0)?; - - // Want: just the first element - let mut wm = Want::new(); - wm.set_start(0); - wm.set_length(1); - self.conn.send_msg(&DatNetMessage::Want(wm), 0)?; - - // listen for Have - loop { - let (msg, feed_index) = self.conn.recv_msg()?; - if feed_index == 1 { - continue; - } - if let DatNetMessage::Have(_) = msg { - break; - } else { - info!("Expected Have message, got: {:?}", &msg); - continue; - } - } - - // Request - let mut rm = Request::new(); - rm.set_index(0); - self.conn.send_msg(&DatNetMessage::Request(rm), 0)?; - - loop { - let (msg, feed_index) = self.conn.recv_msg()?; - if feed_index == 1 { - info!("Expected other message channel"); - continue; - } - if let DatNetMessage::Data(dm) = msg { - info!("Got metadata: {}", dm.get_index()); - if dm.get_index() == 0 { - let index_msg = parse_from_bytes::<Index>(&mut dm.get_value())?; - if index_msg.get_field_type() == "hyperdrive" { - let data_key = index_msg.get_content(); - if data_key.len() != 32 { - bail!("Received data key had wrong length: {}", data_key.len()); - } - // TODO: ok_or(), but what? - return Ok(Key::from_slice(&data_key[0..32]).unwrap()); - } else { - bail!("non-hyperdrive Index type: {}", index_msg.get_field_type()); - } - } - } else { - info!("Expected Data message, got: {:?}", &msg); - continue; - } - } } } -impl From<DatConnection> for DatPeer { - - fn from(dc: DatConnection) -> DatPeer { - let key = dc.key.clone(); - DatPeer { - feeds: vec![key], - conn: dc, - remote_has: vec![vec![]], - } - } -} |