author     Bryan Newbold <bnewbold@robocracy.org>  2017-12-12 11:41:36 -0800
committer  Bryan Newbold <bnewbold@robocracy.org>  2017-12-12 11:41:36 -0800
commit     1a0131928e54fce0623afe1bea5799277b40be6a (patch)
tree       210646de95a842270c17ed81ed7bee09d3db03fc
parent     66bf3345a944e087cf8b66a2fbbceb143873c5e4 (diff)
download   geniza-1a0131928e54fce0623afe1bea5799277b40be6a.tar.gz
           geniza-1a0131928e54fce0623afe1bea5799277b40be6a.zip
start flushing out new APIs
-rw-r--r--  src/bin/geniza-drive.rs  |  2
-rw-r--r--  src/bitfield.rs          | 20
-rw-r--r--  src/drive.rs             | 14
-rw-r--r--  src/peer.rs              | 41
-rw-r--r--  src/protocol.rs          |  5
-rw-r--r--  src/synchronizer.rs      | 39
6 files changed, 83 insertions, 38 deletions
diff --git a/src/bin/geniza-drive.rs b/src/bin/geniza-drive.rs
index 42e06e2..385ec3d 100644
--- a/src/bin/geniza-drive.rs
+++ b/src/bin/geniza-drive.rs
@@ -141,7 +141,7 @@ fn run() -> Result<()> {
None => Path::new("/").join(path.file_name().unwrap()),
Some(p) => Path::new("/").join(p)
};
- drive.import_dir(&path, &fpath)?;
+ drive.import_dir_all(&path, &fpath)?;
}
("export-dir", Some(subm)) => {
diff --git a/src/bitfield.rs b/src/bitfield.rs
index 11640ea..018ccbc 100644
--- a/src/bitfield.rs
+++ b/src/bitfield.rs
@@ -2,14 +2,22 @@
use errors::*;
use integer_encoding::VarInt;
use bit_field::BitArray;
+use network_msgs::Have;
+pub struct Bitfield {
+ inner: Vec<u64>,
+}
+
+impl Bitfield {
+
+ pub fn from_have_msg(msg: &Have) -> Bitfield {
+ unimplemented!()
+ }
-// WrappedBitfield
-//
-// uses vec of u64 internally?
-//
-// fn from_message()
-// fn get(u64)
+ pub fn get(&self, index: u64) -> Result<bool> {
+ unimplemented!()
+ }
+}
pub fn decode_bitfield(raw_bf: &[u8]) -> Result<Vec<u8>> {
let mut offset = 0; // byte offset that we have read up to
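The new Bitfield wrapper is only stubbed out in this commit. As a rough standalone sketch of what get() could do over the Vec<u64> backing (the MSB-first bit order within each word is an assumption here, and the real stub returns Result<bool> rather than a plain bool):

// Hypothetical sketch, not the code in this commit.
pub struct Bitfield {
    inner: Vec<u64>,
}

impl Bitfield {
    /// Returns whether the bit at `index` is set, treating out-of-range
    /// indexes as unset. Bit numbering (most-significant bit first within
    /// each u64 word) is an assumption, not something this commit pins down.
    pub fn get(&self, index: u64) -> bool {
        let word = (index / 64) as usize;
        let bit = (index % 64) as u32;
        match self.inner.get(word) {
            Some(w) => w & (1u64 << (63 - bit)) != 0,
            None => false,
        }
    }
}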
diff --git a/src/drive.rs b/src/drive.rs
index 7ec2288..01807a2 100644
--- a/src/drive.rs
+++ b/src/drive.rs
@@ -490,7 +490,7 @@ impl<'a> DatDrive {
/// Copies Stat metadata and all content from a directory (recursively) from the "real"
/// filesystem into the DatDrive.
/// On success, returns version number including all the added files.
- pub fn import_dir<P: AsRef<Path>, Q: AsRef<Path>>(&mut self, source: P, dest: Q) -> Result<u64> {
+ pub fn import_dir_all<P: AsRef<Path>, Q: AsRef<Path>>(&mut self, source: P, dest: Q) -> Result<u64> {
let source = source.as_ref();
let dest = dest.as_ref();
// TODO: check that dest doesn't exist (or is directory)
@@ -511,7 +511,7 @@ impl<'a> DatDrive {
continue
}
if path.is_dir() {
- ret = self.import_dir(path, dest.join(fname))?;
+ ret = self.import_dir_all(path, dest.join(fname))?;
} else {
ret = self.import_file(path, dest.join(fname))?;
}
@@ -522,7 +522,7 @@ impl<'a> DatDrive {
Ok(ret)
}
- /// Copies a file from the drive to the "real" filesystem, preserving Stat metadata.
+ /// Copies a full directory from the drive to the "real" filesystem, preserving Stat metadata.
pub fn export_dir<P: AsRef<Path>, Q: AsRef<Path>>(&mut self, source: P, dest: Q) -> Result<()> {
let source = source.as_ref();
let dest = dest.as_ref();
@@ -776,7 +776,7 @@ fn test_dd_export_file() {
}
#[test]
-fn test_dd_import_dir() {
+fn test_dd_import_dir_all() {
use tempdir::TempDir;
use env_logger;
@@ -784,13 +784,13 @@ fn test_dd_import_dir() {
let tmp_dir = TempDir::new("geniza-test").unwrap();
let mut dd = DatDrive::create(tmp_dir.path()).unwrap();
- dd.import_dir("test-data/dat/tree/Animalia/", "/").unwrap();
+ dd.import_dir_all("test-data/dat/tree/Animalia/", "/").unwrap();
assert_eq!(dd.read_dir("/").count(), 0);
assert_eq!(dd.read_dir_recursive("/").count(), 2);
dd.import_file("test-data/dat/alphabet/a", "/a").unwrap();
- assert!(dd.import_dir("test-data/dat/tree/Animalia/", "/a/").is_err());
+ assert!(dd.import_dir_all("test-data/dat/tree/Animalia/", "/a/").is_err());
}
@@ -803,7 +803,7 @@ fn test_dd_export_dir() {
let tmp_dir = TempDir::new("geniza-test").unwrap();
let mut dd = DatDrive::create(tmp_dir.path()).unwrap();
- dd.import_dir("test-data/dat/tree/Animalia/", "/").unwrap();
+ dd.import_dir_all("test-data/dat/tree/Animalia/", "/").unwrap();
dd.export_dir("/", tmp_dir.path()).unwrap();
dd.export_dir("/Chordata/Mammalia/Carnivora/Caniformia/", tmp_dir.path()).unwrap();
diff --git a/src/peer.rs b/src/peer.rs
index 5535b09..29b90a2 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -1,12 +1,31 @@
-//use errors::*;
-
-// DatPeer
-// connection (thread?)
-// last_sent_ts
-// last_received_ts
-// has_messages
-// registers: vec? map?
-//
-// fn has(reg, u64)
-// fn has_intersection(reg, bitfield)
+use errors::*;
+use sleep_register::{HyperRegister, SleepDirRegister};
+use protocol::DatConnection;
+use bitfield::Bitfield;
+
+pub struct DatPeer {
+ registers: Vec<SleepDirRegister>,
+ connection: DatConnection,
+ have_log: Vec<Vec<Bitfield>>,
+}
+
+impl DatPeer {
+
+ pub fn new(connection: DatConnection, registers: Vec<SleepDirRegister>) -> DatPeer {
+ DatPeer {
+ registers,
+ connection,
+ have_log: vec![],
+ }
+ }
+
+ pub fn has(self, register: u64, index: u64) -> Result<bool> {
+ for bitfield in self.have_log[register as usize].iter() {
+ if bitfield.get(index)? {
+ return Ok(true)
+ }
+ }
+ Ok(false)
+ }
+}
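Nothing in this commit populates have_log yet, and Bitfield::from_have_msg() is still unimplemented!(). A hypothetical sketch of the receive-side hook that would feed has(), using only the types introduced above:

// Hypothetical sketch; DatPeer has no such method in this commit.
// Assumes `use network_msgs::Have;` alongside the imports above.
impl DatPeer {
    /// Record a peer's Have message for `register`, so later has() calls can
    /// answer "did any Have we've seen cover this index?".
    pub fn handle_have(&mut self, register: u64, msg: &Have) {
        let bitfield = Bitfield::from_have_msg(msg);
        // have_log is indexed by register; a real implementation would also
        // need to guard against an out-of-range register number.
        self.have_log[register as usize].push(bitfield);
    }
}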
diff --git a/src/protocol.rs b/src/protocol.rs
index 2b23959..5940840 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -60,6 +60,7 @@ fn msg_sugar(msg: &DatNetMessage) -> &Message {
/// This helper is pretty slow/inefficient; lots of copying memory
fn bytewise_stream_xor_ic_inplace(buf: &mut [u8], byte_offset: u64, nonce: &Nonce, key: &Key) {
+ // TODO: switch to new stream_xor_ic_inplace() variant?
let mut offset = byte_offset;
// We may have a partial-64-byte-block to finish encrypting first
@@ -70,14 +71,14 @@ fn bytewise_stream_xor_ic_inplace(buf: &mut [u8], byte_offset: u64, nonce: &Nonc
for i in 0..partial_len {
partial[partial_offset + i] = buf[i];
}
- let partial_enc = stream_xor_ic(&partial, offset / 64, &nonce, &key);
+ let partial_enc = stream_xor_ic(&partial, &nonce, offset / 64, &key);
offset += partial_len as u64;
for i in 0..partial_len {
buf[i] = partial_enc[partial_offset + i];
}
}
if buf.len() > partial_len {
- let main_enc = stream_xor_ic(&buf[partial_len..], offset / 64, &nonce, &key);
+ let main_enc = stream_xor_ic(&buf[partial_len..], &nonce, offset / 64, &key);
//offset += main_enc.len() as u64;
for i in 0..main_enc.len() {
buf[partial_len + i] = main_enc[i];
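The swapped arguments match sodiumoxide's stream_xor_ic(msg, &nonce, ic, &key) signature, where ic is the initial 64-byte block counter. A small self-contained check of the counter arithmetic this helper relies on (not code from this commit; it assumes sodiumoxide's default XSalsa20 stream and a split on a 64-byte block boundary):

// Sketch: resuming the stream cipher at block counter byte_offset / 64 should
// reproduce the keystream of a single-shot encryption.
use sodiumoxide::crypto::stream::{gen_key, gen_nonce, stream_xor, stream_xor_ic};

fn main() {
    let _ = sodiumoxide::init();
    let key = gen_key();
    let nonce = gen_nonce();
    let msg = vec![0x5au8; 256];

    // Encrypt everything at once...
    let whole = stream_xor(&msg, &nonce, &key);

    // ...then in two pieces, resuming the second piece at block 128 / 64 = 2.
    let first = stream_xor_ic(&msg[..128], &nonce, 0, &key);
    let second = stream_xor_ic(&msg[128..], &nonce, 128 / 64, &key);

    assert_eq!(&whole[..128], &first[..]);
    assert_eq!(&whole[128..], &second[..]);
}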
diff --git a/src/synchronizer.rs b/src/synchronizer.rs
index 216df6e..4e803fb 100644
--- a/src/synchronizer.rs
+++ b/src/synchronizer.rs
@@ -4,18 +4,35 @@ use network_msgs::*;
use bitfield::*;
use protocol::{DatNetMessage, DatConnection};
use sleep_register::HyperRegister;
+use peer::DatPeer;
+use sleep_register::SleepDirRegister;
+
+pub enum SyncMode {
+ RxMax,
+ RxEndless,
+ TxEndless,
+ RxTxEndless,
+}
+
+pub struct Synchronizer {
+ peers: Vec<DatPeer>,
+ registers: Vec<SleepDirRegister>,
+ mode: SyncMode,
+ wanted: Bitfield,
+ inflight: Vec<Vec<u64>>,
+}
+
+impl Synchronizer {
-// Synchronizer
-// register_keys
-// peers: vec
-// registers: HyperRegisters
-// mode: enum
-// state: enum
-// wanted: bitfield
-// requested: vec
-//
-// fn next_wanted() -> Option((reg, u64))
-// fn tick()
+ pub fn next_wanted(&mut self, reg: u64) -> Option<(u64, u64)> {
+ // XXX
+ None
+ }
+
+ pub fn tick(&mut self) -> Result<()> {
+ Ok(())
+ }
+}
fn max_index(have_msg: &Have) -> Result<u64> {
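next_wanted() and tick() are placeholders for now. One hypothetical shape the selection loop could take, using only the fields declared above (the max_index argument and the marking of requests as in-flight are assumptions, not part of this commit):

// Hypothetical sketch only; nothing below exists in this commit.
impl Synchronizer {
    fn next_wanted_sketch(&mut self, reg: u64, max_index: u64) -> Option<(u64, u64)> {
        for index in 0..max_index {
            // Skip entries we've already requested from some peer.
            if self.inflight[reg as usize].contains(&index) {
                continue;
            }
            // `wanted` marks entries we'd like but don't yet hold locally.
            if self.wanted.get(index).unwrap_or(false) {
                self.inflight[reg as usize].push(index);
                return Some((reg, index));
            }
        }
        None
    }
}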