From 4eb739dcab2a1a79e1e0e60feddb0d9cc0d74108 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 19 Feb 2018 20:06:10 -0800 Subject: more WIP on synchronizer --- src/bin/geniza.rs | 19 +++++++++++++------ src/peer.rs | 32 ++++++++++++++++++++++++-------- src/protocol.rs | 6 +++++- src/synchronizer.rs | 5 +++-- 4 files changed, 45 insertions(+), 17 deletions(-) (limited to 'src') diff --git a/src/bin/geniza.rs b/src/bin/geniza.rs index f5c73b0..72a7f8f 100644 --- a/src/bin/geniza.rs +++ b/src/bin/geniza.rs @@ -6,12 +6,14 @@ extern crate env_logger; #[macro_use] extern crate error_chain; extern crate geniza; +extern crate sodiumoxide; // TODO: more careful import use geniza::*; use std::path::{Path, PathBuf}; use clap::{App, SubCommand}; use std::env::current_dir; +use sodiumoxide::crypto::stream::Key; // Helper to find a dat directory somewhere in the parent to the current working directory (or None @@ -89,12 +91,17 @@ fn run() -> Result<()> { match matches.subcommand() { ("clone", Some(subm)) => { - let dat_key = subm.value_of("dat_key").unwrap(); - let _key_bytes = parse_dat_address(&dat_key)?; - unimplemented!(); - //let dir = Path::new(subm.value_of("dat-dir").unwrap()); - //let mut metadata = SleepDirRegister::create(&dir, "metadata")?; - //node_simple_clone(host_port, &key_bytes, &mut metadata, false)?; + let dat_key = subm.value_of("address").unwrap(); + let key_bytes = parse_dat_address(&dat_key)?; + let key = Key::from_slice(&key_bytes).unwrap(); + let dir = Path::new(subm.value_of("dir").unwrap()); + + let mut sync = Synchronizer::new_downloader(key, + SyncMode::RxMax, + dir)?; + let peer_count = sync.discover()?; + println!("Found {} potential peers", peer_count); + sync.run()?; } ("init", Some(subm)) => { let _dir = Path::new(subm.value_of("dir").unwrap()); diff --git a/src/peer.rs b/src/peer.rs index b4fddef..a020a1a 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -71,14 +71,30 @@ fn worker_thread(mut dc: DatConnection, handle: u64, outbound_chan: chan::Receiv }; }, raw_peer_rx.recv() -> val => { - if let Some(Ok((msg, feed_index))) = val { - // do mapping between feed index and feed pubkey here - let pm = PeerMsg { - peer_handle: handle, - feed_index: feed_index, - msg, - }; - unified_chan.send(Ok(pm)); + match val { + Some(Ok((msg, feed_index))) => { + // do mapping between feed index and feed pubkey here + let pm = PeerMsg { + peer_handle: handle, + feed_index: feed_index, + msg, + }; + unified_chan.send(Ok(pm)); + }, + Some(Err(err)) => { + println!("remote socket error: {:?}", err); + // XXX: Need to send something so we know to close + unified_chan.send(Err(err)); + dc.close(); + return; + }, + None => { + println!("remote socket closed"); + // XXX: Need to send something so we know to close + //unified_chan.send(Err(err)); + dc.close(); + return; + } }; } }; diff --git a/src/protocol.rs b/src/protocol.rs index 6e70dbb..35681e1 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,5 +1,5 @@ -use std::net::{TcpStream, ToSocketAddrs}; +use std::net::{TcpStream, ToSocketAddrs, Shutdown}; use std::time::Duration; use std::io::{Read, Write}; use std::cmp; @@ -395,4 +395,8 @@ impl DatConnection { trace!("\twas: {:?}", reg); Ok(reg) } + + pub fn close(&mut self) { + self.tcp.shutdown(Shutdown::Both); + } } diff --git a/src/synchronizer.rs b/src/synchronizer.rs index 277bf64..cfc4cd3 100644 --- a/src/synchronizer.rs +++ b/src/synchronizer.rs @@ -136,14 +136,15 @@ impl Synchronizer { } fn handle_msg(&mut self, pm: &PeerMsg) -> Result<()> { + // NB: this is the simplistic model of registers (only works up to 2x per peer) // mutable ref to PeerThread for this message let pt = self.peers.get_mut(&pm.peer_handle).unwrap(); // NB: this is the simplistic model of registers (only works up to 2x per peer?) if pm.feed_index as usize >= self.registers.len() { - // XXX: invalid feed! drop connection - pt.close()?; + // Ignore feed channels we haven't registered yet + return Ok(()); } match &pm.msg { -- cgit v1.2.3