aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bin/geniza.rs19
-rw-r--r--src/peer.rs32
-rw-r--r--src/protocol.rs6
-rw-r--r--src/synchronizer.rs5
4 files changed, 45 insertions, 17 deletions
diff --git a/src/bin/geniza.rs b/src/bin/geniza.rs
index f5c73b0..72a7f8f 100644
--- a/src/bin/geniza.rs
+++ b/src/bin/geniza.rs
@@ -6,12 +6,14 @@ extern crate env_logger;
#[macro_use]
extern crate error_chain;
extern crate geniza;
+extern crate sodiumoxide;
// TODO: more careful import
use geniza::*;
use std::path::{Path, PathBuf};
use clap::{App, SubCommand};
use std::env::current_dir;
+use sodiumoxide::crypto::stream::Key;
// Helper to find a dat directory somewhere in the parent to the current working directory (or None
@@ -89,12 +91,17 @@ fn run() -> Result<()> {
match matches.subcommand() {
("clone", Some(subm)) => {
- let dat_key = subm.value_of("dat_key").unwrap();
- let _key_bytes = parse_dat_address(&dat_key)?;
- unimplemented!();
- //let dir = Path::new(subm.value_of("dat-dir").unwrap());
- //let mut metadata = SleepDirRegister::create(&dir, "metadata")?;
- //node_simple_clone(host_port, &key_bytes, &mut metadata, false)?;
+ let dat_key = subm.value_of("address").unwrap();
+ let key_bytes = parse_dat_address(&dat_key)?;
+ let key = Key::from_slice(&key_bytes).unwrap();
+ let dir = Path::new(subm.value_of("dir").unwrap());
+
+ let mut sync = Synchronizer::new_downloader(key,
+ SyncMode::RxMax,
+ dir)?;
+ let peer_count = sync.discover()?;
+ println!("Found {} potential peers", peer_count);
+ sync.run()?;
}
("init", Some(subm)) => {
let _dir = Path::new(subm.value_of("dir").unwrap());
diff --git a/src/peer.rs b/src/peer.rs
index b4fddef..a020a1a 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -71,14 +71,30 @@ fn worker_thread(mut dc: DatConnection, handle: u64, outbound_chan: chan::Receiv
};
},
raw_peer_rx.recv() -> val => {
- if let Some(Ok((msg, feed_index))) = val {
- // do mapping between feed index and feed pubkey here
- let pm = PeerMsg {
- peer_handle: handle,
- feed_index: feed_index,
- msg,
- };
- unified_chan.send(Ok(pm));
+ match val {
+ Some(Ok((msg, feed_index))) => {
+ // do mapping between feed index and feed pubkey here
+ let pm = PeerMsg {
+ peer_handle: handle,
+ feed_index: feed_index,
+ msg,
+ };
+ unified_chan.send(Ok(pm));
+ },
+ Some(Err(err)) => {
+ println!("remote socket error: {:?}", err);
+ // XXX: Need to send something so we know to close
+ unified_chan.send(Err(err));
+ dc.close();
+ return;
+ },
+ None => {
+ println!("remote socket closed");
+ // XXX: Need to send something so we know to close
+ //unified_chan.send(Err(err));
+ dc.close();
+ return;
+ }
};
}
};
diff --git a/src/protocol.rs b/src/protocol.rs
index 6e70dbb..35681e1 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -1,5 +1,5 @@
-use std::net::{TcpStream, ToSocketAddrs};
+use std::net::{TcpStream, ToSocketAddrs, Shutdown};
use std::time::Duration;
use std::io::{Read, Write};
use std::cmp;
@@ -395,4 +395,8 @@ impl DatConnection {
trace!("\twas: {:?}", reg);
Ok(reg)
}
+
+ pub fn close(&mut self) {
+ self.tcp.shutdown(Shutdown::Both);
+ }
}
diff --git a/src/synchronizer.rs b/src/synchronizer.rs
index 277bf64..cfc4cd3 100644
--- a/src/synchronizer.rs
+++ b/src/synchronizer.rs
@@ -136,14 +136,15 @@ impl Synchronizer {
}
fn handle_msg(&mut self, pm: &PeerMsg) -> Result<()> {
+ // NB: this is the simplistic model of registers (only works up to 2x per peer)
// mutable ref to PeerThread for this message
let pt = self.peers.get_mut(&pm.peer_handle).unwrap();
// NB: this is the simplistic model of registers (only works up to 2x per peer?)
if pm.feed_index as usize >= self.registers.len() {
- // XXX: invalid feed! drop connection
- pt.close()?;
+ // Ignore feed channels we haven't registered yet
+ return Ok(());
}
match &pm.msg {