aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2017-10-25 21:20:11 -0700
committerBryan Newbold <bnewbold@robocracy.org>2017-10-25 21:20:11 -0700
commitd475966f2bea3a4edc7f043c4c9c6617ee4e9bc8 (patch)
tree6d29d4be3eb01c1cbe8a9b3c7af24199c0667191
parentb33fc28ac95be2a7a93c37073694d576c7ac4870 (diff)
downloadgeniza-d475966f2bea3a4edc7f043c4c9c6617ee4e9bc8.tar.gz
geniza-d475966f2bea3a4edc7f043c4c9c6617ee4e9bc8.zip
encrypted network rx/tx (broken)
-rw-r--r--src/sync.rs164
1 files changed, 114 insertions, 50 deletions
diff --git a/src/sync.rs b/src/sync.rs
index 0e27503..2869075 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -1,10 +1,11 @@
use std::net::TcpStream;
use std::time::Duration;
-use std::io::Read;
+use std::io::{Read, Write};
use crypto::digest::Digest;
use crypto::blake2b::Blake2b;
-use rand::{Rng, OsRng};
+use sodiumoxide::crypto::stream::*;
+use rand::{OsRng, Rng};
use protobuf::Message;
use protobuf::parse_from_bytes;
use integer_encoding::{VarIntReader, VarIntWriter};
@@ -56,16 +57,53 @@ fn msg_sugar(msg: &DatNetMessage) -> &Message {
}
}
+pub struct TcpSodiumReader<'a> {
+ dc: &'a mut DatConnection,
+}
+
+impl<'a> Read for TcpSodiumReader<'a> {
+
+ fn read(&mut self, buf: &mut [u8]) -> ::std::io::Result<usize> {
+ let len = self.dc.tcp.read(buf)?;
+ if len > 0 {
+ stream_xor_inplace(&mut buf[0..len], &self.dc.rx_nonce, &self.dc.key);
+ }
+ self.dc.rx_offset += len as u64;
+ Ok(len)
+ }
+}
+
+pub struct TcpSodiumWriter<'a> {
+ dc: &'a mut DatConnection,
+}
+
+impl<'a> Write for TcpSodiumWriter<'a> {
+
+ fn write(&mut self, buf: &[u8]) -> ::std::io::Result<usize> {
+ //let enc = stream_xor_ic(buf, self.dc.tx_offset, &self.dc.tx_nonce, &self.dc.key);
+ let enc = stream_xor(buf, &self.dc.tx_nonce, &self.dc.key);
+ self.dc.tx_offset += buf.len() as u64;
+ self.dc.tcp.write(&enc)
+ }
+
+ fn flush(&mut self) -> ::std::io::Result<()> {
+ self.dc.tcp.flush()
+ }
+}
+
/// Represents a bi-directional connection to a network peer
///
/// Spec says nonce is 32 bytes, by dat implementation (hypercore-protocol) is 24 bytes.
pub struct DatConnection {
- nonce: [u8; 24],
- remote_nonce: [u8; 24],
id: [u8; 32],
remote_id: [u8; 32],
tcp: TcpStream,
live: bool,
+ key: Key,
+ tx_nonce: Nonce,
+ tx_offset: u64,
+ rx_nonce: Nonce,
+ rx_offset: u64,
}
impl DatConnection {
@@ -73,10 +111,9 @@ impl DatConnection {
pub fn connect(host_port: &str, key: &[u8], live: bool) -> Result<DatConnection> {
let timeout = Duration::new(7, 0);
- let mut rng = OsRng::new()?;
- let mut nonce = [0; 24];
- rng.fill_bytes(&mut nonce);
+ let tx_nonce = gen_nonce();
let mut local_id = [0; 32];
+ let mut rng = OsRng::new()?;
rng.fill_bytes(&mut local_id);
// Connect to server
@@ -87,12 +124,15 @@ impl DatConnection {
tcp.set_write_timeout(Some(timeout))?;
let mut dc = DatConnection {
- nonce: nonce,
- remote_nonce: [0; 24],
id: local_id,
tcp,
live,
- remote_id: [0; 32]
+ remote_id: [0; 32],
+ key: Key::from_slice(key).unwrap(), // TODO:
+ tx_nonce: tx_nonce,
+ tx_offset: 0,
+ rx_nonce: gen_nonce(), // dummy
+ rx_offset: 0,
};
// Exchange register/feed
@@ -105,32 +145,22 @@ impl DatConnection {
// send register
let mut register_msg = Feed::new();
register_msg.set_discoveryKey(discovery_key.to_vec());
- register_msg.set_nonce(nonce.to_vec());
- dc.send_msg(false, &DatNetMessage::Register(register_msg))?;
+ register_msg.set_nonce((tx_nonce[0..24]).to_vec());
+ dc.send_register(&register_msg)?;
// read register
- let (was_content, msg) = dc.recv_msg()?;
- if was_content {
- bail!("Expected metadata msg, not content");
- }
- if let DatNetMessage::Register(registration) = msg {
- if registration.get_discoveryKey()[0..32] != discovery_key[..] {
- bail!("Remote peer not sharing same discovery key");
- }
- // TODO: more satisfying way to do this copy
- let rn = registration.get_nonce();
- for i in 0..24 {
- dc.remote_nonce[i] = rn[i];
- }
- } else {
- bail!("Expected Registration message, got something else");
+ let registration = dc.recv_register()?;
+ if registration.get_discoveryKey()[0..32] != discovery_key[..] {
+ bail!("Remote peer not sharing same discovery key");
}
+ let rn = registration.get_nonce();
+ dc.rx_nonce = Nonce::from_slice(&rn).unwrap();
// send handshake
let mut handshake_msg = Handshake::new();
handshake_msg.set_live(live);
handshake_msg.set_id(local_id.to_vec());
- dc.send_msg(false, &DatNetMessage::Handshake(handshake_msg))?;
+ dc.send_msg(&DatNetMessage::Handshake(handshake_msg), false)?;
// read handshake
let (was_content, msg) = dc.recv_msg()?;
@@ -152,8 +182,9 @@ impl DatConnection {
Ok(dc)
}
- fn send_msg(&mut self, is_content: bool, dnm: &DatNetMessage) -> Result<()> {
+ fn send_msg(&mut self, dnm: &DatNetMessage, is_content: bool) -> Result<()> {
+ let mut tx_stream = TcpSodiumWriter { dc: self };
let header_int: u8 = (is_content as u8) << 4 | (msg_code(dnm) & 0x0F);
let msg: &Message = msg_sugar(dnm);
let total_message_size = (msg.compute_size() as usize) + 1;
@@ -161,40 +192,41 @@ impl DatConnection {
println!("SEND total_len={} header={} is_content={}", total_message_size, header_int, is_content);
// send both header varints, and data
- self.tcp.write_varint(total_message_size as u64)?;
- self.tcp.write_varint(header_int as u32)?;
+ tx_stream.write_varint(total_message_size as u64)?;
+ tx_stream.write_varint(header_int as u32)?;
match dnm {
- &DatNetMessage::Register(ref m) => m.write_to_writer(&mut self.tcp)?,
- &DatNetMessage::Handshake(ref m) => m.write_to_writer(&mut self.tcp)?,
- &DatNetMessage::Status(ref m) => m.write_to_writer(&mut self.tcp)?,
- &DatNetMessage::Have(ref m) => m.write_to_writer(&mut self.tcp)?,
- &DatNetMessage::Unhave(ref m) => m.write_to_writer(&mut self.tcp)?,
- &DatNetMessage::Want(ref m) => m.write_to_writer(&mut self.tcp)?,
- &DatNetMessage::Unwant(ref m) => m.write_to_writer(&mut self.tcp)?,
- &DatNetMessage::Request(ref m) => m.write_to_writer(&mut self.tcp)?,
- &DatNetMessage::Cancel(ref m) => m.write_to_writer(&mut self.tcp)?,
- &DatNetMessage::Data(ref m) => m.write_to_writer(&mut self.tcp)?,
+ &DatNetMessage::Register(ref m) => m.write_to_writer(&mut tx_stream)?,
+ &DatNetMessage::Handshake(ref m) => m.write_to_writer(&mut tx_stream)?,
+ &DatNetMessage::Status(ref m) => m.write_to_writer(&mut tx_stream)?,
+ &DatNetMessage::Have(ref m) => m.write_to_writer(&mut tx_stream)?,
+ &DatNetMessage::Unhave(ref m) => m.write_to_writer(&mut tx_stream)?,
+ &DatNetMessage::Want(ref m) => m.write_to_writer(&mut tx_stream)?,
+ &DatNetMessage::Unwant(ref m) => m.write_to_writer(&mut tx_stream)?,
+ &DatNetMessage::Request(ref m) => m.write_to_writer(&mut tx_stream)?,
+ &DatNetMessage::Cancel(ref m) => m.write_to_writer(&mut tx_stream)?,
+ &DatNetMessage::Data(ref m) => m.write_to_writer(&mut tx_stream)?,
}
Ok(())
}
fn recv_msg(&mut self) -> Result<(bool, DatNetMessage)> {
- let total_len: u64 = self.tcp.read_varint()?;
- let header: u8 = self.tcp.read_varint()?;
+ let mut rx_stream = TcpSodiumReader { dc: self };
+ let total_len: u64 = rx_stream.read_varint()?;
+ let header: u8 = rx_stream.read_varint()?;
let is_content = (header & (1 << 4)) != 0;
+ println!("RECV total_len={} header={} is_content={}", total_len, header, is_content);
+
if header > 0x1F {
- bail!("Invalid header received");
+ bail!("Invalid header received: {}", header);
}
- println!("RECV total_len={} header={} is_content={}", total_len, header, is_content);
-
let msg_len = (total_len - 1) as usize;
let mut buf = vec![0; msg_len];
- self.tcp.read_exact(&mut buf[0..msg_len])?;
+ rx_stream.read_exact(&mut buf[0..msg_len])?;
let dnm = match header & 0x0F {
0 => DatNetMessage::Register(parse_from_bytes::<Feed>(&mut buf)?),
@@ -212,18 +244,50 @@ impl DatConnection {
Ok((is_content, dnm))
}
+ fn send_register(&mut self, reg: &Feed) -> Result<()> {
+
+ let header_int: u8 = 0;
+ let total_message_size = (reg.compute_size() as usize) + 1;
+
+ println!("SEND total_len={} header={}", total_message_size, header_int);
+
+ self.tcp.write_varint(total_message_size as u64)?;
+ self.tcp.write_varint(header_int as u32)?;
+ reg.write_to_writer(&mut self.tcp)?;
+ Ok(())
+ }
+
+ fn recv_register(&mut self) -> Result<Feed> {
+
+ let total_len: u64 = self.tcp.read_varint()?;
+ let header: u8 = self.tcp.read_varint()?;
+
+ if header != 0 {
+ bail!("Invalid register header received");
+ }
+
+ println!("RECV total_len={} header={}", total_len, header);
+
+ let msg_len = (total_len - 1) as usize;
+ let mut buf = vec![0; msg_len];
+ self.tcp.read_exact(&mut buf[0..msg_len])?;
+
+ let reg = parse_from_bytes::<Feed>(&mut buf)?;
+ Ok(reg)
+ }
+
pub fn receive_all(&mut self, is_content: bool) -> Result<()> {
// Status: downloading, not uploading
let mut sm = Info::new();
sm.set_uploading(false);
sm.set_downloading(true);
- self.send_msg(is_content, &DatNetMessage::Status(sm))?;
+ self.send_msg(&DatNetMessage::Status(sm), is_content)?;
// Want: everything
let mut wm = Want::new();
wm.set_start(0);
- self.send_msg(is_content, &DatNetMessage::Want(wm))?;
+ self.send_msg(&DatNetMessage::Want(wm), is_content)?;
// listen for Have
let length;
@@ -245,7 +309,7 @@ impl DatConnection {
for i in 0..length {
let mut rm = Request::new();
rm.set_index(i);
- self.send_msg(is_content, &DatNetMessage::Request(rm))?;
+ self.send_msg(&DatNetMessage::Request(rm), is_content)?;
let (was_content, msg) = self.recv_msg()?;
if was_content != is_content{