From 72f65d94b5eb95ef4fc1632adea1cfe4022281dd Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 22 Jan 2018 02:31:00 -0800 Subject: WIP on synchronizer state machine --- src/lib.rs | 1 + src/peer.rs | 8 ++-- src/synchronizer.rs | 118 +++++++++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 113 insertions(+), 14 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 8a4825b..5370743 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,7 @@ extern crate resolve; extern crate data_encoding; #[macro_use] extern crate chan; +extern crate bit_vec; #[cfg(test)] extern crate tempdir; diff --git a/src/peer.rs b/src/peer.rs index c6f8c6d..0f04a40 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -13,15 +13,15 @@ use chan; /// Wraps a low-level DatConnection in a thread (or two). Contains very little context about /// itself. pub struct DatPeerThread { - handle: u64, + pub handle: u64, feeds: Vec<(u8, Key)>, outbound_chan: chan::Sender<(DatNetMessage, u8)>, } pub struct PeerMsg { - peer_handle: u64, - feed_index: u8, - msg: DatNetMessage, + pub peer_handle: u64, + pub feed_index: u8, + pub msg: DatNetMessage, } /// This is what the "receive" loop does: simply blocking reads on the TCP socket, passing any diff --git a/src/synchronizer.rs b/src/synchronizer.rs index 4e664f5..3cfe57d 100644 --- a/src/synchronizer.rs +++ b/src/synchronizer.rs @@ -2,10 +2,17 @@ use errors::*; use network_msgs::*; use bitfield::*; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; use protocol::{DatNetMessage, DatConnection}; +use rand::{OsRng, Rng}; use sleep_register::HyperRegister; -use peer::DatPeer; +use peer::{DatPeerThread, PeerMsg}; use sleep_register::SleepDirRegister; +use sodiumoxide::crypto::stream::Key; +use bit_vec::BitVec; +use discovery::discover_peers_dns; +use chan; pub enum SyncMode { RxMax, @@ -14,22 +21,112 @@ pub enum SyncMode { RxTxEndless, } +pub struct RegisterStatus { + id: u8, + register: SleepDirRegister, + inflight: Vec, + wanted: BitVec, + key: Key, +} + pub struct Synchronizer { - peers: Vec, - registers: Vec, + peers: HashMap, + registers: Vec, mode: SyncMode, - wanted: Bitfield, - inflight: Vec>, + local_id: [u8; 32], + dir: Option, + unified_peers_tx: chan::Sender>, + unified_peers_rx: chan::Receiver>, } impl Synchronizer { - pub fn next_wanted(&mut self, reg: u64) -> Option<(u64, u64)> { - // XXX - None + pub fn new_downloader(key: Key, mode: SyncMode, dir: &Path) -> Result { + + let mut metadata_reg = SleepDirRegister::create(dir.as_ref(), "metadata")?; + + let (unified_peers_tx, unified_peers_rx) = chan::async(); + + let metadata_status = RegisterStatus { + id: 0, + register: metadata_reg, + inflight: vec![], + wanted: BitVec::new(), + key, + }; + + let mut rng = OsRng::new()?; + let mut local_id = [0; 32]; + rng.fill_bytes(&mut local_id); + + let s = Synchronizer { + peers: HashMap::new(), + mode, + local_id, + dir: Some(dir.to_path_buf()), + registers: vec![metadata_status], + unified_peers_tx, + unified_peers_rx, + }; + Ok(s) + } + + pub fn run(&mut self) -> Result<()> { + + let meta_key = &self.registers[0].key; + let peers = discover_peers_dns(&meta_key[0..32])?; + let mut rng = OsRng::new()?; + for p in peers { + let handle = rng.gen::(); + let pt = DatPeerThread::connect(p, meta_key.clone(), handle, false, Some(&self.local_id), self.unified_peers_tx.clone())?; + self.peers.insert(handle, pt); + // TODO: send a large want? or wait for haves? + }; + + // bug in chan_select!() breaking `self` reference? + let unified_peers_rx = self.unified_peers_rx.clone(); + + loop { + chan_select! { + unified_peers_rx.recv() -> val => { }, + }; + } } - pub fn tick(&mut self) -> Result<()> { + /* + match val { + Some(Ok(pm)) => { + self.handle_msg(&pm); + }, + Some(Err(e)) => { + // TODO: don't bail here + bail!("Got a client error: {}", e); + }, + _ => { unimplemented!() }, + }; + */ + + fn handle_msg(&mut self, pm: &PeerMsg) -> Result<()> { + + match &pm.msg { + &DatNetMessage::Feed(_) => { unimplemented!() }, + &DatNetMessage::Handshake(_) => { unimplemented!() }, + &DatNetMessage::Info(_) => { unimplemented!() }, + &DatNetMessage::Have(ref have) => { + // TODO: bulk-add haves to peer status + }, + &DatNetMessage::Unhave(_) => {}, // PASS + &DatNetMessage::Want(_) => {}, // PASS + &DatNetMessage::Unwant(_) => {}, // PASS + &DatNetMessage::Request(_) => {}, // PASS + &DatNetMessage::Cancel(_) => {}, // PASS + &DatNetMessage::Data(_) => { + if pm.feed_index as usize >= self.registers.len() { + // TODO: invalid feed! drop connection + } + // TODO: insert into feed + }, + } Ok(()) } } @@ -72,7 +169,8 @@ pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegist bail!("Register already had data in it (expected empty for naive clone)"); } - let mut dc = DatConnection::connect(host_port, key, false)?; + let key = Key::from_slice(key).unwrap(); + let mut dc = DatConnection::connect(host_port, &key, false, None)?; // Info: downloading, not uploading let mut im = Info::new(); -- cgit v1.2.3