diff options
-rw-r--r-- | src/bin/geniza-net.rs | 51 | ||||
-rw-r--r-- | src/lib.rs | 12 | ||||
-rw-r--r-- | src/sync.rs | 189 |
3 files changed, 249 insertions, 3 deletions
diff --git a/src/bin/geniza-net.rs b/src/bin/geniza-net.rs new file mode 100644 index 0000000..0f413a8 --- /dev/null +++ b/src/bin/geniza-net.rs @@ -0,0 +1,51 @@ + +#[macro_use] +extern crate error_chain; +extern crate clap; +extern crate geniza; + +// TODO: more careful import +use geniza::*; +use clap::{App, SubCommand}; + +fn run() -> Result<()> { + + let matches = App::new("geniza-net") + .version(env!("CARGO_PKG_VERSION")) + .subcommand(SubCommand::with_name("connect") + .about("Connects to a peer and exchanges handshake") + .arg_from_usage("<host_port> 'peer host:port to connect to'") + .arg_from_usage("<dat_key> 'dat key (public key) to register with'")) + .get_matches(); + + + match matches.subcommand() { + ("connect", Some(subm)) => { + let host_port = subm.value_of("host_port").unwrap(); + let dat_key = subm.value_of("dat_key").unwrap(); + if dat_key.len() != 32*2 { + bail!("dat key not correct length"); + } + let mut key_bytes = vec![]; + for i in 0..32 { + let r = u8::from_str_radix(&dat_key[2*i .. 2*i+2], 16); + match r { + Ok(b) => key_bytes.push(b), + Err(e) => bail!("Problem with hex: {}", e), + }; + } + DatConnection::connect( + host_port, + &key_bytes)?; + println!("Done!"); + }, + _ => { + println!("Missing or unimplemented command!"); + println!("{}", matches.usage()); + ::std::process::exit(-1); + }, + } + Ok(()) +} + +quick_main!(run); @@ -13,6 +13,8 @@ #[macro_use] extern crate error_chain; +extern crate log; +extern crate env_logger; extern crate integer_encoding; extern crate crypto; extern crate rand; @@ -23,10 +25,12 @@ extern crate tempdir; #[allow(unused_doc_comment)] mod errors { + // Create the Error, ErrorKind, ResultExt, and Result types error_chain! { foreign_links { Fmt(::std::fmt::Error); - Io(::std::io::Error) #[cfg(unix)]; } + Io(::std::io::Error) #[cfg(unix)]; + Protobuf(::protobuf::ProtobufError); } } } @@ -35,8 +39,10 @@ pub use errors::*; // Organize code internally (files, modules), but pull it all into a flat namespace to export. mod sleep; +pub use sleep::*; mod register; +pub use register::*; +mod sync; +pub use sync::*; pub mod network_proto; pub mod drive_proto; -pub use sleep::*; -pub use register::*; diff --git a/src/sync.rs b/src/sync.rs new file mode 100644 index 0000000..f89f2bb --- /dev/null +++ b/src/sync.rs @@ -0,0 +1,189 @@ + +use std::io::Read; +use std::net::TcpStream; +use integer_encoding::{VarIntReader, VarIntWriter, VarInt}; +use std::time::Duration; +use crypto::digest::Digest; +use crypto::blake2b::Blake2b; +use rand::{Rng, OsRng}; +use protobuf::Message; +use protobuf::core::parse_from_reader; +use protobuf::core::parse_from_bytes; + +use errors::*; +use network_proto::*; + +// TCP stream +// two varints + +pub enum DatNetMessage { + Register(Feed), + Handshake(Handshake), + Status(Info), + Have(Have), + Unhave(Unhave), + Want(Want), + Unwant(Unwant), + Request(Request), + Cancel(Cancel), + Data(Data), +} + +fn msg_code(msg: &DatNetMessage) -> u8 { + match msg { + &DatNetMessage::Register(_) => 0, + &DatNetMessage::Handshake(_) => 1, + &DatNetMessage::Status(_) => 2, + &DatNetMessage::Have(_) => 3, + &DatNetMessage::Unhave(_) => 4, + &DatNetMessage::Want(_) => 5, + &DatNetMessage::Unwant(_) => 6, + &DatNetMessage::Request(_) => 7, + &DatNetMessage::Cancel(_) => 8, + &DatNetMessage::Data(_) => 9, + } +} + +fn msg_sugar(msg: &DatNetMessage) -> &Message { + match msg { + &DatNetMessage::Register(ref m) => m, + &DatNetMessage::Handshake(ref m) => m, + &DatNetMessage::Status(ref m) => m, + &DatNetMessage::Have(ref m) => m, + &DatNetMessage::Unhave(ref m) => m, + &DatNetMessage::Want(ref m) => m, + &DatNetMessage::Unwant(ref m) => m, + &DatNetMessage::Request(ref m) => m, + &DatNetMessage::Cancel(ref m) => m, + &DatNetMessage::Data(ref m) => m, + } +} + +/// Represents a bi-directional connection to a network peer +/// +/// Spec says nonce is 32 bytes, by dat implementation (hypercore-protocol) is 24 bytes. +pub struct DatConnection { + nonce: [u8; 24], + remote_nonce: [u8; 24], + id: [u8; 32], + stream: TcpStream, +} + +impl DatConnection { + + pub fn connect(host_port: &str, key: &[u8]) -> Result<DatConnection> { + + let timeout = Duration::new(7, 0); + let mut rng = OsRng::new()?; + let mut nonce = [0; 24]; + rng.fill_bytes(&mut nonce); + let mut local_id = [0; 32]; + rng.fill_bytes(&mut local_id); + + // Connect to server + println!("Connecting to {}", host_port); + // TODO: timeout on connect (socketaddr dance) + let stream = TcpStream::connect(host_port)?; + stream.set_read_timeout(Some(timeout))?; + stream.set_write_timeout(Some(timeout))?; + + let mut dc = DatConnection { + nonce: nonce, + remote_nonce: [0; 24], + id: local_id, + stream, + }; + + // Exchange register/feed + dc.stream.set_nodelay(true)?; // Faster handshake + // calculate discovery key + let mut discovery_key = [0; 32]; + let mut hash = Blake2b::new_keyed(32, key); + hash.input(&"hypercore".as_bytes()); + hash.result(&mut discovery_key); + // send register + let mut register_msg = Feed::new(); + register_msg.set_discoveryKey(discovery_key.to_vec()); + register_msg.set_nonce(nonce.to_vec()); + dc.send_msg(false, &DatNetMessage::Register(register_msg))?; + + // read register + let (was_content, msg) = dc.recv_msg()?; + if was_content { + bail!("Expected metadata msg, not content"); + } + if let DatNetMessage::Register(registration) = msg { + if registration.get_discoveryKey()[0..32] != discovery_key[..] { + bail!("Remote peer not sharing same discovery key"); + } + // TODO: more satisfying way to do this transfer + let rn = registration.get_nonce(); + for i in 0..24 { + dc.remote_nonce[i] = rn[i]; + } + } else { + bail!("Expected Registration message, got something else"); + } + + // send handshake + // read handshake + + dc.stream.set_nodelay(false)?; // Back to normal + + Ok(dc) + } + + fn send_msg(&mut self, is_content: bool, dnm: &DatNetMessage) -> Result<()> { + + let header_int: u8 = (is_content as u8) << 4 | (msg_code(dnm) & 0x0F); + let msg: &Message = msg_sugar(dnm); + let total_message_size = (msg.compute_size() as usize) + header_int.required_space(); + + println!("SEND total_len={} header={} is_content={}", total_message_size, header_int, is_content); + + // send both header varints, and data + self.stream.write_varint(total_message_size)?; + self.stream.write_varint(header_int)?; + msg.write_to_writer(&mut self.stream)?; + + Ok(()) + } + + fn recv_msg(&mut self) -> Result<(bool, DatNetMessage)> { + let total_len: u64 = self.stream.read_varint()?; + let header: u8 = self.stream.read_varint()?; + + let is_content = (header & (1 << 4)) != 0; + + println!("RECV total_len={} header={} is_content={}", total_len, header, is_content); + + // XXX: replace with coded, buffered streams + let mut buf = [0; 1024]; + let msg_len = (total_len - 1) as usize; + let len = self.stream.read(&mut buf[0..msg_len])?; + println!("raw read: {} first={}", len, buf[0]); + + let dnm = match header & 0x0F { + 0 => DatNetMessage::Register(parse_from_bytes::<Feed>(&mut buf[0..msg_len])?), + 1 => DatNetMessage::Handshake(parse_from_reader::<Handshake>(&mut self.stream)?), + 2 => DatNetMessage::Status(parse_from_reader::<Info>(&mut self.stream)?), + 3 => DatNetMessage::Have(parse_from_reader::<Have>(&mut self.stream)?), + 4 => DatNetMessage::Unhave(parse_from_reader::<Unhave>(&mut self.stream)?), + 5 => DatNetMessage::Want(parse_from_reader::<Want>(&mut self.stream)?), + 6 => DatNetMessage::Unwant(parse_from_reader::<Unwant>(&mut self.stream)?), + 7 => DatNetMessage::Request(parse_from_reader::<Request>(&mut self.stream)?), + 8 => DatNetMessage::Cancel(parse_from_reader::<Cancel>(&mut self.stream)?), + 9 => DatNetMessage::Data(parse_from_reader::<Data>(&mut self.stream)?), + _ => bail!("Unimplemented message type received"), + }; + Ok((is_content, dnm)) + } + + fn receive_all(&self) -> Result<()> { + + // Status: downloading, not uploading + // Have: nothing + // Want: everything + unimplemented!(); + } +} |