From b82f57d1ef9bc1cf8f55753df94f75e83d16a75b Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 11 Dec 2017 21:33:59 -0800 Subject: refactor higher level peer APIs (WIP) --- src/bitfield.rs | 55 ++++++++++++++++++ src/helpers.rs | 61 ++++++++++++++++++++ src/lib.rs | 70 +++-------------------- src/node.rs | 157 ---------------------------------------------------- src/peer.rs | 12 ++++ src/synchronizer.rs | 126 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 262 insertions(+), 219 deletions(-) create mode 100644 src/bitfield.rs create mode 100644 src/helpers.rs delete mode 100644 src/node.rs create mode 100644 src/peer.rs create mode 100644 src/synchronizer.rs (limited to 'src') diff --git a/src/bitfield.rs b/src/bitfield.rs new file mode 100644 index 0000000..11640ea --- /dev/null +++ b/src/bitfield.rs @@ -0,0 +1,55 @@ + +use errors::*; +use integer_encoding::VarInt; +use bit_field::BitArray; + + +// WrappedBitfield +// +// uses vec of u64 internally? +// +// fn from_message() +// fn get(u64) + +pub fn decode_bitfield(raw_bf: &[u8]) -> Result> { + let mut offset = 0; // byte offset that we have read up to + if raw_bf.len() < 1 { + bail!("Expected (varint-encoded) bitfield to have len>=1"); + } + let mut bit_array: Vec = vec![]; + while offset < raw_bf.len() { + let (header, inc): (u64, usize) = VarInt::decode_var(&raw_bf[offset..]); + offset += inc; + + if (header & 0x01) == 0x01 { + // compressed + let bit = (header & 0x02) == 0x02; + let run_len = header >> 2; + if bit { + bit_array.append(&mut vec![0xFF; run_len as usize]); + } else { + bit_array.append(&mut vec![0x00; run_len as usize]); + } + } else { + // uncompressed + let byte_count = header >> 1; + let mut data = raw_bf[offset..(offset + byte_count as usize)].to_vec(); + bit_array.append(&mut data); + offset += byte_count as usize; + } + } + // XXX: HACK + bit_array.reverse(); + return Ok(bit_array); +} + +/// Finds the index of the lowest bit +pub fn max_high_bit(bf: &[u8]) -> u64 { + // XXX: HACK, going backwards + for i in 0..bf.bit_length() { + if bf.get_bit(i) { + return (bf.bit_length() - i - 1) as u64; + } + } + return 0; +} diff --git a/src/helpers.rs b/src/helpers.rs new file mode 100644 index 0000000..32f0e25 --- /dev/null +++ b/src/helpers.rs @@ -0,0 +1,61 @@ + +use errors::*; +use crypto::digest::Digest; +use crypto::blake2b::Blake2b; + +/// Helper to calculate a discovery key from a public key. 'key' should be 32 bytes; the returned +/// array will also be 32 bytes long. +/// +/// dat discovery keys are calculated as a BLAKE2b "keyed hash" (using the passed key) of the string +/// "hypercore" (with no trailing null byte). +pub fn make_discovery_key(key: &[u8]) -> Vec { + let mut discovery_key = [0; 32]; + let mut hash = Blake2b::new_keyed(32, key); + hash.input(&"hypercore".as_bytes()); + hash.result(&mut discovery_key); + discovery_key.to_vec() +} + +/// Helper to parse a dat address (aka, public key) in string format. +/// +/// Address can start with 'dat://'. It should contain 64 hexadecimal characters. +pub fn parse_dat_address(input: &str) -> Result> { + + let raw_key = if input.starts_with("dat://") { + &input[6..] + } else { + input + }; + if raw_key.len() != 32 * 2 { + bail!("dat key not correct length"); + } + let mut key_bytes = vec![]; + for i in 0..32 { + let r = u8::from_str_radix(&raw_key[2 * i..2 * i + 2], 16); + match r { + Ok(b) => key_bytes.push(b), + Err(e) => bail!("Problem with hex: {}", e), + }; + } + Ok(key_bytes) +} + +#[test] +fn test_parse_dat_address() { + + assert!(parse_dat_address( + "c7638882870abd4044d6467b0738f15e3a36f57c3a7f7f3417fd7e4e0841d597").is_ok()); + assert!(parse_dat_address( + "C7638882870ABD4044D6467B0738F15E3A36F57C3A7F7F3417FD7E4E0841D597").is_ok()); + assert!(parse_dat_address( + "dat://c7638882870abd4044d6467b0738f15e3a36f57c3a7f7f3417fd7e4e0841d597").is_ok()); + + assert!(parse_dat_address( + "c7638882870ab").is_err()); + assert!(parse_dat_address( + "g7638882870abd4044d6467b0738f15e3a36f57c3a7f7f3417fd7e4e0841d597").is_err()); + assert!(parse_dat_address( + "dat://c7638882870abd4044d6467b0738f15e3a36f57c3a7f7f3417fd7e4e0841d5970").is_err()); + assert!(parse_dat_address( + "dat://c7638882870abd4044d6467b0738f15e3a36f57c3a7f7f3417fd7e4e0841d59").is_err()); +} diff --git a/src/lib.rs b/src/lib.rs index 63000f7..e3ad197 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,6 +46,10 @@ mod errors { pub use errors::*; // Organize code internally (files, modules), but pull it all into a flat namespace to export. +mod helpers; +pub use helpers::*; +mod bitfield; +pub use bitfield::*; mod sleep_file; pub use sleep_file::*; mod sleep_register; @@ -56,68 +60,10 @@ mod protocol; pub use protocol::*; pub mod network_msgs; pub mod metadata_msgs; -mod node; -pub use node::*; mod discovery; pub use discovery::*; +mod peer; +pub use peer::*; +mod synchronizer; +pub use synchronizer::*; -// Shared functions -use crypto::digest::Digest; -use crypto::blake2b::Blake2b; - -/// Helper to calculate a discovery key from a public key. 'key' should be 32 bytes; the returned -/// array will also be 32 bytes long. -/// -/// dat discovery keys are calculated as a BLAKE2b "keyed hash" (using the passed key) of the string -/// "hypercore" (with no trailing null byte). -pub fn make_discovery_key(key: &[u8]) -> Vec { - let mut discovery_key = [0; 32]; - let mut hash = Blake2b::new_keyed(32, key); - hash.input(&"hypercore".as_bytes()); - hash.result(&mut discovery_key); - discovery_key.to_vec() -} - -/// Helper to parse a dat address (aka, public key) in string format. -/// -/// Address can start with 'dat://'. It should contain 64 hexadecimal characters. -pub fn parse_dat_address(input: &str) -> Result> { - - let raw_key = if input.starts_with("dat://") { - &input[6..] - } else { - input - }; - if raw_key.len() != 32 * 2 { - bail!("dat key not correct length"); - } - let mut key_bytes = vec![]; - for i in 0..32 { - let r = u8::from_str_radix(&raw_key[2 * i..2 * i + 2], 16); - match r { - Ok(b) => key_bytes.push(b), - Err(e) => bail!("Problem with hex: {}", e), - }; - } - Ok(key_bytes) -} - -#[test] -fn test_parse_dat_address() { - - assert!(parse_dat_address( - "c7638882870abd4044d6467b0738f15e3a36f57c3a7f7f3417fd7e4e0841d597").is_ok()); - assert!(parse_dat_address( - "C7638882870ABD4044D6467B0738F15E3A36F57C3A7F7F3417FD7E4E0841D597").is_ok()); - assert!(parse_dat_address( - "dat://c7638882870abd4044d6467b0738f15e3a36f57c3a7f7f3417fd7e4e0841d597").is_ok()); - - assert!(parse_dat_address( - "c7638882870ab").is_err()); - assert!(parse_dat_address( - "g7638882870abd4044d6467b0738f15e3a36f57c3a7f7f3417fd7e4e0841d597").is_err()); - assert!(parse_dat_address( - "dat://c7638882870abd4044d6467b0738f15e3a36f57c3a7f7f3417fd7e4e0841d5970").is_err()); - assert!(parse_dat_address( - "dat://c7638882870abd4044d6467b0738f15e3a36f57c3a7f7f3417fd7e4e0841d59").is_err()); -} diff --git a/src/node.rs b/src/node.rs deleted file mode 100644 index 582bf3d..0000000 --- a/src/node.rs +++ /dev/null @@ -1,157 +0,0 @@ - -use errors::*; -use network_msgs::*; -use protocol::{DatNetMessage, DatConnection}; -use integer_encoding::VarInt; -use bit_field::BitArray; -use sleep_register::HyperRegister; - -fn decode_bitfiled(raw_bf: &[u8]) -> Result> { - let mut offset = 0; // byte offset that we have read up to - if raw_bf.len() < 1 { - bail!("Expected (varint-encoded) bitfield to have len>=1"); - } - let mut bit_array: Vec = vec![]; - while offset < raw_bf.len() { - let (header, inc): (u64, usize) = VarInt::decode_var(&raw_bf[offset..]); - offset += inc; - - if (header & 0x01) == 0x01 { - // compressed - let bit = (header & 0x02) == 0x02; - let run_len = header >> 2; - if bit { - bit_array.append(&mut vec![0xFF; run_len as usize]); - } else { - bit_array.append(&mut vec![0x00; run_len as usize]); - } - } else { - // uncompressed - let byte_count = header >> 1; - let mut data = raw_bf[offset..(offset + byte_count as usize)].to_vec(); - bit_array.append(&mut data); - offset += byte_count as usize; - } - } - // XXX: HACK - bit_array.reverse(); - return Ok(bit_array); -} - -/// Finds the index of the lowest bit -fn max_high_bit(bf: &[u8]) -> u64 { - // XXX: HACK, going backwards - for i in 0..bf.bit_length() { - if bf.get_bit(i) { - return (bf.bit_length() - i - 1) as u64; - } - } - return 0; -} - -fn max_index(have_msg: &Have) -> Result { - if have_msg.has_length() { - return Ok(have_msg.get_start() + have_msg.get_length()); - } else if have_msg.has_bitfield() { - let raw_bf = have_msg.get_bitfield(); - let bf = decode_bitfiled(raw_bf)?; - trace!("decoded bitfield: {:?}", bf); - return Ok(max_high_bit(&bf)); - } else { - return Ok(have_msg.get_start() + 1); - } -} - -#[test] -fn test_max_index() { - let mut hm = Have::new(); - hm.set_start(0); - hm.set_bitfield(vec![7,2,128]); - assert_eq!(max_index(&hm).unwrap(), 8); - - hm.set_bitfield(vec![2, 207]); - assert_eq!(max_index(&hm).unwrap(), 7); - - // Alphabet test dat - hm.set_bitfield(vec![2, 254]); - assert_eq!(max_index(&hm).unwrap(), 6); - hm.set_bitfield(vec![2, 252]); - assert_eq!(max_index(&hm).unwrap(), 5); -} - -/// Tries to connect to a single peer, pull register, and close. -pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegister, is_content: bool) -> Result<()> { - - if register.len()? > 0 { - bail!("Register already had data in it (expected empty for naive clone)"); - } - - let mut dc = DatConnection::connect(host_port, key, false)?; - - // Info: downloading, not uploading - let mut im = Info::new(); - im.set_uploading(false); - im.set_downloading(true); - let im = DatNetMessage::Info(im); - dc.send_msg(&im, is_content)?; - - // Have: nothing (so far) - let mut hm = Have::new(); - hm.set_start(0); - hm.set_length(0); - let hm = DatNetMessage::Have(hm); - dc.send_msg(&hm, is_content)?; - - // UnHave: still nothing - let mut uhm = Unhave::new(); - uhm.set_start(0); - let uhm = DatNetMessage::Unhave(uhm); - dc.send_msg(&uhm, is_content)?; - - // Want: everything - let mut wm = Want::new(); - wm.set_start(0); - let wm = DatNetMessage::Want(wm); - dc.send_msg(&wm, is_content)?; - - let last_entry: u64; - - // Receive Have messages to determine lengths - loop { - let (was_content, msg) = dc.recv_msg()?; - match msg { - DatNetMessage::Have(dh) => { - info!("is_content={}; {:?}; bitfield={:?}", was_content, dh, dh.get_bitfield()); - last_entry = max_index(&dh)?; - break; - }, - _ => { - info!("Other message: {:?}", &msg); - } - } - } - - info!("last_entry={}", last_entry); - - // Request / Data loops - for i in 0..(last_entry+1) { - let mut rm = Request::new(); - rm.set_index(i); - info!("Sending request: {:?}", rm); - dc.send_msg(&DatNetMessage::Request(rm), false)?; - let (was_content, msg) = dc.recv_msg()?; - assert!(!was_content); - match msg { - DatNetMessage::Data(dm) => { - info!("Got data: index={}", dm.get_index()); - assert!(dm.has_value()); - assert!(dm.get_index() == i); - register.append(dm.get_value())?; - }, - _ => { - info!("Other message: {:?}", &msg); - } - } - } - Ok(()) -} diff --git a/src/peer.rs b/src/peer.rs new file mode 100644 index 0000000..5535b09 --- /dev/null +++ b/src/peer.rs @@ -0,0 +1,12 @@ + +//use errors::*; + +// DatPeer +// connection (thread?) +// last_sent_ts +// last_received_ts +// has_messages +// registers: vec? map? +// +// fn has(reg, u64) +// fn has_intersection(reg, bitfield) diff --git a/src/synchronizer.rs b/src/synchronizer.rs new file mode 100644 index 0000000..216df6e --- /dev/null +++ b/src/synchronizer.rs @@ -0,0 +1,126 @@ + +use errors::*; +use network_msgs::*; +use bitfield::*; +use protocol::{DatNetMessage, DatConnection}; +use sleep_register::HyperRegister; + +// Synchronizer +// register_keys +// peers: vec +// registers: HyperRegisters +// mode: enum +// state: enum +// wanted: bitfield +// requested: vec +// +// fn next_wanted() -> Option((reg, u64)) +// fn tick() + + +fn max_index(have_msg: &Have) -> Result { + if have_msg.has_length() { + return Ok(have_msg.get_start() + have_msg.get_length()); + } else if have_msg.has_bitfield() { + let raw_bf = have_msg.get_bitfield(); + let bf = decode_bitfield(raw_bf)?; + trace!("decoded bitfield: {:?}", bf); + return Ok(max_high_bit(&bf)); + } else { + return Ok(have_msg.get_start() + 1); + } +} + +#[test] +fn test_max_index() { + let mut hm = Have::new(); + hm.set_start(0); + hm.set_bitfield(vec![7,2,128]); + assert_eq!(max_index(&hm).unwrap(), 8); + + hm.set_bitfield(vec![2, 207]); + assert_eq!(max_index(&hm).unwrap(), 7); + + // Alphabet test dat + hm.set_bitfield(vec![2, 254]); + assert_eq!(max_index(&hm).unwrap(), 6); + hm.set_bitfield(vec![2, 252]); + assert_eq!(max_index(&hm).unwrap(), 5); +} + +/// Tries to connect to a single peer, pull register, and close. +pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegister, is_content: bool) -> Result<()> { + + if register.len()? > 0 { + bail!("Register already had data in it (expected empty for naive clone)"); + } + + let mut dc = DatConnection::connect(host_port, key, false)?; + + // Info: downloading, not uploading + let mut im = Info::new(); + im.set_uploading(false); + im.set_downloading(true); + let im = DatNetMessage::Info(im); + dc.send_msg(&im, is_content)?; + + // Have: nothing (so far) + let mut hm = Have::new(); + hm.set_start(0); + hm.set_length(0); + let hm = DatNetMessage::Have(hm); + dc.send_msg(&hm, is_content)?; + + // UnHave: still nothing + let mut uhm = Unhave::new(); + uhm.set_start(0); + let uhm = DatNetMessage::Unhave(uhm); + dc.send_msg(&uhm, is_content)?; + + // Want: everything + let mut wm = Want::new(); + wm.set_start(0); + let wm = DatNetMessage::Want(wm); + dc.send_msg(&wm, is_content)?; + + let last_entry: u64; + + // Receive Have messages to determine lengths + loop { + let (was_content, msg) = dc.recv_msg()?; + match msg { + DatNetMessage::Have(dh) => { + info!("is_content={}; {:?}; bitfield={:?}", was_content, dh, dh.get_bitfield()); + last_entry = max_index(&dh)?; + break; + }, + _ => { + info!("Other message: {:?}", &msg); + } + } + } + + info!("last_entry={}", last_entry); + + // Request / Data loops + for i in 0..(last_entry+1) { + let mut rm = Request::new(); + rm.set_index(i); + info!("Sending request: {:?}", rm); + dc.send_msg(&DatNetMessage::Request(rm), false)?; + let (was_content, msg) = dc.recv_msg()?; + assert!(!was_content); + match msg { + DatNetMessage::Data(dm) => { + info!("Got data: index={}", dm.get_index()); + assert!(dm.has_value()); + assert!(dm.get_index() == i); + register.append(dm.get_value())?; + }, + _ => { + info!("Other message: {:?}", &msg); + } + } + } + Ok(()) +} -- cgit v1.2.3