From a699c92cdceb0382d9bfec37724166145a7eb6df Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 28 Nov 2017 23:34:35 -0800 Subject: WIP on node (synchronization) code --- src/bin/geniza-net.rs | 24 ++++++++ src/lib.rs | 5 +- src/node.rs | 157 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 185 insertions(+), 1 deletion(-) create mode 100644 src/node.rs (limited to 'src') diff --git a/src/bin/geniza-net.rs b/src/bin/geniza-net.rs index 38dc507..dd80c4f 100644 --- a/src/bin/geniza-net.rs +++ b/src/bin/geniza-net.rs @@ -55,6 +55,19 @@ fn run() -> Result<()> { .about("Prints the DNS name to query (mDNS or centrally) for peers") .arg_from_usage(" 'dat key (public key) to convert (in hex)'"), ) + .subcommand( + SubCommand::with_name("naive-clone") + .about("Pulls a drive from a single (known) peer, using a naive algorithm") + .arg(Arg::with_name("dat-dir") + .short("d") + .long("dat-dir") + .value_name("PATH") + .help("dat drive directory") + .default_value(".dat") + .takes_value(true)) + .arg_from_usage(" 'peer host:port to connect to'") + .arg_from_usage(" 'dat key (public key) to pull"), + ) .get_matches(); @@ -113,6 +126,17 @@ fn run() -> Result<()> { } println!(".dat.local"); } + ("naive-clone", Some(subm)) => { + let host_port = subm.value_of("host_port").unwrap(); + let dat_key = subm.value_of("dat_key").unwrap(); + let key_bytes = parse_dat_key(&dat_key)?; + let dir = Path::new(subm.value_of("dat-dir").unwrap()); + let mut metadata = SleepDirRegister::create(&dir, "metadata")?; + node_simple_clone(host_port, &key_bytes, &mut metadata, false)?; + // TODO: read out content key from metadata register + //let content = SleepDirRegister::create(&dir, "content")?; + //node_simple_clone(host_port, &key_bytes, &mut content, true)?; + } _ => { println!("Missing or unimplemented command!"); println!("{}", matches.usage()); diff --git a/src/lib.rs b/src/lib.rs index f0059e6..10139d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,7 @@ extern crate log; extern crate protobuf; extern crate rand; extern crate sodiumoxide; +extern crate bit_field; #[cfg(test)] extern crate tempdir; @@ -52,8 +53,10 @@ mod protocol; pub use protocol::*; pub mod network_msgs; pub mod metadata_msgs; +mod node; +pub use node::*; - +// Shared functions use crypto::digest::Digest; use crypto::blake2b::Blake2b; diff --git a/src/node.rs b/src/node.rs new file mode 100644 index 0000000..582bf3d --- /dev/null +++ b/src/node.rs @@ -0,0 +1,157 @@ + +use errors::*; +use network_msgs::*; +use protocol::{DatNetMessage, DatConnection}; +use integer_encoding::VarInt; +use bit_field::BitArray; +use sleep_register::HyperRegister; + +fn decode_bitfiled(raw_bf: &[u8]) -> Result> { + let mut offset = 0; // byte offset that we have read up to + if raw_bf.len() < 1 { + bail!("Expected (varint-encoded) bitfield to have len>=1"); + } + let mut bit_array: Vec = vec![]; + while offset < raw_bf.len() { + let (header, inc): (u64, usize) = VarInt::decode_var(&raw_bf[offset..]); + offset += inc; + + if (header & 0x01) == 0x01 { + // compressed + let bit = (header & 0x02) == 0x02; + let run_len = header >> 2; + if bit { + bit_array.append(&mut vec![0xFF; run_len as usize]); + } else { + bit_array.append(&mut vec![0x00; run_len as usize]); + } + } else { + // uncompressed + let byte_count = header >> 1; + let mut data = raw_bf[offset..(offset + byte_count as usize)].to_vec(); + bit_array.append(&mut data); + offset += byte_count as usize; + } + } + // XXX: HACK + bit_array.reverse(); + return Ok(bit_array); +} + +/// Finds the index of the lowest bit +fn max_high_bit(bf: &[u8]) -> u64 { + // XXX: HACK, going backwards + for i in 0..bf.bit_length() { + if bf.get_bit(i) { + return (bf.bit_length() - i - 1) as u64; + } + } + return 0; +} + +fn max_index(have_msg: &Have) -> Result { + if have_msg.has_length() { + return Ok(have_msg.get_start() + have_msg.get_length()); + } else if have_msg.has_bitfield() { + let raw_bf = have_msg.get_bitfield(); + let bf = decode_bitfiled(raw_bf)?; + trace!("decoded bitfield: {:?}", bf); + return Ok(max_high_bit(&bf)); + } else { + return Ok(have_msg.get_start() + 1); + } +} + +#[test] +fn test_max_index() { + let mut hm = Have::new(); + hm.set_start(0); + hm.set_bitfield(vec![7,2,128]); + assert_eq!(max_index(&hm).unwrap(), 8); + + hm.set_bitfield(vec![2, 207]); + assert_eq!(max_index(&hm).unwrap(), 7); + + // Alphabet test dat + hm.set_bitfield(vec![2, 254]); + assert_eq!(max_index(&hm).unwrap(), 6); + hm.set_bitfield(vec![2, 252]); + assert_eq!(max_index(&hm).unwrap(), 5); +} + +/// Tries to connect to a single peer, pull register, and close. +pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegister, is_content: bool) -> Result<()> { + + if register.len()? > 0 { + bail!("Register already had data in it (expected empty for naive clone)"); + } + + let mut dc = DatConnection::connect(host_port, key, false)?; + + // Info: downloading, not uploading + let mut im = Info::new(); + im.set_uploading(false); + im.set_downloading(true); + let im = DatNetMessage::Info(im); + dc.send_msg(&im, is_content)?; + + // Have: nothing (so far) + let mut hm = Have::new(); + hm.set_start(0); + hm.set_length(0); + let hm = DatNetMessage::Have(hm); + dc.send_msg(&hm, is_content)?; + + // UnHave: still nothing + let mut uhm = Unhave::new(); + uhm.set_start(0); + let uhm = DatNetMessage::Unhave(uhm); + dc.send_msg(&uhm, is_content)?; + + // Want: everything + let mut wm = Want::new(); + wm.set_start(0); + let wm = DatNetMessage::Want(wm); + dc.send_msg(&wm, is_content)?; + + let last_entry: u64; + + // Receive Have messages to determine lengths + loop { + let (was_content, msg) = dc.recv_msg()?; + match msg { + DatNetMessage::Have(dh) => { + info!("is_content={}; {:?}; bitfield={:?}", was_content, dh, dh.get_bitfield()); + last_entry = max_index(&dh)?; + break; + }, + _ => { + info!("Other message: {:?}", &msg); + } + } + } + + info!("last_entry={}", last_entry); + + // Request / Data loops + for i in 0..(last_entry+1) { + let mut rm = Request::new(); + rm.set_index(i); + info!("Sending request: {:?}", rm); + dc.send_msg(&DatNetMessage::Request(rm), false)?; + let (was_content, msg) = dc.recv_msg()?; + assert!(!was_content); + match msg { + DatNetMessage::Data(dm) => { + info!("Got data: index={}", dm.get_index()); + assert!(dm.has_value()); + assert!(dm.get_index() == i); + register.append(dm.get_value())?; + }, + _ => { + info!("Other message: {:?}", &msg); + } + } + } + Ok(()) +} -- cgit v1.2.3