aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2017-11-28 23:34:35 -0800
committerBryan Newbold <bnewbold@robocracy.org>2017-11-28 23:34:35 -0800
commita699c92cdceb0382d9bfec37724166145a7eb6df (patch)
treed8a83b58cd8a59e912d1e0eafffd05dccdc11554
parent3c2fbe17cc3e8b8a6199e2ad62c23fbbb18487ff (diff)
downloadgeniza-a699c92cdceb0382d9bfec37724166145a7eb6df.tar.gz
geniza-a699c92cdceb0382d9bfec37724166145a7eb6df.zip
WIP on node (synchronization) code
-rw-r--r--src/bin/geniza-net.rs24
-rw-r--r--src/lib.rs5
-rw-r--r--src/node.rs157
3 files changed, 185 insertions, 1 deletions
diff --git a/src/bin/geniza-net.rs b/src/bin/geniza-net.rs
index 38dc507..dd80c4f 100644
--- a/src/bin/geniza-net.rs
+++ b/src/bin/geniza-net.rs
@@ -55,6 +55,19 @@ fn run() -> Result<()> {
.about("Prints the DNS name to query (mDNS or centrally) for peers")
.arg_from_usage("<dat_key> 'dat key (public key) to convert (in hex)'"),
)
+ .subcommand(
+ SubCommand::with_name("naive-clone")
+ .about("Pulls a drive from a single (known) peer, using a naive algorithm")
+ .arg(Arg::with_name("dat-dir")
+ .short("d")
+ .long("dat-dir")
+ .value_name("PATH")
+ .help("dat drive directory")
+ .default_value(".dat")
+ .takes_value(true))
+ .arg_from_usage("<host_port> 'peer host:port to connect to'")
+ .arg_from_usage("<dat_key> 'dat key (public key) to pull"),
+ )
.get_matches();
@@ -113,6 +126,17 @@ fn run() -> Result<()> {
}
println!(".dat.local");
}
+ ("naive-clone", Some(subm)) => {
+ let host_port = subm.value_of("host_port").unwrap();
+ let dat_key = subm.value_of("dat_key").unwrap();
+ let key_bytes = parse_dat_key(&dat_key)?;
+ let dir = Path::new(subm.value_of("dat-dir").unwrap());
+ let mut metadata = SleepDirRegister::create(&dir, "metadata")?;
+ node_simple_clone(host_port, &key_bytes, &mut metadata, false)?;
+ // TODO: read out content key from metadata register
+ //let content = SleepDirRegister::create(&dir, "content")?;
+ //node_simple_clone(host_port, &key_bytes, &mut content, true)?;
+ }
_ => {
println!("Missing or unimplemented command!");
println!("{}", matches.usage());
diff --git a/src/lib.rs b/src/lib.rs
index f0059e6..10139d3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -23,6 +23,7 @@ extern crate log;
extern crate protobuf;
extern crate rand;
extern crate sodiumoxide;
+extern crate bit_field;
#[cfg(test)]
extern crate tempdir;
@@ -52,8 +53,10 @@ mod protocol;
pub use protocol::*;
pub mod network_msgs;
pub mod metadata_msgs;
+mod node;
+pub use node::*;
-
+// Shared functions
use crypto::digest::Digest;
use crypto::blake2b::Blake2b;
diff --git a/src/node.rs b/src/node.rs
new file mode 100644
index 0000000..582bf3d
--- /dev/null
+++ b/src/node.rs
@@ -0,0 +1,157 @@
+
+use errors::*;
+use network_msgs::*;
+use protocol::{DatNetMessage, DatConnection};
+use integer_encoding::VarInt;
+use bit_field::BitArray;
+use sleep_register::HyperRegister;
+
+fn decode_bitfiled(raw_bf: &[u8]) -> Result<Vec<u8>> {
+ let mut offset = 0; // byte offset that we have read up to
+ if raw_bf.len() < 1 {
+ bail!("Expected (varint-encoded) bitfield to have len>=1");
+ }
+ let mut bit_array: Vec<u8> = vec![];
+ while offset < raw_bf.len() {
+ let (header, inc): (u64, usize) = VarInt::decode_var(&raw_bf[offset..]);
+ offset += inc;
+
+ if (header & 0x01) == 0x01 {
+ // compressed
+ let bit = (header & 0x02) == 0x02;
+ let run_len = header >> 2;
+ if bit {
+ bit_array.append(&mut vec![0xFF; run_len as usize]);
+ } else {
+ bit_array.append(&mut vec![0x00; run_len as usize]);
+ }
+ } else {
+ // uncompressed
+ let byte_count = header >> 1;
+ let mut data = raw_bf[offset..(offset + byte_count as usize)].to_vec();
+ bit_array.append(&mut data);
+ offset += byte_count as usize;
+ }
+ }
+ // XXX: HACK
+ bit_array.reverse();
+ return Ok(bit_array);
+}
+
+/// Finds the index of the lowest bit
+fn max_high_bit(bf: &[u8]) -> u64 {
+ // XXX: HACK, going backwards
+ for i in 0..bf.bit_length() {
+ if bf.get_bit(i) {
+ return (bf.bit_length() - i - 1) as u64;
+ }
+ }
+ return 0;
+}
+
+fn max_index(have_msg: &Have) -> Result<u64> {
+ if have_msg.has_length() {
+ return Ok(have_msg.get_start() + have_msg.get_length());
+ } else if have_msg.has_bitfield() {
+ let raw_bf = have_msg.get_bitfield();
+ let bf = decode_bitfiled(raw_bf)?;
+ trace!("decoded bitfield: {:?}", bf);
+ return Ok(max_high_bit(&bf));
+ } else {
+ return Ok(have_msg.get_start() + 1);
+ }
+}
+
+#[test]
+fn test_max_index() {
+ let mut hm = Have::new();
+ hm.set_start(0);
+ hm.set_bitfield(vec![7,2,128]);
+ assert_eq!(max_index(&hm).unwrap(), 8);
+
+ hm.set_bitfield(vec![2, 207]);
+ assert_eq!(max_index(&hm).unwrap(), 7);
+
+ // Alphabet test dat
+ hm.set_bitfield(vec![2, 254]);
+ assert_eq!(max_index(&hm).unwrap(), 6);
+ hm.set_bitfield(vec![2, 252]);
+ assert_eq!(max_index(&hm).unwrap(), 5);
+}
+
+/// Tries to connect to a single peer, pull register, and close.
+pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegister, is_content: bool) -> Result<()> {
+
+ if register.len()? > 0 {
+ bail!("Register already had data in it (expected empty for naive clone)");
+ }
+
+ let mut dc = DatConnection::connect(host_port, key, false)?;
+
+ // Info: downloading, not uploading
+ let mut im = Info::new();
+ im.set_uploading(false);
+ im.set_downloading(true);
+ let im = DatNetMessage::Info(im);
+ dc.send_msg(&im, is_content)?;
+
+ // Have: nothing (so far)
+ let mut hm = Have::new();
+ hm.set_start(0);
+ hm.set_length(0);
+ let hm = DatNetMessage::Have(hm);
+ dc.send_msg(&hm, is_content)?;
+
+ // UnHave: still nothing
+ let mut uhm = Unhave::new();
+ uhm.set_start(0);
+ let uhm = DatNetMessage::Unhave(uhm);
+ dc.send_msg(&uhm, is_content)?;
+
+ // Want: everything
+ let mut wm = Want::new();
+ wm.set_start(0);
+ let wm = DatNetMessage::Want(wm);
+ dc.send_msg(&wm, is_content)?;
+
+ let last_entry: u64;
+
+ // Receive Have messages to determine lengths
+ loop {
+ let (was_content, msg) = dc.recv_msg()?;
+ match msg {
+ DatNetMessage::Have(dh) => {
+ info!("is_content={}; {:?}; bitfield={:?}", was_content, dh, dh.get_bitfield());
+ last_entry = max_index(&dh)?;
+ break;
+ },
+ _ => {
+ info!("Other message: {:?}", &msg);
+ }
+ }
+ }
+
+ info!("last_entry={}", last_entry);
+
+ // Request / Data loops
+ for i in 0..(last_entry+1) {
+ let mut rm = Request::new();
+ rm.set_index(i);
+ info!("Sending request: {:?}", rm);
+ dc.send_msg(&DatNetMessage::Request(rm), false)?;
+ let (was_content, msg) = dc.recv_msg()?;
+ assert!(!was_content);
+ match msg {
+ DatNetMessage::Data(dm) => {
+ info!("Got data: index={}", dm.get_index());
+ assert!(dm.has_value());
+ assert!(dm.get_index() == i);
+ register.append(dm.get_value())?;
+ },
+ _ => {
+ info!("Other message: {:?}", &msg);
+ }
+ }
+ }
+ Ok(())
+}