aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/bin/geniza-net.rs17
-rw-r--r--src/protocol.rs78
2 files changed, 17 insertions, 78 deletions
diff --git a/src/bin/geniza-net.rs b/src/bin/geniza-net.rs
index 99ebdb5..4b3fa30 100644
--- a/src/bin/geniza-net.rs
+++ b/src/bin/geniza-net.rs
@@ -24,13 +24,6 @@ fn run() -> Result<()> {
.arg_from_usage("<dat_key> 'dat key (public key) to register with'"),
)
.subcommand(
- SubCommand::with_name("receive-some")
- .about("Connects to a peer, pulls some metadata and content")
- .arg_from_usage("<host_port> 'peer host:port to connect to'")
- .arg_from_usage("<dat_key> 'dat key (public key) to register with'")
- .arg_from_usage("<count> 'how many entries to pull'"),
- )
- .subcommand(
SubCommand::with_name("discovery-key")
.about("Prints (in hex) the discovery key for a dat archive")
.arg_from_usage("<dat_key> 'dat key (public key) to convert (in hex)'"),
@@ -69,16 +62,6 @@ fn run() -> Result<()> {
DatConnection::connect(host_port, &key_bytes, false)?;
println!("Done!");
}
- ("receive-some", Some(subm)) => {
- let host_port = subm.value_of("host_port").unwrap();
- let dat_key = subm.value_of("dat_key").unwrap();
- let count: u64 = subm.value_of("count").unwrap().parse().unwrap();
- let key_bytes = parse_dat_address(&dat_key)?;
- let mut dc = DatConnection::connect(host_port, &key_bytes, false)?;
- dc.receive_some(0, count)?;
- dc.receive_some(1, count)?;
- println!("Done!");
- }
("discovery-key", Some(subm)) => {
let dat_key = subm.value_of("dat_key").unwrap();
let key_bytes = parse_dat_address(&dat_key)?;
diff --git a/src/protocol.rs b/src/protocol.rs
index 79e1f1c..ee72b4d 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -1,8 +1,9 @@
-use std::net::TcpStream;
+use std::net::{TcpStream, ToSocketAddrs};
use std::time::Duration;
use std::io::{Read, Write};
use std::cmp;
+use std::fmt::Display;
use sodiumoxide::crypto::stream::*;
use rand::{OsRng, Rng};
use protobuf::Message;
@@ -135,7 +136,7 @@ fn test_bsxii_continued() {
pub struct DatConnection {
pub id: [u8; 32],
remote_id: [u8; 32],
- tcp: TcpStream,
+ pub tcp: TcpStream,
pub live: bool,
pub key: Key,
pub discovery_key: [u8; 32],
@@ -176,8 +177,19 @@ impl Write for DatConnection {
}
impl DatConnection {
- pub fn connect(host_port: &str, key: &[u8], live: bool) -> Result<DatConnection> {
- let timeout = Duration::new(7, 0);
+ pub fn connect<A: ToSocketAddrs + Display>(addr: A, key: &[u8], live: bool) -> Result<DatConnection> {
+
+ // Connect to server
+ info!("Connecting to {}", addr);
+ // TODO: timeout on connect (socketaddr iterator dance)
+ let tcp = TcpStream::connect(addr)?;
+
+ DatConnection::from_tcp(tcp, key, live)
+ }
+
+ // It's sort of a hack, but this should be usable from an accept() as well as a connect()
+ pub fn from_tcp(tcp: TcpStream, key: &[u8], live: bool) -> Result<DatConnection> {
+
let tx_nonce = gen_nonce();
let mut local_id = [0; 32];
let mut rng = OsRng::new()?;
@@ -186,11 +198,7 @@ impl DatConnection {
let mut dk = [0; 32];
dk.copy_from_slice(&make_discovery_key(key)[0..32]);
- // Connect to server
- info!("Connecting to {}", host_port);
- // TODO: timeout on connect (socketaddr dance)
- let tcp = TcpStream::connect(host_port)?;
- tcp.set_read_timeout(Some(timeout))?;
+ let timeout = Duration::new(7, 0);
tcp.set_write_timeout(Some(timeout))?;
let mut dc = DatConnection {
@@ -207,7 +215,6 @@ impl DatConnection {
};
// Exchange feed
- dc.tcp.set_nodelay(true)?; // Faster handshake
let mut feed_msg = Feed::new();
feed_msg.set_discoveryKey(dc.discovery_key.to_vec());
feed_msg.set_nonce((tx_nonce[0..24]).to_vec());
@@ -242,8 +249,6 @@ impl DatConnection {
bail!("Expected Handshake message, got something else");
}
- dc.tcp.set_nodelay(false)?; // Back to normal
-
Ok(dc)
}
@@ -362,53 +367,4 @@ impl DatConnection {
trace!("\twas: {:?}", reg);
Ok(reg)
}
-
- /// This is a debug/dev helper, will be deleted
- pub fn receive_some(&mut self, feed_index: u8, length: u64) -> Result<()> {
- // Info: downloading, not uploading
- let mut im = Info::new();
- im.set_uploading(false);
- im.set_downloading(true);
- self.send_msg(&DatNetMessage::Info(im), feed_index)?;
-
- // Have: nothing (so far)
- let mut hm = Have::new();
- hm.set_start(0);
- hm.set_length(0);
- self.send_msg(&DatNetMessage::Have(hm), feed_index)?;
-
- // UnHave: still nothing
- let mut uhm = Unhave::new();
- uhm.set_start(0);
- self.send_msg(&DatNetMessage::Unhave(uhm), feed_index)?;
-
- // Want: everything
- let mut wm = Want::new();
- wm.set_start(0);
- self.send_msg(&DatNetMessage::Want(wm), feed_index)?;
-
- // Request / Data loop
- for i in 0..length {
- let mut rm = Request::new();
- rm.set_index(i);
- self.send_msg(&DatNetMessage::Request(rm), feed_index)?;
-
- loop {
- let (msg, rx_index) = self.recv_msg()?;
- if rx_index != feed_index {
- info!("Expected other message channel");
- continue;
- }
- if let DatNetMessage::Data(dm) = msg {
- info!("Got content: {}", dm.get_index());
- break;
- } else {
- info!("Expected Data message, got: {:?}", &msg);
- continue;
- }
- }
- }
-
- Ok(())
- }
}