aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2017-10-20 20:05:20 -0700
committerBryan Newbold <bnewbold@robocracy.org>2017-10-20 20:05:20 -0700
commit1940f7b082bfd958ae9e4155f62afbd8f0100fd8 (patch)
treecbff21956497120a1a655a515f9d098d7ec46ebd
parent0cd766546ae13d4d543c3da7f5f02d2720a8e2f7 (diff)
downloadgeniza-1940f7b082bfd958ae9e4155f62afbd8f0100fd8.tar.gz
geniza-1940f7b082bfd958ae9e4155f62afbd8f0100fd8.zip
crude WIP net client
-rw-r--r--src/bin/geniza-net.rs51
-rw-r--r--src/lib.rs12
-rw-r--r--src/sync.rs189
3 files changed, 249 insertions, 3 deletions
diff --git a/src/bin/geniza-net.rs b/src/bin/geniza-net.rs
new file mode 100644
index 0000000..0f413a8
--- /dev/null
+++ b/src/bin/geniza-net.rs
@@ -0,0 +1,51 @@
+
+#[macro_use]
+extern crate error_chain;
+extern crate clap;
+extern crate geniza;
+
+// TODO: more careful import
+use geniza::*;
+use clap::{App, SubCommand};
+
+fn run() -> Result<()> {
+
+ let matches = App::new("geniza-net")
+ .version(env!("CARGO_PKG_VERSION"))
+ .subcommand(SubCommand::with_name("connect")
+ .about("Connects to a peer and exchanges handshake")
+ .arg_from_usage("<host_port> 'peer host:port to connect to'")
+ .arg_from_usage("<dat_key> 'dat key (public key) to register with'"))
+ .get_matches();
+
+
+ match matches.subcommand() {
+ ("connect", Some(subm)) => {
+ let host_port = subm.value_of("host_port").unwrap();
+ let dat_key = subm.value_of("dat_key").unwrap();
+ if dat_key.len() != 32*2 {
+ bail!("dat key not correct length");
+ }
+ let mut key_bytes = vec![];
+ for i in 0..32 {
+ let r = u8::from_str_radix(&dat_key[2*i .. 2*i+2], 16);
+ match r {
+ Ok(b) => key_bytes.push(b),
+ Err(e) => bail!("Problem with hex: {}", e),
+ };
+ }
+ DatConnection::connect(
+ host_port,
+ &key_bytes)?;
+ println!("Done!");
+ },
+ _ => {
+ println!("Missing or unimplemented command!");
+ println!("{}", matches.usage());
+ ::std::process::exit(-1);
+ },
+ }
+ Ok(())
+}
+
+quick_main!(run);
diff --git a/src/lib.rs b/src/lib.rs
index 0be23a1..8a515d7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,6 +13,8 @@
#[macro_use]
extern crate error_chain;
+extern crate log;
+extern crate env_logger;
extern crate integer_encoding;
extern crate crypto;
extern crate rand;
@@ -23,10 +25,12 @@ extern crate tempdir;
#[allow(unused_doc_comment)]
mod errors {
+
// Create the Error, ErrorKind, ResultExt, and Result types
error_chain! {
foreign_links { Fmt(::std::fmt::Error);
- Io(::std::io::Error) #[cfg(unix)]; }
+ Io(::std::io::Error) #[cfg(unix)];
+ Protobuf(::protobuf::ProtobufError); }
}
}
@@ -35,8 +39,10 @@ pub use errors::*;
// Organize code internally (files, modules), but pull it all into a flat namespace to export.
mod sleep;
+pub use sleep::*;
mod register;
+pub use register::*;
+mod sync;
+pub use sync::*;
pub mod network_proto;
pub mod drive_proto;
-pub use sleep::*;
-pub use register::*;
diff --git a/src/sync.rs b/src/sync.rs
new file mode 100644
index 0000000..f89f2bb
--- /dev/null
+++ b/src/sync.rs
@@ -0,0 +1,189 @@
+
+use std::io::Read;
+use std::net::TcpStream;
+use integer_encoding::{VarIntReader, VarIntWriter, VarInt};
+use std::time::Duration;
+use crypto::digest::Digest;
+use crypto::blake2b::Blake2b;
+use rand::{Rng, OsRng};
+use protobuf::Message;
+use protobuf::core::parse_from_reader;
+use protobuf::core::parse_from_bytes;
+
+use errors::*;
+use network_proto::*;
+
+// TCP stream
+// two varints
+
+pub enum DatNetMessage {
+ Register(Feed),
+ Handshake(Handshake),
+ Status(Info),
+ Have(Have),
+ Unhave(Unhave),
+ Want(Want),
+ Unwant(Unwant),
+ Request(Request),
+ Cancel(Cancel),
+ Data(Data),
+}
+
+fn msg_code(msg: &DatNetMessage) -> u8 {
+ match msg {
+ &DatNetMessage::Register(_) => 0,
+ &DatNetMessage::Handshake(_) => 1,
+ &DatNetMessage::Status(_) => 2,
+ &DatNetMessage::Have(_) => 3,
+ &DatNetMessage::Unhave(_) => 4,
+ &DatNetMessage::Want(_) => 5,
+ &DatNetMessage::Unwant(_) => 6,
+ &DatNetMessage::Request(_) => 7,
+ &DatNetMessage::Cancel(_) => 8,
+ &DatNetMessage::Data(_) => 9,
+ }
+}
+
+fn msg_sugar(msg: &DatNetMessage) -> &Message {
+ match msg {
+ &DatNetMessage::Register(ref m) => m,
+ &DatNetMessage::Handshake(ref m) => m,
+ &DatNetMessage::Status(ref m) => m,
+ &DatNetMessage::Have(ref m) => m,
+ &DatNetMessage::Unhave(ref m) => m,
+ &DatNetMessage::Want(ref m) => m,
+ &DatNetMessage::Unwant(ref m) => m,
+ &DatNetMessage::Request(ref m) => m,
+ &DatNetMessage::Cancel(ref m) => m,
+ &DatNetMessage::Data(ref m) => m,
+ }
+}
+
+/// Represents a bi-directional connection to a network peer
+///
+/// Spec says nonce is 32 bytes, by dat implementation (hypercore-protocol) is 24 bytes.
+pub struct DatConnection {
+ nonce: [u8; 24],
+ remote_nonce: [u8; 24],
+ id: [u8; 32],
+ stream: TcpStream,
+}
+
+impl DatConnection {
+
+ pub fn connect(host_port: &str, key: &[u8]) -> Result<DatConnection> {
+
+ let timeout = Duration::new(7, 0);
+ let mut rng = OsRng::new()?;
+ let mut nonce = [0; 24];
+ rng.fill_bytes(&mut nonce);
+ let mut local_id = [0; 32];
+ rng.fill_bytes(&mut local_id);
+
+ // Connect to server
+ println!("Connecting to {}", host_port);
+ // TODO: timeout on connect (socketaddr dance)
+ let stream = TcpStream::connect(host_port)?;
+ stream.set_read_timeout(Some(timeout))?;
+ stream.set_write_timeout(Some(timeout))?;
+
+ let mut dc = DatConnection {
+ nonce: nonce,
+ remote_nonce: [0; 24],
+ id: local_id,
+ stream,
+ };
+
+ // Exchange register/feed
+ dc.stream.set_nodelay(true)?; // Faster handshake
+ // calculate discovery key
+ let mut discovery_key = [0; 32];
+ let mut hash = Blake2b::new_keyed(32, key);
+ hash.input(&"hypercore".as_bytes());
+ hash.result(&mut discovery_key);
+ // send register
+ let mut register_msg = Feed::new();
+ register_msg.set_discoveryKey(discovery_key.to_vec());
+ register_msg.set_nonce(nonce.to_vec());
+ dc.send_msg(false, &DatNetMessage::Register(register_msg))?;
+
+ // read register
+ let (was_content, msg) = dc.recv_msg()?;
+ if was_content {
+ bail!("Expected metadata msg, not content");
+ }
+ if let DatNetMessage::Register(registration) = msg {
+ if registration.get_discoveryKey()[0..32] != discovery_key[..] {
+ bail!("Remote peer not sharing same discovery key");
+ }
+ // TODO: more satisfying way to do this transfer
+ let rn = registration.get_nonce();
+ for i in 0..24 {
+ dc.remote_nonce[i] = rn[i];
+ }
+ } else {
+ bail!("Expected Registration message, got something else");
+ }
+
+ // send handshake
+ // read handshake
+
+ dc.stream.set_nodelay(false)?; // Back to normal
+
+ Ok(dc)
+ }
+
+ fn send_msg(&mut self, is_content: bool, dnm: &DatNetMessage) -> Result<()> {
+
+ let header_int: u8 = (is_content as u8) << 4 | (msg_code(dnm) & 0x0F);
+ let msg: &Message = msg_sugar(dnm);
+ let total_message_size = (msg.compute_size() as usize) + header_int.required_space();
+
+ println!("SEND total_len={} header={} is_content={}", total_message_size, header_int, is_content);
+
+ // send both header varints, and data
+ self.stream.write_varint(total_message_size)?;
+ self.stream.write_varint(header_int)?;
+ msg.write_to_writer(&mut self.stream)?;
+
+ Ok(())
+ }
+
+ fn recv_msg(&mut self) -> Result<(bool, DatNetMessage)> {
+ let total_len: u64 = self.stream.read_varint()?;
+ let header: u8 = self.stream.read_varint()?;
+
+ let is_content = (header & (1 << 4)) != 0;
+
+ println!("RECV total_len={} header={} is_content={}", total_len, header, is_content);
+
+ // XXX: replace with coded, buffered streams
+ let mut buf = [0; 1024];
+ let msg_len = (total_len - 1) as usize;
+ let len = self.stream.read(&mut buf[0..msg_len])?;
+ println!("raw read: {} first={}", len, buf[0]);
+
+ let dnm = match header & 0x0F {
+ 0 => DatNetMessage::Register(parse_from_bytes::<Feed>(&mut buf[0..msg_len])?),
+ 1 => DatNetMessage::Handshake(parse_from_reader::<Handshake>(&mut self.stream)?),
+ 2 => DatNetMessage::Status(parse_from_reader::<Info>(&mut self.stream)?),
+ 3 => DatNetMessage::Have(parse_from_reader::<Have>(&mut self.stream)?),
+ 4 => DatNetMessage::Unhave(parse_from_reader::<Unhave>(&mut self.stream)?),
+ 5 => DatNetMessage::Want(parse_from_reader::<Want>(&mut self.stream)?),
+ 6 => DatNetMessage::Unwant(parse_from_reader::<Unwant>(&mut self.stream)?),
+ 7 => DatNetMessage::Request(parse_from_reader::<Request>(&mut self.stream)?),
+ 8 => DatNetMessage::Cancel(parse_from_reader::<Cancel>(&mut self.stream)?),
+ 9 => DatNetMessage::Data(parse_from_reader::<Data>(&mut self.stream)?),
+ _ => bail!("Unimplemented message type received"),
+ };
+ Ok((is_content, dnm))
+ }
+
+ fn receive_all(&self) -> Result<()> {
+
+ // Status: downloading, not uploading
+ // Have: nothing
+ // Want: everything
+ unimplemented!();
+ }
+}