From 1a0131928e54fce0623afe1bea5799277b40be6a Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 12 Dec 2017 11:41:36 -0800 Subject: start flushing out new APIs --- src/bin/geniza-drive.rs | 2 +- src/bitfield.rs | 20 ++++++++++++++------ src/drive.rs | 14 +++++++------- src/peer.rs | 41 ++++++++++++++++++++++++++++++----------- src/protocol.rs | 5 +++-- src/synchronizer.rs | 39 ++++++++++++++++++++++++++++----------- 6 files changed, 83 insertions(+), 38 deletions(-) (limited to 'src') diff --git a/src/bin/geniza-drive.rs b/src/bin/geniza-drive.rs index 42e06e2..385ec3d 100644 --- a/src/bin/geniza-drive.rs +++ b/src/bin/geniza-drive.rs @@ -141,7 +141,7 @@ fn run() -> Result<()> { None => Path::new("/").join(path.file_name().unwrap()), Some(p) => Path::new("/").join(p) }; - drive.import_dir(&path, &fpath)?; + drive.import_dir_all(&path, &fpath)?; } ("export-dir", Some(subm)) => { diff --git a/src/bitfield.rs b/src/bitfield.rs index 11640ea..018ccbc 100644 --- a/src/bitfield.rs +++ b/src/bitfield.rs @@ -2,14 +2,22 @@ use errors::*; use integer_encoding::VarInt; use bit_field::BitArray; +use network_msgs::Have; +pub struct Bitfield { + inner: Vec, +} + +impl Bitfield { + + pub fn from_have_msg(msg: &Have) -> Bitfield { + unimplemented!() + } -// WrappedBitfield -// -// uses vec of u64 internally? -// -// fn from_message() -// fn get(u64) + pub fn get(&self, index: u64) -> Result { + unimplemented!() + } +} pub fn decode_bitfield(raw_bf: &[u8]) -> Result> { let mut offset = 0; // byte offset that we have read up to diff --git a/src/drive.rs b/src/drive.rs index 7ec2288..01807a2 100644 --- a/src/drive.rs +++ b/src/drive.rs @@ -490,7 +490,7 @@ impl<'a> DatDrive { /// Copies Stat metadata and all content from a directory (recursively) from the "real" /// filesystem into the DatDrive. /// On success, returns version number including all the added files. - pub fn import_dir, Q: AsRef>(&mut self, source: P, dest: Q) -> Result { + pub fn import_dir_all, Q: AsRef>(&mut self, source: P, dest: Q) -> Result { let source = source.as_ref(); let dest = dest.as_ref(); // TODO: check that dest doesn't exist (or is directory) @@ -511,7 +511,7 @@ impl<'a> DatDrive { continue } if path.is_dir() { - ret = self.import_dir(path, dest.join(fname))?; + ret = self.import_dir_all(path, dest.join(fname))?; } else { ret = self.import_file(path, dest.join(fname))?; } @@ -522,7 +522,7 @@ impl<'a> DatDrive { Ok(ret) } - /// Copies a file from the drive to the "real" filesystem, preserving Stat metadata. + /// Copies a full directory from the drive to the "real" filesystem, preserving Stat metadata. pub fn export_dir, Q: AsRef>(&mut self, source: P, dest: Q) -> Result<()> { let source = source.as_ref(); let dest = dest.as_ref(); @@ -776,7 +776,7 @@ fn test_dd_export_file() { } #[test] -fn test_dd_import_dir() { +fn test_dd_import_dir_all() { use tempdir::TempDir; use env_logger; @@ -784,13 +784,13 @@ fn test_dd_import_dir() { let tmp_dir = TempDir::new("geniza-test").unwrap(); let mut dd = DatDrive::create(tmp_dir.path()).unwrap(); - dd.import_dir("test-data/dat/tree/Animalia/", "/").unwrap(); + dd.import_dir_all("test-data/dat/tree/Animalia/", "/").unwrap(); assert_eq!(dd.read_dir("/").count(), 0); assert_eq!(dd.read_dir_recursive("/").count(), 2); dd.import_file("test-data/dat/alphabet/a", "/a").unwrap(); - assert!(dd.import_dir("test-data/dat/tree/Animalia/", "/a/").is_err()); + assert!(dd.import_dir_all("test-data/dat/tree/Animalia/", "/a/").is_err()); } @@ -803,7 +803,7 @@ fn test_dd_export_dir() { let tmp_dir = TempDir::new("geniza-test").unwrap(); let mut dd = DatDrive::create(tmp_dir.path()).unwrap(); - dd.import_dir("test-data/dat/tree/Animalia/", "/").unwrap(); + dd.import_dir_all("test-data/dat/tree/Animalia/", "/").unwrap(); dd.export_dir("/", tmp_dir.path()).unwrap(); dd.export_dir("/Chordata/Mammalia/Carnivora/Caniformia/", tmp_dir.path()).unwrap(); diff --git a/src/peer.rs b/src/peer.rs index 5535b09..29b90a2 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -1,12 +1,31 @@ -//use errors::*; - -// DatPeer -// connection (thread?) -// last_sent_ts -// last_received_ts -// has_messages -// registers: vec? map? -// -// fn has(reg, u64) -// fn has_intersection(reg, bitfield) +use errors::*; +use sleep_register::{HyperRegister, SleepDirRegister}; +use protocol::DatConnection; +use bitfield::Bitfield; + +pub struct DatPeer { + registers: Vec, + connection: DatConnection, + have_log: Vec>, +} + +impl DatPeer { + + pub fn new(connection: DatConnection, registers: Vec) -> DatPeer { + DatPeer { + registers, + connection, + have_log: vec![], + } + } + + pub fn has(self, register: u64, index: u64) -> Result { + for bitfield in self.have_log[register as usize].iter() { + if bitfield.get(index)? { + return Ok(true) + } + } + Ok(false) + } +} diff --git a/src/protocol.rs b/src/protocol.rs index 2b23959..5940840 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -60,6 +60,7 @@ fn msg_sugar(msg: &DatNetMessage) -> &Message { /// This helper is pretty slow/inefficient; lots of copying memory fn bytewise_stream_xor_ic_inplace(buf: &mut [u8], byte_offset: u64, nonce: &Nonce, key: &Key) { + // TODO: switch to new stream_xor_ic_inplace() variant? let mut offset = byte_offset; // We may have a partial-64-byte-block to finish encrypting first @@ -70,14 +71,14 @@ fn bytewise_stream_xor_ic_inplace(buf: &mut [u8], byte_offset: u64, nonce: &Nonc for i in 0..partial_len { partial[partial_offset + i] = buf[i]; } - let partial_enc = stream_xor_ic(&partial, offset / 64, &nonce, &key); + let partial_enc = stream_xor_ic(&partial, &nonce, offset / 64, &key); offset += partial_len as u64; for i in 0..partial_len { buf[i] = partial_enc[partial_offset + i]; } } if buf.len() > partial_len { - let main_enc = stream_xor_ic(&buf[partial_len..], offset / 64, &nonce, &key); + let main_enc = stream_xor_ic(&buf[partial_len..], &nonce, offset / 64, &key); //offset += main_enc.len() as u64; for i in 0..main_enc.len() { buf[partial_len + i] = main_enc[i]; diff --git a/src/synchronizer.rs b/src/synchronizer.rs index 216df6e..4e803fb 100644 --- a/src/synchronizer.rs +++ b/src/synchronizer.rs @@ -4,18 +4,35 @@ use network_msgs::*; use bitfield::*; use protocol::{DatNetMessage, DatConnection}; use sleep_register::HyperRegister; +use peer::DatPeer; +use sleep_register::SleepDirRegister; + +pub enum SyncMode { + RxMax, + RxEndless, + TxEndless, + RxTxEndless, +} + +pub struct Synchronizer { + peers: Vec, + registers: Vec, + mode: SyncMode, + wanted: Bitfield, + inflight: Vec>, +} + +impl Synchronizer { -// Synchronizer -// register_keys -// peers: vec -// registers: HyperRegisters -// mode: enum -// state: enum -// wanted: bitfield -// requested: vec -// -// fn next_wanted() -> Option((reg, u64)) -// fn tick() + pub fn next_wanted(&mut self, reg: u64) -> Option<(u64, u64)> { + // XXX + None + } + + pub fn tick(&mut self) -> Result<()> { + Ok(()) + } +} fn max_index(have_msg: &Have) -> Result { -- cgit v1.2.3