From dd51907e1228afbe032c61fe8bcbdd4de4497a98 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 15 Feb 2018 19:00:45 -0800 Subject: more WIP on synchronizer --- src/synchronizer.rs | 141 +++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 118 insertions(+), 23 deletions(-) (limited to 'src/synchronizer.rs') diff --git a/src/synchronizer.rs b/src/synchronizer.rs index 3cfe57d..2327600 100644 --- a/src/synchronizer.rs +++ b/src/synchronizer.rs @@ -12,6 +12,9 @@ use sleep_register::SleepDirRegister; use sodiumoxide::crypto::stream::Key; use bit_vec::BitVec; use discovery::discover_peers_dns; +use protobuf::parse_from_bytes; +use network_msgs::Data; +use metadata_msgs::Index; use chan; pub enum SyncMode { @@ -33,6 +36,7 @@ pub struct Synchronizer { peers: HashMap, registers: Vec, mode: SyncMode, + is_drive: bool, local_id: [u8; 32], dir: Option, unified_peers_tx: chan::Sender>, @@ -43,7 +47,7 @@ impl Synchronizer { pub fn new_downloader(key: Key, mode: SyncMode, dir: &Path) -> Result { - let mut metadata_reg = SleepDirRegister::create(dir.as_ref(), "metadata")?; + let metadata_reg = SleepDirRegister::create(dir.as_ref(), "metadata")?; let (unified_peers_tx, unified_peers_rx) = chan::async(); @@ -63,6 +67,7 @@ impl Synchronizer { peers: HashMap::new(), mode, local_id, + is_drive: true, dir: Some(dir.to_path_buf()), registers: vec![metadata_status], unified_peers_tx, @@ -73,64 +78,123 @@ impl Synchronizer { pub fn run(&mut self) -> Result<()> { - let meta_key = &self.registers[0].key; + let meta_key = &self.registers.get(0).unwrap().key.clone(); let peers = discover_peers_dns(&meta_key[0..32])?; let mut rng = OsRng::new()?; for p in peers { let handle = rng.gen::(); let pt = DatPeerThread::connect(p, meta_key.clone(), handle, false, Some(&self.local_id), self.unified_peers_tx.clone())?; self.peers.insert(handle, pt); - // TODO: send a large want? or wait for haves? + let pt = self.peers.get_mut(&handle).unwrap(); + + match self.mode { + SyncMode::RxMax => { + init_want_everything(pt, 0)?; + }, + SyncMode::RxEndless => unimplemented!(), + SyncMode::TxEndless => unimplemented!(), + SyncMode::RxTxEndless => unimplemented!(), + }; }; // bug in chan_select!() breaking `self` reference? + // "recursion limit reached while expanding the macro `chan_select`" let unified_peers_rx = self.unified_peers_rx.clone(); loop { chan_select! { - unified_peers_rx.recv() -> val => { }, + unified_peers_rx.recv() -> val => { + if let Some(Ok(pm)) = val { + self.handle_msg(&pm)?; + } + }, }; } } - /* - match val { - Some(Ok(pm)) => { - self.handle_msg(&pm); - }, - Some(Err(e)) => { - // TODO: don't bail here - bail!("Got a client error: {}", e); - }, - _ => { unimplemented!() }, - }; - */ - fn handle_msg(&mut self, pm: &PeerMsg) -> Result<()> { + // mutable ref to PeerThread for this message + let pt = self.peers.get_mut(&pm.peer_handle).unwrap(); + + // NB: this is the simplistic model of registers (only works up to 2x per peer?) + if pm.feed_index as usize >= self.registers.len() { + // XXX: invalid feed! drop connection + pt.close()?; + } + match &pm.msg { &DatNetMessage::Feed(_) => { unimplemented!() }, &DatNetMessage::Handshake(_) => { unimplemented!() }, &DatNetMessage::Info(_) => { unimplemented!() }, - &DatNetMessage::Have(ref have) => { - // TODO: bulk-add haves to peer status + &DatNetMessage::Have(ref msg) => { + // TODO: depending on mode... + + //let peer_has = extract_bitfield(msg)?; + // TODO: remove bits we already have + // TODO: depending on mode, extend 'wanted' bits + // TODO: send a Request on this channel + // XXX: dummy for testing + let mut request = Request::new(); + request.set_index(msg.get_start()); + pt.send(DatNetMessage::Request(request), pm.feed_index)?; }, &DatNetMessage::Unhave(_) => {}, // PASS &DatNetMessage::Want(_) => {}, // PASS &DatNetMessage::Unwant(_) => {}, // PASS &DatNetMessage::Request(_) => {}, // PASS &DatNetMessage::Cancel(_) => {}, // PASS - &DatNetMessage::Data(_) => { - if pm.feed_index as usize >= self.registers.len() { - // TODO: invalid feed! drop connection + &DatNetMessage::Data(ref msg) => { + + // TODO: feed indexing? + // Insert into local feed + // XXX self.registers[pm.feed_index].insert(msg); + + // If a drive, and this is the first entry of metadata feed, it has the config for + // the content feed + if self.is_drive && pm.feed_index == 0 && msg.get_index() == 0 { + let data_key = parse_drive_data_key(msg)?; + pt.add_feed(&data_key)?; + init_want_everything(pt, 1)?; + + // If we haven't already, create and save a local register + if self.registers.len() < 2 { + + let dir = self.dir.clone().unwrap(); + let content_reg = SleepDirRegister::create(&dir, "content")?; + + let content_status = RegisterStatus { + id: 1, + register: content_reg, + inflight: vec![], + wanted: BitVec::new(), + key: data_key, + }; + + self.registers.push(content_status); + } } - // TODO: insert into feed + + // TODO: send next wanted, or otherwise update state }, } Ok(()) } } +fn parse_drive_data_key(data_msg: &Data) -> Result { + let index_msg = parse_from_bytes::(&mut data_msg.get_value())?; + if index_msg.get_field_type() == "hyperdrive" { + let data_key = index_msg.get_content(); + if data_key.len() != 32 { + bail!("Received data key had wrong length: {}", data_key.len()); + } + return Ok(Key::from_slice(&data_key[0..32]).unwrap()); + } else { + bail!("non-hyperdrive Index type: {}", index_msg.get_field_type()); + } +} + fn max_index(have_msg: &Have) -> Result { if have_msg.has_length() { @@ -162,6 +226,37 @@ fn test_max_index() { assert_eq!(max_index(&hm).unwrap(), 5); } +fn init_want_everything(dpt: &mut DatPeerThread, reg_index: u8) -> Result<()> { + + // Info: downloading, not uploading + let mut im = Info::new(); + im.set_uploading(false); + im.set_downloading(true); + let im = DatNetMessage::Info(im); + dpt.send(im, reg_index)?; + + // Have: nothing (so far) + let mut hm = Have::new(); + hm.set_start(0); + hm.set_length(0); + let hm = DatNetMessage::Have(hm); + dpt.send(hm, reg_index)?; + + // UnHave: still nothing + let mut uhm = Unhave::new(); + uhm.set_start(0); + let uhm = DatNetMessage::Unhave(uhm); + dpt.send(uhm, reg_index)?; + + // Want: everything + let mut wm = Want::new(); + wm.set_start(0); + let wm = DatNetMessage::Want(wm); + dpt.send(wm, reg_index)?; + + Ok(()) +} + /// Tries to connect to a single peer, pull register, and close. pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegister, reg_index: u8) -> Result<()> { -- cgit v1.2.3