aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2017-10-26 00:17:01 -0700
committerBryan Newbold <bnewbold@robocracy.org>2017-10-26 00:17:01 -0700
commite8e2bb6b41e5e19f8a433dcff597369af2218a92 (patch)
treedfec4ef65234b00d7037c7bf03eb48f86d6f2962 /src
parent8e13fc65e944b9741e3f8e0f3d45542d8090a99c (diff)
downloadgeniza-e8e2bb6b41e5e19f8a433dcff597369af2218a92.tar.gz
geniza-e8e2bb6b41e5e19f8a433dcff597369af2218a92.zip
work in progress on network connection
Diffstat (limited to 'src')
-rw-r--r--src/bin/geniza-net.rs7
-rw-r--r--src/bin/geniza-register.rs3
-rw-r--r--src/bin/geniza-sleep.rs3
-rw-r--r--src/sync.rs129
4 files changed, 127 insertions, 15 deletions
diff --git a/src/bin/geniza-net.rs b/src/bin/geniza-net.rs
index de97e88..1f369ab 100644
--- a/src/bin/geniza-net.rs
+++ b/src/bin/geniza-net.rs
@@ -2,6 +2,7 @@
#[macro_use]
extern crate error_chain;
extern crate clap;
+extern crate env_logger;
extern crate geniza;
// TODO: more careful import
@@ -9,6 +10,8 @@ use geniza::*;
use clap::{App, SubCommand};
fn run() -> Result<()> {
+
+ env_logger::init().unwrap();
let matches = App::new("geniza-net")
.version(env!("CARGO_PKG_VERSION"))
@@ -16,7 +19,7 @@ fn run() -> Result<()> {
.about("Connects to a peer and exchanges handshake")
.arg_from_usage("<host_port> 'peer host:port to connect to'")
.arg_from_usage("<dat_key> 'dat key (public key) to register with'"))
- .subcommand(SubCommand::with_name("clone")
+ .subcommand(SubCommand::with_name("receive-all")
.about("Connects to a peer, pulls all metadata and content")
.arg_from_usage("<host_port> 'peer host:port to connect to'")
.arg_from_usage("<dat_key> 'dat key (public key) to register with'"))
@@ -44,7 +47,7 @@ fn run() -> Result<()> {
false)?;
println!("Done!");
},
- ("clone", Some(subm)) => {
+ ("receive-all", Some(subm)) => {
let host_port = subm.value_of("host_port").unwrap();
let dat_key = subm.value_of("dat_key").unwrap();
if dat_key.len() != 32*2 {
diff --git a/src/bin/geniza-register.rs b/src/bin/geniza-register.rs
index d0f48d8..a328bbc 100644
--- a/src/bin/geniza-register.rs
+++ b/src/bin/geniza-register.rs
@@ -2,6 +2,7 @@
#[macro_use]
extern crate error_chain;
extern crate clap;
+extern crate env_logger;
extern crate geniza;
// TODO: more careful import
@@ -10,6 +11,8 @@ use std::path::Path;
use clap::{App, SubCommand};
fn run() -> Result<()> {
+
+ env_logger::init().unwrap();
let matches = App::new("geniza-register")
.version(env!("CARGO_PKG_VERSION"))
diff --git a/src/bin/geniza-sleep.rs b/src/bin/geniza-sleep.rs
index f8921c7..10fc16b 100644
--- a/src/bin/geniza-sleep.rs
+++ b/src/bin/geniza-sleep.rs
@@ -3,6 +3,7 @@
extern crate error_chain;
#[macro_use]
extern crate clap;
+extern crate env_logger;
extern crate geniza;
// TODO: more careful import
@@ -11,6 +12,8 @@ use std::path::Path;
use clap::{App, SubCommand};
fn run() -> Result<()> {
+
+ env_logger::init().unwrap();
let matches = App::new("geniza-sleep")
.version(env!("CARGO_PKG_VERSION"))
diff --git a/src/sync.rs b/src/sync.rs
index 2869075..d3f54a5 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -2,6 +2,7 @@
use std::net::TcpStream;
use std::time::Duration;
use std::io::{Read, Write};
+use std::cmp;
use crypto::digest::Digest;
use crypto::blake2b::Blake2b;
use sodiumoxide::crypto::stream::*;
@@ -57,6 +58,80 @@ fn msg_sugar(msg: &DatNetMessage) -> &Message {
}
}
+/// This helper is pretty slow/inefficient; lots of copying memory
+fn bytewise_stream_xor_ic_inplace(buf: &mut [u8], byte_offset: u64, nonce: &Nonce, key: &Key) {
+
+ let mut offset = byte_offset;
+
+ // We may have a partial-64-byte-block to finish encrypting first
+ let partial_offset: usize = (offset % 64) as usize;
+ let partial_len: usize = cmp::min(64 - partial_offset, buf.len());
+ if partial_len != 0 {
+ let mut partial = vec![0; 64];
+ for i in 0..partial_len {
+ partial[partial_offset+i] = buf[i];
+ }
+ let partial_enc = stream_xor_ic(&partial, offset / 64, &nonce, &key);
+ offset += partial_len as u64;
+ for i in 0..partial_len {
+ buf[i] = partial_enc[partial_offset+i];
+ }
+ }
+ if buf.len() > partial_len {
+ let main_enc = stream_xor_ic(&buf[partial_len..], offset / 64, &nonce, &key);
+ //offset += main_enc.len() as u64;
+ for i in 0..main_enc.len() {
+ buf[partial_len+i] = main_enc[i];
+ }
+ }
+}
+
+#[test]
+fn test_bsxii_short() {
+
+ let nonce = gen_nonce();
+ let key = gen_key();
+
+ for size in [10, 100, 1234].iter() {
+ let mut a = vec![7; *size];
+ let mut b = vec![7; *size];
+ let c = vec![7; *size];
+
+ assert_eq!(a, b);
+ bytewise_stream_xor_ic_inplace(&mut a, 0, &nonce, &key);
+ bytewise_stream_xor_ic_inplace(&mut b, 0, &nonce, &key);
+ assert_eq!(a, b);
+ assert_ne!(a, c);
+ bytewise_stream_xor_ic_inplace(&mut a, 0, &nonce, &key);
+ assert_eq!(a, c);
+ }
+}
+
+#[test]
+fn test_bsxii_continued() {
+
+ let nonce = gen_nonce();
+ let key = gen_key();
+
+ let mut a = vec![7; 1234];
+ let mut b = vec![7; 1234];
+ let c = vec![7; 1234];
+
+ assert_eq!(a, b);
+ bytewise_stream_xor_ic_inplace(&mut a[0..10], 0, &nonce, &key);
+ bytewise_stream_xor_ic_inplace(&mut a[10..20], 10, &nonce, &key);
+ bytewise_stream_xor_ic_inplace(&mut b[0..20], 0, &nonce, &key);
+ assert_eq!(a, b);
+ bytewise_stream_xor_ic_inplace(&mut a[20..50], 20, &nonce, &key);
+ bytewise_stream_xor_ic_inplace(&mut a[50..500], 50, &nonce, &key);
+ bytewise_stream_xor_ic_inplace(&mut b[20..500], 20, &nonce, &key);
+ assert_ne!(a, c);
+ assert_eq!(a, b);
+ bytewise_stream_xor_ic_inplace(&mut a[500..1234], 500, &nonce, &key);
+ bytewise_stream_xor_ic_inplace(&mut a, 0, &nonce, &key);
+ assert_eq!(a, c);
+}
+
pub struct TcpSodiumReader<'a> {
dc: &'a mut DatConnection,
}
@@ -65,10 +140,9 @@ impl<'a> Read for TcpSodiumReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> ::std::io::Result<usize> {
let len = self.dc.tcp.read(buf)?;
- if len > 0 {
- stream_xor_inplace(&mut buf[0..len], &self.dc.rx_nonce, &self.dc.key);
- }
+ bytewise_stream_xor_ic_inplace(&mut buf[0..len], self.dc.rx_offset, &self.dc.rx_nonce, &self.dc.key);
self.dc.rx_offset += len as u64;
+
Ok(len)
}
}
@@ -80,9 +154,14 @@ pub struct TcpSodiumWriter<'a> {
impl<'a> Write for TcpSodiumWriter<'a> {
fn write(&mut self, buf: &[u8]) -> ::std::io::Result<usize> {
- //let enc = stream_xor_ic(buf, self.dc.tx_offset, &self.dc.tx_nonce, &self.dc.key);
- let enc = stream_xor(buf, &self.dc.tx_nonce, &self.dc.key);
- self.dc.tx_offset += buf.len() as u64;
+
+ // Don't mutate what we've been passed
+ let mut enc = vec![0; buf.len()];
+ enc.copy_from_slice(buf);
+
+ bytewise_stream_xor_ic_inplace(&mut enc, self.dc.tx_offset, &self.dc.tx_nonce, &self.dc.key);
+ self.dc.tx_offset += enc.len() as u64;
+
self.dc.tcp.write(&enc)
}
@@ -117,7 +196,7 @@ impl DatConnection {
rng.fill_bytes(&mut local_id);
// Connect to server
- println!("Connecting to {}", host_port);
+ info!("Connecting to {}", host_port);
// TODO: timeout on connect (socketaddr dance)
let tcp = TcpStream::connect(host_port)?;
tcp.set_read_timeout(Some(timeout))?;
@@ -189,7 +268,8 @@ impl DatConnection {
let msg: &Message = msg_sugar(dnm);
let total_message_size = (msg.compute_size() as usize) + 1;
- println!("SEND total_len={} header={} is_content={}", total_message_size, header_int, is_content);
+ trace!("SEND total_len={} header={} is_content={} type={:?}",
+ total_message_size, header_int, is_content, &dnm);
// send both header varints, and data
tx_stream.write_varint(total_message_size as u64)?;
@@ -218,7 +298,7 @@ impl DatConnection {
let is_content = (header & (1 << 4)) != 0;
- println!("RECV total_len={} header={} is_content={}", total_len, header, is_content);
+ trace!("RECV total_len={} header={} is_content={}", total_len, header, is_content);
if header > 0x1F {
bail!("Invalid header received: {}", header);
@@ -241,15 +321,21 @@ impl DatConnection {
9 => DatNetMessage::Data(parse_from_bytes::<Data>(&mut buf)?),
other => bail!("Unimplemented message type received: {}", other),
};
+ trace!("\twas: {:?}", &dnm);
Ok((is_content, dnm))
}
+ /// Special unencrypted variant of `send_msg()`, used only during initial connection
+ /// establishment (eg, to check metadata discovery key and exchange nonces). After the
+ /// connection is initialized, send Register (aka Feed) messages as normal to add extra feeds.
fn send_register(&mut self, reg: &Feed) -> Result<()> {
+ // TODO: refactor this to take discovery key and nonce directly
let header_int: u8 = 0;
let total_message_size = (reg.compute_size() as usize) + 1;
- println!("SEND total_len={} header={}", total_message_size, header_int);
+ trace!("SEND total_len={} header={} msg={:?}",
+ total_message_size, header_int, reg);
self.tcp.write_varint(total_message_size as u64)?;
self.tcp.write_varint(header_int as u32)?;
@@ -257,7 +343,9 @@ impl DatConnection {
Ok(())
}
+ /// Receive complement to `send_register()`.
fn recv_register(&mut self) -> Result<Feed> {
+ // TODO: refactor this to return discovery key and nonce directly
let total_len: u64 = self.tcp.read_varint()?;
let header: u8 = self.tcp.read_varint()?;
@@ -266,13 +354,14 @@ impl DatConnection {
bail!("Invalid register header received");
}
- println!("RECV total_len={} header={}", total_len, header);
+ trace!("RECV total_len={} header={}", total_len, header);
let msg_len = (total_len - 1) as usize;
let mut buf = vec![0; msg_len];
self.tcp.read_exact(&mut buf[0..msg_len])?;
let reg = parse_from_bytes::<Feed>(&mut buf)?;
+ trace!("\twas: {:?}", reg);
Ok(reg)
}
@@ -284,6 +373,17 @@ impl DatConnection {
sm.set_downloading(true);
self.send_msg(&DatNetMessage::Status(sm), is_content)?;
+ // Have: nothing (so far)
+ let mut hm = Have::new();
+ hm.set_start(0);
+ hm.set_length(0);
+ self.send_msg(&DatNetMessage::Have(hm), is_content)?;
+
+ // UnHave: still nothing
+ let mut uhm = Unhave::new();
+ uhm.set_start(0);
+ self.send_msg(&DatNetMessage::Unhave(uhm), is_content)?;
+
// Want: everything
let mut wm = Want::new();
wm.set_start(0);
@@ -300,7 +400,7 @@ impl DatConnection {
length = have.get_length();
break;
} else {
- info!("Expected Want message, got: {:?}", &msg);
+ info!("Expected Have message, got: {:?}", &msg);
continue;
}
};
@@ -316,7 +416,10 @@ impl DatConnection {
info!("Expected other message channel");
}
if let DatNetMessage::Data(dm) = msg {
- println!("Got metadata: {}", dm.get_index());
+ info!("Got metadata: {}", dm.get_index());
+ // TODO: if this is index 0, and we're receive-all, and this is metadata feed, then
+ // this is the special Index data entry; we should parse, check type, and store as
+ // register key (?)
} else {
info!("Expected Data message, got: {:?}", &msg);
continue;