aboutsummaryrefslogtreecommitdiffstats
path: root/src/peer.rs
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2018-01-22 01:12:07 -0800
committerBryan Newbold <bnewbold@robocracy.org>2018-01-22 01:12:09 -0800
commite547fa64ce684211f8eb76738f133adfad296eaa (patch)
tree93a116b8e91ebb7b9f4555869614c66df2e7b6ff /src/peer.rs
parent375836cf6322bc31828c5e2be47e5f0aa5f99099 (diff)
downloadgeniza-e547fa64ce684211f8eb76738f133adfad296eaa.tar.gz
geniza-e547fa64ce684211f8eb76738f133adfad296eaa.zip
WIP refactoring DatPeer
- use Key in some more spots (instead of [u8]) - threaded Peer implementation, with chan for communication
Diffstat (limited to 'src/peer.rs')
-rw-r--r--src/peer.rs237
1 files changed, 110 insertions, 127 deletions
diff --git a/src/peer.rs b/src/peer.rs
index f88e8fc..3310a13 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -1,32 +1,120 @@
use errors::*;
-use std::convert::From;
+use std::thread;
+use std::net::ToSocketAddrs;
+use std::fmt::Display;
+use std::time::Duration;
use protocol::{DatConnection, DatNetMessage};
use network_msgs::*;
-use metadata_msgs::Index;
-use bitfield::Bitfield;
use sodiumoxide::crypto::stream::Key;
-use protobuf::parse_from_bytes;
use make_discovery_key;
+use chan;
+
+/// Wraps a low-level DatConnection in a thread (or two). Contains very little context about
+/// itself.
+pub struct DatPeerThread {
+ handle: u64,
+ feeds: Vec<(u8, Key)>,
+ outbound_chan: chan::Sender<(DatNetMessage, u8)>,
+}
-/// Wraps a low-level DatConnection with extra state about active feeds, bitfields that the
-/// remote has declared they Have, etc.
-pub struct DatPeer {
- feeds: Vec<Key>,
- conn: DatConnection,
- remote_has: Vec<Vec<Bitfield>>,
+pub struct PeerMsg {
+ peer_handle: u64,
+ feed_index: u8,
+ msg: DatNetMessage,
}
-impl DatPeer {
+/// This is what the "receive" loop does: simply blocking reads on the TCP socket, passing any
+/// received messages into a channel back to the worker thread.
+fn receiver_loop(mut dc: DatConnection, peer_rx: chan::Sender<Result<(DatNetMessage, u8)>>) {
+ loop {
+ match dc.recv_msg() {
+ Ok((msg, feed_index)) => {
+ peer_rx.send(Ok((msg, feed_index)));
+ },
+ Err(e) => {
+ peer_rx.send(Err(e));
+ return
+ },
+ }
+ }
+}
- /// Has the remote peer indicated they have the given chunk in the given feed?
- pub fn has(self, feed: u64, index: u64) -> Result<bool> {
- for bitfield in self.remote_has[feed as usize].iter() {
- if bitfield.get(index)? {
- return Ok(true)
+/// Worker thread. After initializing the connection, loops endlessly. Looks for outgoing PeerMsg
+/// on the command channel, and sends these directly (blocking). Also looks for raw received
+/// messages (via a spawned receiver thread), and enhances these with extra context then passes
+/// upwards on the unified peer message channel.
+fn worker_thread(mut dc: DatConnection, handle: u64, outbound_chan: chan::Receiver<(DatNetMessage, u8)>, unified_chan: chan::Sender<Result<PeerMsg>>) {
+
+ dc.tcp.set_write_timeout(Some(Duration::new(2, 0)));
+
+ let rx_dc = dc.clone();
+ let (receiver_chan, raw_peer_rx) = chan::async();
+ thread::spawn(move || {
+ receiver_loop(rx_dc, receiver_chan);
+ });
+
+ loop {
+ chan_select!{
+ outbound_chan.recv() -> val => {
+ if let Some((msg, feed_index)) = val {
+ match dc.send_msg(&msg, feed_index) {
+ Ok(_) => {},
+ Err(e) => {
+ // TODO: error chain!
+ unified_chan.send(Err(e));
+ return
+ }
+ }
+ };
+ },
+ raw_peer_rx.recv() -> val => {
+ if let Some(Ok((msg, feed_index))) = val {
+ // do mapping between feed index and feed pubkey here
+ let pm = PeerMsg {
+ peer_handle: handle,
+ feed_index: feed_index,
+ msg,
+ };
+ unified_chan.send(Ok(pm));
+ };
}
- }
- Ok(false)
+ };
+ }
+}
+
+impl DatPeerThread {
+
+ pub fn connect<A: ToSocketAddrs + Display>(addr: A, feed_key: Key, handle: u64, is_live: bool, unified_chan: chan::Sender<Result<PeerMsg>>) -> Result<DatPeerThread> {
+
+ let addr = addr.to_socket_addrs().unwrap().nth(0).unwrap();
+ let (outbound_chan, tx_chan) = chan::async();
+ let feed_key2 = feed_key.clone();
+ thread::spawn(move || {
+
+ let dc = match DatConnection::connect(addr, &feed_key, is_live) {
+ Ok(c) => c,
+ Err(e) => {
+ // TODO: error chain!
+ unified_chan.send(Err(e));
+ return;
+ },
+ };
+
+ worker_thread(dc, handle, tx_chan, unified_chan);
+ });
+
+ let dp = DatPeerThread {
+ handle,
+ outbound_chan,
+ feeds: vec![(0, feed_key2)],
+ };
+ Ok(dp)
+ }
+
+ pub fn send(&mut self, net_msg: DatNetMessage, feed_index: u8) -> Result<()> {
+ self.outbound_chan.send((net_msg, feed_index));
+ Ok(())
}
pub fn add_feed(&mut self, key: &[u8]) -> Result<()> {
@@ -35,8 +123,8 @@ impl DatPeer {
let key = Key::from_slice(key_bytes).unwrap();
for k in self.feeds.iter() {
- if *k == key {
- warn!("tried to add existing feed/key on a DatPeer connection");
+ if (*k).1 == key {
+ warn!("tried to add existing feed/key on a DatPeerThread connection");
return Ok(())
}
}
@@ -48,115 +136,10 @@ impl DatPeer {
// Send (encrypted) Feed message for data feed
let mut feed_msg = Feed::new();
feed_msg.set_discoveryKey(discovery_key.to_vec());
- self.conn.send_msg(&DatNetMessage::Feed(feed_msg), index as u8)?;
+ self.outbound_chan.send((DatNetMessage::Feed(feed_msg), index as u8));
- self.feeds.push(key.clone());
- self.remote_has.push(vec![]);
+ self.feeds.push((index as u8, key.clone()));
Ok(())
-
- }
-
- /// hyperdrive-specific helper for discovering the public key for the "content" feed
- /// from the "metadata" feed , and sending a Feed message to initialize on this connection.
- pub fn init_data_feed(&mut self) -> Result<()> {
-
- if self.feeds.len() > 1 {
- return Ok(());
- }
-
- let data_key = self.get_drive_data_key()?;
- self.add_feed(&data_key[0..32])
- }
-
-
- /// hyperdrive-specific helper for returning the "data" feed public key (aka, index=1)
- pub fn get_drive_data_key(&mut self) -> Result<Key> {
-
- if self.feeds.len() > 1 {
- // we already have the key
- let key = self.feeds[1].clone();
- return Ok(key);
- }
-
- // Info: downloading, not uploading
- let mut im = Info::new();
- im.set_uploading(false);
- im.set_downloading(true);
- self.conn.send_msg(&DatNetMessage::Info(im), 0)?;
-
- // Have: nothing (so far)
- let mut hm = Have::new();
- hm.set_start(0);
- hm.set_length(0);
- self.conn.send_msg(&DatNetMessage::Have(hm), 0)?;
-
- // UnHave: still nothing
- let mut uhm = Unhave::new();
- uhm.set_start(0);
- self.conn.send_msg(&DatNetMessage::Unhave(uhm), 0)?;
-
- // Want: just the first element
- let mut wm = Want::new();
- wm.set_start(0);
- wm.set_length(1);
- self.conn.send_msg(&DatNetMessage::Want(wm), 0)?;
-
- // listen for Have
- loop {
- let (msg, feed_index) = self.conn.recv_msg()?;
- if feed_index == 1 {
- continue;
- }
- if let DatNetMessage::Have(_) = msg {
- break;
- } else {
- info!("Expected Have message, got: {:?}", &msg);
- continue;
- }
- }
-
- // Request
- let mut rm = Request::new();
- rm.set_index(0);
- self.conn.send_msg(&DatNetMessage::Request(rm), 0)?;
-
- loop {
- let (msg, feed_index) = self.conn.recv_msg()?;
- if feed_index == 1 {
- info!("Expected other message channel");
- continue;
- }
- if let DatNetMessage::Data(dm) = msg {
- info!("Got metadata: {}", dm.get_index());
- if dm.get_index() == 0 {
- let index_msg = parse_from_bytes::<Index>(&mut dm.get_value())?;
- if index_msg.get_field_type() == "hyperdrive" {
- let data_key = index_msg.get_content();
- if data_key.len() != 32 {
- bail!("Received data key had wrong length: {}", data_key.len());
- }
- // TODO: ok_or(), but what?
- return Ok(Key::from_slice(&data_key[0..32]).unwrap());
- } else {
- bail!("non-hyperdrive Index type: {}", index_msg.get_field_type());
- }
- }
- } else {
- info!("Expected Data message, got: {:?}", &msg);
- continue;
- }
- }
}
}
-impl From<DatConnection> for DatPeer {
-
- fn from(dc: DatConnection) -> DatPeer {
- let key = dc.key.clone();
- DatPeer {
- feeds: vec![key],
- conn: dc,
- remote_has: vec![vec![]],
- }
- }
-}