diff options
| -rw-r--r-- | src/bin/geniza-drive.rs | 2 | ||||
| -rw-r--r-- | src/bitfield.rs | 20 | ||||
| -rw-r--r-- | src/drive.rs | 14 | ||||
| -rw-r--r-- | src/peer.rs | 41 | ||||
| -rw-r--r-- | src/protocol.rs | 5 | ||||
| -rw-r--r-- | src/synchronizer.rs | 39 | 
6 files changed, 83 insertions, 38 deletions
| diff --git a/src/bin/geniza-drive.rs b/src/bin/geniza-drive.rs index 42e06e2..385ec3d 100644 --- a/src/bin/geniza-drive.rs +++ b/src/bin/geniza-drive.rs @@ -141,7 +141,7 @@ fn run() -> Result<()> {                  None => Path::new("/").join(path.file_name().unwrap()),                  Some(p) => Path::new("/").join(p)              }; -            drive.import_dir(&path, &fpath)?; +            drive.import_dir_all(&path, &fpath)?;          }          ("export-dir", Some(subm)) => { diff --git a/src/bitfield.rs b/src/bitfield.rs index 11640ea..018ccbc 100644 --- a/src/bitfield.rs +++ b/src/bitfield.rs @@ -2,14 +2,22 @@  use errors::*;  use integer_encoding::VarInt;  use bit_field::BitArray; +use network_msgs::Have; +pub struct Bitfield { +    inner: Vec<u64>, +} + +impl Bitfield { + +    pub fn from_have_msg(msg: &Have) -> Bitfield { +        unimplemented!() +    } -// WrappedBitfield -// -// uses vec of u64 internally? -//  -// fn from_message() -// fn get(u64) +    pub fn get(&self, index: u64) -> Result<bool> { +        unimplemented!() +    } +}  pub fn decode_bitfield(raw_bf: &[u8]) -> Result<Vec<u8>> {      let mut offset = 0; // byte offset that we have read up to diff --git a/src/drive.rs b/src/drive.rs index 7ec2288..01807a2 100644 --- a/src/drive.rs +++ b/src/drive.rs @@ -490,7 +490,7 @@ impl<'a> DatDrive {      /// Copies Stat metadata and all content from a directory (recursively) from the "real"      /// filesystem into the DatDrive.      /// On success, returns version number including all the added files. -    pub fn import_dir<P: AsRef<Path>, Q: AsRef<Path>>(&mut self, source: P, dest: Q) -> Result<u64> { +    pub fn import_dir_all<P: AsRef<Path>, Q: AsRef<Path>>(&mut self, source: P, dest: Q) -> Result<u64> {          let source = source.as_ref();          let dest = dest.as_ref();          // TODO: check that dest doesn't exist (or is directory) @@ -511,7 +511,7 @@ impl<'a> DatDrive {                      continue                  }                  if path.is_dir() { -                    ret = self.import_dir(path, dest.join(fname))?; +                    ret = self.import_dir_all(path, dest.join(fname))?;                  } else {                      ret = self.import_file(path, dest.join(fname))?;                  } @@ -522,7 +522,7 @@ impl<'a> DatDrive {          Ok(ret)      } -    /// Copies a file from the drive to the "real" filesystem, preserving Stat metadata. +    /// Copies a full directory from the drive to the "real" filesystem, preserving Stat metadata.      pub fn export_dir<P: AsRef<Path>, Q: AsRef<Path>>(&mut self, source: P, dest: Q) -> Result<()> {          let source = source.as_ref();          let dest = dest.as_ref(); @@ -776,7 +776,7 @@ fn test_dd_export_file() {  }  #[test] -fn test_dd_import_dir() { +fn test_dd_import_dir_all() {      use tempdir::TempDir;      use env_logger; @@ -784,13 +784,13 @@ fn test_dd_import_dir() {      let tmp_dir = TempDir::new("geniza-test").unwrap();      let mut dd = DatDrive::create(tmp_dir.path()).unwrap(); -    dd.import_dir("test-data/dat/tree/Animalia/", "/").unwrap(); +    dd.import_dir_all("test-data/dat/tree/Animalia/", "/").unwrap();      assert_eq!(dd.read_dir("/").count(), 0);      assert_eq!(dd.read_dir_recursive("/").count(), 2);      dd.import_file("test-data/dat/alphabet/a", "/a").unwrap(); -    assert!(dd.import_dir("test-data/dat/tree/Animalia/", "/a/").is_err()); +    assert!(dd.import_dir_all("test-data/dat/tree/Animalia/", "/a/").is_err());  } @@ -803,7 +803,7 @@ fn test_dd_export_dir() {      let tmp_dir = TempDir::new("geniza-test").unwrap();      let mut dd = DatDrive::create(tmp_dir.path()).unwrap(); -    dd.import_dir("test-data/dat/tree/Animalia/", "/").unwrap(); +    dd.import_dir_all("test-data/dat/tree/Animalia/", "/").unwrap();      dd.export_dir("/", tmp_dir.path()).unwrap();      dd.export_dir("/Chordata/Mammalia/Carnivora/Caniformia/", tmp_dir.path()).unwrap(); diff --git a/src/peer.rs b/src/peer.rs index 5535b09..29b90a2 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -1,12 +1,31 @@ -//use errors::*; - -// DatPeer -//  connection (thread?) -//  last_sent_ts -//  last_received_ts -//  has_messages -//  registers: vec? map? -// -// fn has(reg, u64) -// fn has_intersection(reg, bitfield) +use errors::*; +use sleep_register::{HyperRegister, SleepDirRegister}; +use protocol::DatConnection; +use bitfield::Bitfield; + +pub struct DatPeer { +    registers: Vec<SleepDirRegister>, +    connection: DatConnection, +    have_log: Vec<Vec<Bitfield>>, +} + +impl DatPeer { + +    pub fn new(connection: DatConnection, registers: Vec<SleepDirRegister>) -> DatPeer { +        DatPeer { +            registers, +            connection, +            have_log: vec![], +        } +    } + +    pub fn has(self, register: u64, index: u64) -> Result<bool> { +        for bitfield in self.have_log[register as usize].iter() { +            if bitfield.get(index)? { +                return Ok(true) +            } +        } +        Ok(false) +    } +} diff --git a/src/protocol.rs b/src/protocol.rs index 2b23959..5940840 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -60,6 +60,7 @@ fn msg_sugar(msg: &DatNetMessage) -> &Message {  /// This helper is pretty slow/inefficient; lots of copying memory  fn bytewise_stream_xor_ic_inplace(buf: &mut [u8], byte_offset: u64, nonce: &Nonce, key: &Key) { +    // TODO: switch to new stream_xor_ic_inplace() variant?      let mut offset = byte_offset;      // We may have a partial-64-byte-block to finish encrypting first @@ -70,14 +71,14 @@ fn bytewise_stream_xor_ic_inplace(buf: &mut [u8], byte_offset: u64, nonce: &Nonc          for i in 0..partial_len {              partial[partial_offset + i] = buf[i];          } -        let partial_enc = stream_xor_ic(&partial, offset / 64, &nonce, &key); +        let partial_enc = stream_xor_ic(&partial, &nonce, offset / 64, &key);          offset += partial_len as u64;          for i in 0..partial_len {              buf[i] = partial_enc[partial_offset + i];          }      }      if buf.len() > partial_len { -        let main_enc = stream_xor_ic(&buf[partial_len..], offset / 64, &nonce, &key); +        let main_enc = stream_xor_ic(&buf[partial_len..], &nonce, offset / 64, &key);          //offset += main_enc.len() as u64;          for i in 0..main_enc.len() {              buf[partial_len + i] = main_enc[i]; diff --git a/src/synchronizer.rs b/src/synchronizer.rs index 216df6e..4e803fb 100644 --- a/src/synchronizer.rs +++ b/src/synchronizer.rs @@ -4,18 +4,35 @@ use network_msgs::*;  use bitfield::*;  use protocol::{DatNetMessage, DatConnection};  use sleep_register::HyperRegister; +use peer::DatPeer; +use sleep_register::SleepDirRegister; + +pub enum SyncMode { +    RxMax, +    RxEndless, +    TxEndless, +    RxTxEndless, +} + +pub struct Synchronizer { +    peers: Vec<DatPeer>, +    registers: Vec<SleepDirRegister>, +    mode: SyncMode, +    wanted: Bitfield, +    inflight: Vec<Vec<u64>>, +} + +impl Synchronizer { -// Synchronizer -//  register_keys -//  peers: vec -//  registers: HyperRegisters -//  mode: enum -//  state: enum -//  wanted: bitfield -//  requested: vec -// -// fn next_wanted() -> Option((reg, u64)) -// fn tick() +    pub fn next_wanted(&mut self, reg: u64) -> Option<(u64, u64)> { +        // XXX +        None +    } + +    pub fn tick(&mut self) -> Result<()> { +        Ok(()) +    } +}  fn max_index(have_msg: &Have) -> Result<u64> { | 
