From dd51907e1228afbe032c61fe8bcbdd4de4497a98 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Thu, 15 Feb 2018 19:00:45 -0800
Subject: more WIP on synchronizer

---
 TODO                |   1 +
 src/peer.rs         |  12 +++--
 src/protocol.rs     |   2 +-
 src/synchronizer.rs | 141 ++++++++++++++++++++++++++++++++++++++++++++---------
 4 files changed, 129 insertions(+), 27 deletions(-)

diff --git a/TODO b/TODO
index 4ac7503..281908a 100644
--- a/TODO
+++ b/TODO
@@ -1,6 +1,7 @@
 next:
 - Synchronizer
+    => connect to specific peer (no discovery)
     => probably an options builder?
     => "wanted" bitmap
     => design statemachine (global and per-peer)
diff --git a/src/peer.rs b/src/peer.rs
index 0f04a40..b4fddef 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -33,6 +33,8 @@ fn receiver_loop(mut dc: DatConnection, peer_rx: chan::Sender
             Err(e) => {
+                // XXX: check if this was due to socket closing cleanly, in which case don't pass
+                // error along
                 peer_rx.send(Err(e));
                 return
             },
@@ -46,7 +48,7 @@ fn receiver_loop(mut dc: DatConnection, peer_rx: chan::Sender
                unified_chan: chan::Sender<Result<PeerMsg>>) {
 
-        dc.tcp.set_write_timeout(Some(Duration::new(2, 0)));
+        dc.tcp.set_write_timeout(Some(Duration::new(2, 0))).unwrap();
 
         let rx_dc = dc.clone();
         let (receiver_chan, raw_peer_rx) = chan::async();
@@ -133,9 +135,9 @@ impl DatPeerThread {
         Ok(())
     }
 
-    pub fn add_feed(&mut self, key: &[u8]) -> Result<()> {
+    pub fn add_feed(&mut self, key: &Key) -> Result<()> {
 
-        let key_bytes = key;
+        let key_bytes = &key[0..32];
         let key = Key::from_slice(key_bytes).unwrap();
 
         for k in self.feeds.iter() {
@@ -157,5 +159,9 @@ impl DatPeerThread {
         self.feeds.push((index as u8, key.clone()));
         Ok(())
     }
+
+    pub fn close(&mut self) -> Result<()> {
+        unimplemented!();
+    }
 }
diff --git a/src/protocol.rs b/src/protocol.rs
index a3ecda8..6e70dbb 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -210,7 +210,7 @@ impl DatConnection {
 
         let tx_nonce = gen_nonce();
         let mut rng = OsRng::new()?;
-        let mut local_id = match local_id {
+        let local_id = match local_id {
             Some(val) => {
                 let mut buf = [0; 32];
                 buf.copy_from_slice(val);
diff --git a/src/synchronizer.rs b/src/synchronizer.rs
index 3cfe57d..2327600 100644
--- a/src/synchronizer.rs
+++ b/src/synchronizer.rs
@@ -12,6 +12,9 @@ use sleep_register::SleepDirRegister;
 use sodiumoxide::crypto::stream::Key;
 use bit_vec::BitVec;
 use discovery::discover_peers_dns;
+use protobuf::parse_from_bytes;
+use network_msgs::Data;
+use metadata_msgs::Index;
 use chan;
 
 pub enum SyncMode {
@@ -33,6 +36,7 @@ pub struct Synchronizer {
     peers: HashMap<u64, DatPeerThread>,
     registers: Vec<RegisterStatus>,
     mode: SyncMode,
+    is_drive: bool,
     local_id: [u8; 32],
     dir: Option<PathBuf>,
     unified_peers_tx: chan::Sender<Result<PeerMsg>>,
@@ -43,7 +47,7 @@ impl Synchronizer {
 
     pub fn new_downloader(key: Key, mode: SyncMode, dir: &Path) -> Result<Synchronizer> {
 
-        let mut metadata_reg = SleepDirRegister::create(dir.as_ref(), "metadata")?;
+        let metadata_reg = SleepDirRegister::create(dir.as_ref(), "metadata")?;
 
         let (unified_peers_tx, unified_peers_rx) = chan::async();
 
@@ -63,6 +67,7 @@ impl Synchronizer {
             peers: HashMap::new(),
             mode,
             local_id,
+            is_drive: true,
            dir: Some(dir.to_path_buf()),
             registers: vec![metadata_status],
             unified_peers_tx,
@@ -73,64 +78,123 @@ impl Synchronizer {
 
     pub fn run(&mut self) -> Result<()> {
 
-        let meta_key = &self.registers[0].key;
+        let meta_key = &self.registers.get(0).unwrap().key.clone();
         let peers = discover_peers_dns(&meta_key[0..32])?;
         let mut rng = OsRng::new()?;
         for p in peers {
             let handle = rng.gen::<u64>();
             let pt = DatPeerThread::connect(p, meta_key.clone(), handle, false, Some(&self.local_id), self.unified_peers_tx.clone())?;
             self.peers.insert(handle, pt);
-            // TODO: send a large want? or wait for haves?
+            let pt = self.peers.get_mut(&handle).unwrap();
+
+            match self.mode {
+                SyncMode::RxMax => {
+                    init_want_everything(pt, 0)?;
+                },
+                SyncMode::RxEndless => unimplemented!(),
+                SyncMode::TxEndless => unimplemented!(),
+                SyncMode::RxTxEndless => unimplemented!(),
+            };
         };
 
         // bug in chan_select!() breaking `self` reference?
+        // "recursion limit reached while expanding the macro `chan_select`"
         let unified_peers_rx = self.unified_peers_rx.clone();
         loop {
             chan_select! {
-                unified_peers_rx.recv() -> val => { },
+                unified_peers_rx.recv() -> val => {
+                    if let Some(Ok(pm)) = val {
+                        self.handle_msg(&pm)?;
+                    }
+                },
             };
         }
     }
 
-    /*
-            match val {
-                Some(Ok(pm)) => {
-                    self.handle_msg(&pm);
-                },
-                Some(Err(e)) => {
-                    // TODO: don't bail here
-                    bail!("Got a client error: {}", e);
-                },
-                _ => { unimplemented!() },
-            };
-    */
-
     fn handle_msg(&mut self, pm: &PeerMsg) -> Result<()> {
+        // mutable ref to PeerThread for this message
+        let pt = self.peers.get_mut(&pm.peer_handle).unwrap();
+
+        // NB: this is the simplistic model of registers (only works up to 2x per peer?)
+        if pm.feed_index as usize >= self.registers.len() {
+            // XXX: invalid feed! drop connection
+            pt.close()?;
+        }
+
         match &pm.msg {
             &DatNetMessage::Feed(_) => { unimplemented!() },
             &DatNetMessage::Handshake(_) => { unimplemented!() },
             &DatNetMessage::Info(_) => { unimplemented!() },
-            &DatNetMessage::Have(ref have) => {
-                // TODO: bulk-add haves to peer status
+            &DatNetMessage::Have(ref msg) => {
+                // TODO: depending on mode...
+
+                //let peer_has = extract_bitfield(msg)?;
+                // TODO: remove bits we already have
+                // TODO: depending on mode, extend 'wanted' bits
+                // TODO: send a Request on this channel
+                // XXX: dummy for testing
+                let mut request = Request::new();
+                request.set_index(msg.get_start());
+                pt.send(DatNetMessage::Request(request), pm.feed_index)?;
             },
             &DatNetMessage::Unhave(_) => {}, // PASS
             &DatNetMessage::Want(_) => {}, // PASS
             &DatNetMessage::Unwant(_) => {}, // PASS
             &DatNetMessage::Request(_) => {}, // PASS
             &DatNetMessage::Cancel(_) => {}, // PASS
-            &DatNetMessage::Data(_) => {
-                if pm.feed_index as usize >= self.registers.len() {
-                    // TODO: invalid feed! drop connection
+            &DatNetMessage::Data(ref msg) => {
+
+                // TODO: feed indexing?
+                // Insert into local feed
+                // XXX self.registers[pm.feed_index].insert(msg);
+
+                // If a drive, and this is the first entry of metadata feed, it has the config for
+                // the content feed
+                if self.is_drive && pm.feed_index == 0 && msg.get_index() == 0 {
+                    let data_key = parse_drive_data_key(msg)?;
+                    pt.add_feed(&data_key)?;
+                    init_want_everything(pt, 1)?;
+
+                    // If we haven't already, create and save a local register
+                    if self.registers.len() < 2 {
+
+                        let dir = self.dir.clone().unwrap();
+                        let content_reg = SleepDirRegister::create(&dir, "content")?;
+
+                        let content_status = RegisterStatus {
+                            id: 1,
+                            register: content_reg,
+                            inflight: vec![],
+                            wanted: BitVec::new(),
+                            key: data_key,
+                        };
+
+                        self.registers.push(content_status);
+                    }
                 }
-                // TODO: insert into feed
+
+                // TODO: send next wanted, or otherwise update state
             },
         }
         Ok(())
     }
 }
 
+fn parse_drive_data_key(data_msg: &Data) -> Result<Key> {
+    let index_msg = parse_from_bytes::<Index>(&mut data_msg.get_value())?;
+    if index_msg.get_field_type() == "hyperdrive" {
+        let data_key = index_msg.get_content();
+        if data_key.len() != 32 {
+            bail!("Received data key had wrong length: {}", data_key.len());
+        }
+        return Ok(Key::from_slice(&data_key[0..32]).unwrap());
+    } else {
+        bail!("non-hyperdrive Index type: {}", index_msg.get_field_type());
+    }
+}
+
 fn max_index(have_msg: &Have) -> Result<u64> {
     if have_msg.has_length() {
@@ -162,6 +226,37 @@ fn test_max_index() {
     assert_eq!(max_index(&hm).unwrap(), 5);
 }
 
+fn init_want_everything(dpt: &mut DatPeerThread, reg_index: u8) -> Result<()> {
+
+    // Info: downloading, not uploading
+    let mut im = Info::new();
+    im.set_uploading(false);
+    im.set_downloading(true);
+    let im = DatNetMessage::Info(im);
+    dpt.send(im, reg_index)?;
+
+    // Have: nothing (so far)
+    let mut hm = Have::new();
+    hm.set_start(0);
+    hm.set_length(0);
+    let hm = DatNetMessage::Have(hm);
+    dpt.send(hm, reg_index)?;
+
+    // UnHave: still nothing
+    let mut uhm = Unhave::new();
+    uhm.set_start(0);
+    let uhm = DatNetMessage::Unhave(uhm);
+    dpt.send(uhm, reg_index)?;
+
+    // Want: everything
+    let mut wm = Want::new();
+    wm.set_start(0);
+    let wm = DatNetMessage::Want(wm);
+    dpt.send(wm, reg_index)?;
+
+    Ok(())
+}
+
 /// Tries to connect to a single peer, pull register, and close.
 pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegister, reg_index: u8) -> Result<()> {
-- 
cgit v1.2.3