diff options
Diffstat (limited to 'src/sync.rs')
-rw-r--r-- | src/sync.rs | 155 |
1 files changed, 115 insertions, 40 deletions
diff --git a/src/sync.rs b/src/sync.rs index f89f2bb..0e27503 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,21 +1,18 @@ -use std::io::Read; use std::net::TcpStream; -use integer_encoding::{VarIntReader, VarIntWriter, VarInt}; use std::time::Duration; +use std::io::Read; use crypto::digest::Digest; use crypto::blake2b::Blake2b; use rand::{Rng, OsRng}; use protobuf::Message; -use protobuf::core::parse_from_reader; -use protobuf::core::parse_from_bytes; +use protobuf::parse_from_bytes; +use integer_encoding::{VarIntReader, VarIntWriter}; use errors::*; use network_proto::*; -// TCP stream -// two varints - +#[derive(Debug)] pub enum DatNetMessage { Register(Feed), Handshake(Handshake), @@ -66,12 +63,14 @@ pub struct DatConnection { nonce: [u8; 24], remote_nonce: [u8; 24], id: [u8; 32], - stream: TcpStream, + remote_id: [u8; 32], + tcp: TcpStream, + live: bool, } impl DatConnection { - pub fn connect(host_port: &str, key: &[u8]) -> Result<DatConnection> { + pub fn connect(host_port: &str, key: &[u8], live: bool) -> Result<DatConnection> { let timeout = Duration::new(7, 0); let mut rng = OsRng::new()?; @@ -83,19 +82,21 @@ impl DatConnection { // Connect to server println!("Connecting to {}", host_port); // TODO: timeout on connect (socketaddr dance) - let stream = TcpStream::connect(host_port)?; - stream.set_read_timeout(Some(timeout))?; - stream.set_write_timeout(Some(timeout))?; + let tcp = TcpStream::connect(host_port)?; + tcp.set_read_timeout(Some(timeout))?; + tcp.set_write_timeout(Some(timeout))?; let mut dc = DatConnection { nonce: nonce, remote_nonce: [0; 24], id: local_id, - stream, + tcp, + live, + remote_id: [0; 32] }; // Exchange register/feed - dc.stream.set_nodelay(true)?; // Faster handshake + dc.tcp.set_nodelay(true)?; // Faster handshake // calculate discovery key let mut discovery_key = [0; 32]; let mut hash = Blake2b::new_keyed(32, key); @@ -116,7 +117,7 @@ impl DatConnection { if registration.get_discoveryKey()[0..32] != discovery_key[..] { bail!("Remote peer not sharing same discovery key"); } - // TODO: more satisfying way to do this transfer + // TODO: more satisfying way to do this copy let rn = registration.get_nonce(); for i in 0..24 { dc.remote_nonce[i] = rn[i]; @@ -126,9 +127,27 @@ impl DatConnection { } // send handshake + let mut handshake_msg = Handshake::new(); + handshake_msg.set_live(live); + handshake_msg.set_id(local_id.to_vec()); + dc.send_msg(false, &DatNetMessage::Handshake(handshake_msg))?; + // read handshake + let (was_content, msg) = dc.recv_msg()?; + if was_content { + bail!("Expected metadata msg, not content"); + } + if let DatNetMessage::Handshake(handshake) = msg { + // TODO: more satisfying way to do this copy + let hid = handshake.get_id(); + for i in 0..32 { + dc.remote_id[i] = hid[i]; + } + } else { + bail!("Expected Handshake message, got something else"); + } - dc.stream.set_nodelay(false)?; // Back to normal + dc.tcp.set_nodelay(false)?; // Back to normal Ok(dc) } @@ -137,53 +156,109 @@ impl DatConnection { let header_int: u8 = (is_content as u8) << 4 | (msg_code(dnm) & 0x0F); let msg: &Message = msg_sugar(dnm); - let total_message_size = (msg.compute_size() as usize) + header_int.required_space(); + let total_message_size = (msg.compute_size() as usize) + 1; println!("SEND total_len={} header={} is_content={}", total_message_size, header_int, is_content); // send both header varints, and data - self.stream.write_varint(total_message_size)?; - self.stream.write_varint(header_int)?; - msg.write_to_writer(&mut self.stream)?; + self.tcp.write_varint(total_message_size as u64)?; + self.tcp.write_varint(header_int as u32)?; + match dnm { + &DatNetMessage::Register(ref m) => m.write_to_writer(&mut self.tcp)?, + &DatNetMessage::Handshake(ref m) => m.write_to_writer(&mut self.tcp)?, + &DatNetMessage::Status(ref m) => m.write_to_writer(&mut self.tcp)?, + &DatNetMessage::Have(ref m) => m.write_to_writer(&mut self.tcp)?, + &DatNetMessage::Unhave(ref m) => m.write_to_writer(&mut self.tcp)?, + &DatNetMessage::Want(ref m) => m.write_to_writer(&mut self.tcp)?, + &DatNetMessage::Unwant(ref m) => m.write_to_writer(&mut self.tcp)?, + &DatNetMessage::Request(ref m) => m.write_to_writer(&mut self.tcp)?, + &DatNetMessage::Cancel(ref m) => m.write_to_writer(&mut self.tcp)?, + &DatNetMessage::Data(ref m) => m.write_to_writer(&mut self.tcp)?, + } Ok(()) } fn recv_msg(&mut self) -> Result<(bool, DatNetMessage)> { - let total_len: u64 = self.stream.read_varint()?; - let header: u8 = self.stream.read_varint()?; + + let total_len: u64 = self.tcp.read_varint()?; + let header: u8 = self.tcp.read_varint()?; let is_content = (header & (1 << 4)) != 0; + if header > 0x1F { + bail!("Invalid header received"); + } + println!("RECV total_len={} header={} is_content={}", total_len, header, is_content); - // XXX: replace with coded, buffered streams - let mut buf = [0; 1024]; let msg_len = (total_len - 1) as usize; - let len = self.stream.read(&mut buf[0..msg_len])?; - println!("raw read: {} first={}", len, buf[0]); + let mut buf = vec![0; msg_len]; + self.tcp.read_exact(&mut buf[0..msg_len])?; let dnm = match header & 0x0F { - 0 => DatNetMessage::Register(parse_from_bytes::<Feed>(&mut buf[0..msg_len])?), - 1 => DatNetMessage::Handshake(parse_from_reader::<Handshake>(&mut self.stream)?), - 2 => DatNetMessage::Status(parse_from_reader::<Info>(&mut self.stream)?), - 3 => DatNetMessage::Have(parse_from_reader::<Have>(&mut self.stream)?), - 4 => DatNetMessage::Unhave(parse_from_reader::<Unhave>(&mut self.stream)?), - 5 => DatNetMessage::Want(parse_from_reader::<Want>(&mut self.stream)?), - 6 => DatNetMessage::Unwant(parse_from_reader::<Unwant>(&mut self.stream)?), - 7 => DatNetMessage::Request(parse_from_reader::<Request>(&mut self.stream)?), - 8 => DatNetMessage::Cancel(parse_from_reader::<Cancel>(&mut self.stream)?), - 9 => DatNetMessage::Data(parse_from_reader::<Data>(&mut self.stream)?), - _ => bail!("Unimplemented message type received"), + 0 => DatNetMessage::Register(parse_from_bytes::<Feed>(&mut buf)?), + 1 => DatNetMessage::Handshake(parse_from_bytes::<Handshake>(&mut buf)?), + 2 => DatNetMessage::Status(parse_from_bytes::<Info>(&mut buf)?), + 3 => DatNetMessage::Have(parse_from_bytes::<Have>(&mut buf)?), + 4 => DatNetMessage::Unhave(parse_from_bytes::<Unhave>(&mut buf)?), + 5 => DatNetMessage::Want(parse_from_bytes::<Want>(&mut buf)?), + 6 => DatNetMessage::Unwant(parse_from_bytes::<Unwant>(&mut buf)?), + 7 => DatNetMessage::Request(parse_from_bytes::<Request>(&mut buf)?), + 8 => DatNetMessage::Cancel(parse_from_bytes::<Cancel>(&mut buf)?), + 9 => DatNetMessage::Data(parse_from_bytes::<Data>(&mut buf)?), + other => bail!("Unimplemented message type received: {}", other), }; Ok((is_content, dnm)) } - fn receive_all(&self) -> Result<()> { + pub fn receive_all(&mut self, is_content: bool) -> Result<()> { // Status: downloading, not uploading - // Have: nothing + let mut sm = Info::new(); + sm.set_uploading(false); + sm.set_downloading(true); + self.send_msg(is_content, &DatNetMessage::Status(sm))?; + // Want: everything - unimplemented!(); + let mut wm = Want::new(); + wm.set_start(0); + self.send_msg(is_content, &DatNetMessage::Want(wm))?; + + // listen for Have + let length; + loop { + let (was_content, msg) = self.recv_msg()?; + if was_content != is_content{ + continue; + } + if let DatNetMessage::Have(have) = msg { + length = have.get_length(); + break; + } else { + info!("Expected Want message, got: {:?}", &msg); + continue; + } + }; + + // Request / Data loop + for i in 0..length { + let mut rm = Request::new(); + rm.set_index(i); + self.send_msg(is_content, &DatNetMessage::Request(rm))?; + + let (was_content, msg) = self.recv_msg()?; + if was_content != is_content{ + info!("Expected other message channel"); + } + if let DatNetMessage::Data(dm) = msg { + println!("Got metadata: {}", dm.get_index()); + } else { + info!("Expected Data message, got: {:?}", &msg); + continue; + } + } + + Ok(()) } } |