From 375836cf6322bc31828c5e2be47e5f0aa5f99099 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Sun, 21 Jan 2018 23:47:14 -0800 Subject: procol cleanups: nodelay, from_tcp, tosocketaddr --- src/bin/geniza-net.rs | 17 ----------- src/protocol.rs | 78 +++++++++++---------------------------------------- 2 files changed, 17 insertions(+), 78 deletions(-) diff --git a/src/bin/geniza-net.rs b/src/bin/geniza-net.rs index 99ebdb5..4b3fa30 100644 --- a/src/bin/geniza-net.rs +++ b/src/bin/geniza-net.rs @@ -23,13 +23,6 @@ fn run() -> Result<()> { .arg_from_usage(" 'peer host:port to connect to'") .arg_from_usage(" 'dat key (public key) to register with'"), ) - .subcommand( - SubCommand::with_name("receive-some") - .about("Connects to a peer, pulls some metadata and content") - .arg_from_usage(" 'peer host:port to connect to'") - .arg_from_usage(" 'dat key (public key) to register with'") - .arg_from_usage(" 'how many entries to pull'"), - ) .subcommand( SubCommand::with_name("discovery-key") .about("Prints (in hex) the discovery key for a dat archive") @@ -69,16 +62,6 @@ fn run() -> Result<()> { DatConnection::connect(host_port, &key_bytes, false)?; println!("Done!"); } - ("receive-some", Some(subm)) => { - let host_port = subm.value_of("host_port").unwrap(); - let dat_key = subm.value_of("dat_key").unwrap(); - let count: u64 = subm.value_of("count").unwrap().parse().unwrap(); - let key_bytes = parse_dat_address(&dat_key)?; - let mut dc = DatConnection::connect(host_port, &key_bytes, false)?; - dc.receive_some(0, count)?; - dc.receive_some(1, count)?; - println!("Done!"); - } ("discovery-key", Some(subm)) => { let dat_key = subm.value_of("dat_key").unwrap(); let key_bytes = parse_dat_address(&dat_key)?; diff --git a/src/protocol.rs b/src/protocol.rs index 79e1f1c..ee72b4d 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,8 +1,9 @@ -use std::net::TcpStream; +use std::net::{TcpStream, ToSocketAddrs}; use std::time::Duration; use std::io::{Read, Write}; use std::cmp; +use std::fmt::Display; use sodiumoxide::crypto::stream::*; use rand::{OsRng, Rng}; use protobuf::Message; @@ -135,7 +136,7 @@ fn test_bsxii_continued() { pub struct DatConnection { pub id: [u8; 32], remote_id: [u8; 32], - tcp: TcpStream, + pub tcp: TcpStream, pub live: bool, pub key: Key, pub discovery_key: [u8; 32], @@ -176,8 +177,19 @@ impl Write for DatConnection { } impl DatConnection { - pub fn connect(host_port: &str, key: &[u8], live: bool) -> Result { - let timeout = Duration::new(7, 0); + pub fn connect(addr: A, key: &[u8], live: bool) -> Result { + + // Connect to server + info!("Connecting to {}", addr); + // TODO: timeout on connect (socketaddr iterator dance) + let tcp = TcpStream::connect(addr)?; + + DatConnection::from_tcp(tcp, key, live) + } + + // It's sort of a hack, but this should be usable from an accept() as well as a connect() + pub fn from_tcp(tcp: TcpStream, key: &[u8], live: bool) -> Result { + let tx_nonce = gen_nonce(); let mut local_id = [0; 32]; let mut rng = OsRng::new()?; @@ -186,11 +198,7 @@ impl DatConnection { let mut dk = [0; 32]; dk.copy_from_slice(&make_discovery_key(key)[0..32]); - // Connect to server - info!("Connecting to {}", host_port); - // TODO: timeout on connect (socketaddr dance) - let tcp = TcpStream::connect(host_port)?; - tcp.set_read_timeout(Some(timeout))?; + let timeout = Duration::new(7, 0); tcp.set_write_timeout(Some(timeout))?; let mut dc = DatConnection { @@ -207,7 +215,6 @@ impl DatConnection { }; // Exchange feed - dc.tcp.set_nodelay(true)?; // Faster handshake let mut feed_msg = Feed::new(); feed_msg.set_discoveryKey(dc.discovery_key.to_vec()); feed_msg.set_nonce((tx_nonce[0..24]).to_vec()); @@ -242,8 +249,6 @@ impl DatConnection { bail!("Expected Handshake message, got something else"); } - dc.tcp.set_nodelay(false)?; // Back to normal - Ok(dc) } @@ -362,53 +367,4 @@ impl DatConnection { trace!("\twas: {:?}", reg); Ok(reg) } - - /// This is a debug/dev helper, will be deleted - pub fn receive_some(&mut self, feed_index: u8, length: u64) -> Result<()> { - // Info: downloading, not uploading - let mut im = Info::new(); - im.set_uploading(false); - im.set_downloading(true); - self.send_msg(&DatNetMessage::Info(im), feed_index)?; - - // Have: nothing (so far) - let mut hm = Have::new(); - hm.set_start(0); - hm.set_length(0); - self.send_msg(&DatNetMessage::Have(hm), feed_index)?; - - // UnHave: still nothing - let mut uhm = Unhave::new(); - uhm.set_start(0); - self.send_msg(&DatNetMessage::Unhave(uhm), feed_index)?; - - // Want: everything - let mut wm = Want::new(); - wm.set_start(0); - self.send_msg(&DatNetMessage::Want(wm), feed_index)?; - - // Request / Data loop - for i in 0..length { - let mut rm = Request::new(); - rm.set_index(i); - self.send_msg(&DatNetMessage::Request(rm), feed_index)?; - - loop { - let (msg, rx_index) = self.recv_msg()?; - if rx_index != feed_index { - info!("Expected other message channel"); - continue; - } - if let DatNetMessage::Data(dm) = msg { - info!("Got content: {}", dm.get_index()); - break; - } else { - info!("Expected Data message, got: {:?}", &msg); - continue; - } - } - } - - Ok(()) - } } -- cgit v1.2.3