aboutsummaryrefslogtreecommitdiffstats
path: root/src/sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync.rs')
-rw-r--r--src/sync.rs155
1 files changed, 115 insertions, 40 deletions
diff --git a/src/sync.rs b/src/sync.rs
index f89f2bb..0e27503 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -1,21 +1,18 @@
-use std::io::Read;
use std::net::TcpStream;
-use integer_encoding::{VarIntReader, VarIntWriter, VarInt};
use std::time::Duration;
+use std::io::Read;
use crypto::digest::Digest;
use crypto::blake2b::Blake2b;
use rand::{Rng, OsRng};
use protobuf::Message;
-use protobuf::core::parse_from_reader;
-use protobuf::core::parse_from_bytes;
+use protobuf::parse_from_bytes;
+use integer_encoding::{VarIntReader, VarIntWriter};
use errors::*;
use network_proto::*;
-// TCP stream
-// two varints
-
+#[derive(Debug)]
pub enum DatNetMessage {
Register(Feed),
Handshake(Handshake),
@@ -66,12 +63,14 @@ pub struct DatConnection {
nonce: [u8; 24],
remote_nonce: [u8; 24],
id: [u8; 32],
- stream: TcpStream,
+ remote_id: [u8; 32],
+ tcp: TcpStream,
+ live: bool,
}
impl DatConnection {
- pub fn connect(host_port: &str, key: &[u8]) -> Result<DatConnection> {
+ pub fn connect(host_port: &str, key: &[u8], live: bool) -> Result<DatConnection> {
let timeout = Duration::new(7, 0);
let mut rng = OsRng::new()?;
@@ -83,19 +82,21 @@ impl DatConnection {
// Connect to server
println!("Connecting to {}", host_port);
// TODO: timeout on connect (socketaddr dance)
- let stream = TcpStream::connect(host_port)?;
- stream.set_read_timeout(Some(timeout))?;
- stream.set_write_timeout(Some(timeout))?;
+ let tcp = TcpStream::connect(host_port)?;
+ tcp.set_read_timeout(Some(timeout))?;
+ tcp.set_write_timeout(Some(timeout))?;
let mut dc = DatConnection {
nonce: nonce,
remote_nonce: [0; 24],
id: local_id,
- stream,
+ tcp,
+ live,
+ remote_id: [0; 32]
};
// Exchange register/feed
- dc.stream.set_nodelay(true)?; // Faster handshake
+ dc.tcp.set_nodelay(true)?; // Faster handshake
// calculate discovery key
let mut discovery_key = [0; 32];
let mut hash = Blake2b::new_keyed(32, key);
@@ -116,7 +117,7 @@ impl DatConnection {
if registration.get_discoveryKey()[0..32] != discovery_key[..] {
bail!("Remote peer not sharing same discovery key");
}
- // TODO: more satisfying way to do this transfer
+ // TODO: more satisfying way to do this copy
let rn = registration.get_nonce();
for i in 0..24 {
dc.remote_nonce[i] = rn[i];
@@ -126,9 +127,27 @@ impl DatConnection {
}
// send handshake
+ let mut handshake_msg = Handshake::new();
+ handshake_msg.set_live(live);
+ handshake_msg.set_id(local_id.to_vec());
+ dc.send_msg(false, &DatNetMessage::Handshake(handshake_msg))?;
+
// read handshake
+ let (was_content, msg) = dc.recv_msg()?;
+ if was_content {
+ bail!("Expected metadata msg, not content");
+ }
+ if let DatNetMessage::Handshake(handshake) = msg {
+ // TODO: more satisfying way to do this copy
+ let hid = handshake.get_id();
+ for i in 0..32 {
+ dc.remote_id[i] = hid[i];
+ }
+ } else {
+ bail!("Expected Handshake message, got something else");
+ }
- dc.stream.set_nodelay(false)?; // Back to normal
+ dc.tcp.set_nodelay(false)?; // Back to normal
Ok(dc)
}
@@ -137,53 +156,109 @@ impl DatConnection {
let header_int: u8 = (is_content as u8) << 4 | (msg_code(dnm) & 0x0F);
let msg: &Message = msg_sugar(dnm);
- let total_message_size = (msg.compute_size() as usize) + header_int.required_space();
+ let total_message_size = (msg.compute_size() as usize) + 1;
println!("SEND total_len={} header={} is_content={}", total_message_size, header_int, is_content);
// send both header varints, and data
- self.stream.write_varint(total_message_size)?;
- self.stream.write_varint(header_int)?;
- msg.write_to_writer(&mut self.stream)?;
+ self.tcp.write_varint(total_message_size as u64)?;
+ self.tcp.write_varint(header_int as u32)?;
+ match dnm {
+ &DatNetMessage::Register(ref m) => m.write_to_writer(&mut self.tcp)?,
+ &DatNetMessage::Handshake(ref m) => m.write_to_writer(&mut self.tcp)?,
+ &DatNetMessage::Status(ref m) => m.write_to_writer(&mut self.tcp)?,
+ &DatNetMessage::Have(ref m) => m.write_to_writer(&mut self.tcp)?,
+ &DatNetMessage::Unhave(ref m) => m.write_to_writer(&mut self.tcp)?,
+ &DatNetMessage::Want(ref m) => m.write_to_writer(&mut self.tcp)?,
+ &DatNetMessage::Unwant(ref m) => m.write_to_writer(&mut self.tcp)?,
+ &DatNetMessage::Request(ref m) => m.write_to_writer(&mut self.tcp)?,
+ &DatNetMessage::Cancel(ref m) => m.write_to_writer(&mut self.tcp)?,
+ &DatNetMessage::Data(ref m) => m.write_to_writer(&mut self.tcp)?,
+ }
Ok(())
}
fn recv_msg(&mut self) -> Result<(bool, DatNetMessage)> {
- let total_len: u64 = self.stream.read_varint()?;
- let header: u8 = self.stream.read_varint()?;
+
+ let total_len: u64 = self.tcp.read_varint()?;
+ let header: u8 = self.tcp.read_varint()?;
let is_content = (header & (1 << 4)) != 0;
+ if header > 0x1F {
+ bail!("Invalid header received");
+ }
+
println!("RECV total_len={} header={} is_content={}", total_len, header, is_content);
- // XXX: replace with coded, buffered streams
- let mut buf = [0; 1024];
let msg_len = (total_len - 1) as usize;
- let len = self.stream.read(&mut buf[0..msg_len])?;
- println!("raw read: {} first={}", len, buf[0]);
+ let mut buf = vec![0; msg_len];
+ self.tcp.read_exact(&mut buf[0..msg_len])?;
let dnm = match header & 0x0F {
- 0 => DatNetMessage::Register(parse_from_bytes::<Feed>(&mut buf[0..msg_len])?),
- 1 => DatNetMessage::Handshake(parse_from_reader::<Handshake>(&mut self.stream)?),
- 2 => DatNetMessage::Status(parse_from_reader::<Info>(&mut self.stream)?),
- 3 => DatNetMessage::Have(parse_from_reader::<Have>(&mut self.stream)?),
- 4 => DatNetMessage::Unhave(parse_from_reader::<Unhave>(&mut self.stream)?),
- 5 => DatNetMessage::Want(parse_from_reader::<Want>(&mut self.stream)?),
- 6 => DatNetMessage::Unwant(parse_from_reader::<Unwant>(&mut self.stream)?),
- 7 => DatNetMessage::Request(parse_from_reader::<Request>(&mut self.stream)?),
- 8 => DatNetMessage::Cancel(parse_from_reader::<Cancel>(&mut self.stream)?),
- 9 => DatNetMessage::Data(parse_from_reader::<Data>(&mut self.stream)?),
- _ => bail!("Unimplemented message type received"),
+ 0 => DatNetMessage::Register(parse_from_bytes::<Feed>(&mut buf)?),
+ 1 => DatNetMessage::Handshake(parse_from_bytes::<Handshake>(&mut buf)?),
+ 2 => DatNetMessage::Status(parse_from_bytes::<Info>(&mut buf)?),
+ 3 => DatNetMessage::Have(parse_from_bytes::<Have>(&mut buf)?),
+ 4 => DatNetMessage::Unhave(parse_from_bytes::<Unhave>(&mut buf)?),
+ 5 => DatNetMessage::Want(parse_from_bytes::<Want>(&mut buf)?),
+ 6 => DatNetMessage::Unwant(parse_from_bytes::<Unwant>(&mut buf)?),
+ 7 => DatNetMessage::Request(parse_from_bytes::<Request>(&mut buf)?),
+ 8 => DatNetMessage::Cancel(parse_from_bytes::<Cancel>(&mut buf)?),
+ 9 => DatNetMessage::Data(parse_from_bytes::<Data>(&mut buf)?),
+ other => bail!("Unimplemented message type received: {}", other),
};
Ok((is_content, dnm))
}
- fn receive_all(&self) -> Result<()> {
+ pub fn receive_all(&mut self, is_content: bool) -> Result<()> {
// Status: downloading, not uploading
- // Have: nothing
+ let mut sm = Info::new();
+ sm.set_uploading(false);
+ sm.set_downloading(true);
+ self.send_msg(is_content, &DatNetMessage::Status(sm))?;
+
// Want: everything
- unimplemented!();
+ let mut wm = Want::new();
+ wm.set_start(0);
+ self.send_msg(is_content, &DatNetMessage::Want(wm))?;
+
+ // listen for Have
+ let length;
+ loop {
+ let (was_content, msg) = self.recv_msg()?;
+ if was_content != is_content{
+ continue;
+ }
+ if let DatNetMessage::Have(have) = msg {
+ length = have.get_length();
+ break;
+ } else {
+ info!("Expected Want message, got: {:?}", &msg);
+ continue;
+ }
+ };
+
+ // Request / Data loop
+ for i in 0..length {
+ let mut rm = Request::new();
+ rm.set_index(i);
+ self.send_msg(is_content, &DatNetMessage::Request(rm))?;
+
+ let (was_content, msg) = self.recv_msg()?;
+ if was_content != is_content{
+ info!("Expected other message channel");
+ }
+ if let DatNetMessage::Data(dm) = msg {
+ println!("Got metadata: {}", dm.get_index());
+ } else {
+ info!("Expected Data message, got: {:?}", &msg);
+ continue;
+ }
+ }
+
+ Ok(())
}
}