aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2018-01-22 02:31:00 -0800
committerBryan Newbold <bnewbold@robocracy.org>2018-01-22 02:31:00 -0800
commit72f65d94b5eb95ef4fc1632adea1cfe4022281dd (patch)
treea77ca4b558408027e79347f75d2b1f4ce79c8d85 /src
parentef8ea37d26716869cd4572152ffedc047700f747 (diff)
downloadgeniza-72f65d94b5eb95ef4fc1632adea1cfe4022281dd.tar.gz
geniza-72f65d94b5eb95ef4fc1632adea1cfe4022281dd.zip
WIP on synchronizer state machine
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs1
-rw-r--r--src/peer.rs8
-rw-r--r--src/synchronizer.rs118
3 files changed, 113 insertions, 14 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 8a4825b..5370743 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -28,6 +28,7 @@ extern crate resolve;
extern crate data_encoding;
#[macro_use]
extern crate chan;
+extern crate bit_vec;
#[cfg(test)]
extern crate tempdir;
diff --git a/src/peer.rs b/src/peer.rs
index c6f8c6d..0f04a40 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -13,15 +13,15 @@ use chan;
/// Wraps a low-level DatConnection in a thread (or two). Contains very little context about
/// itself.
pub struct DatPeerThread {
- handle: u64,
+ pub handle: u64,
feeds: Vec<(u8, Key)>,
outbound_chan: chan::Sender<(DatNetMessage, u8)>,
}
pub struct PeerMsg {
- peer_handle: u64,
- feed_index: u8,
- msg: DatNetMessage,
+ pub peer_handle: u64,
+ pub feed_index: u8,
+ pub msg: DatNetMessage,
}
/// This is what the "receive" loop does: simply blocking reads on the TCP socket, passing any
diff --git a/src/synchronizer.rs b/src/synchronizer.rs
index 4e664f5..3cfe57d 100644
--- a/src/synchronizer.rs
+++ b/src/synchronizer.rs
@@ -2,10 +2,17 @@
use errors::*;
use network_msgs::*;
use bitfield::*;
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
use protocol::{DatNetMessage, DatConnection};
+use rand::{OsRng, Rng};
use sleep_register::HyperRegister;
-use peer::DatPeer;
+use peer::{DatPeerThread, PeerMsg};
use sleep_register::SleepDirRegister;
+use sodiumoxide::crypto::stream::Key;
+use bit_vec::BitVec;
+use discovery::discover_peers_dns;
+use chan;
pub enum SyncMode {
RxMax,
@@ -14,22 +21,112 @@ pub enum SyncMode {
RxTxEndless,
}
+pub struct RegisterStatus {
+ id: u8,
+ register: SleepDirRegister,
+ inflight: Vec<u64>,
+ wanted: BitVec,
+ key: Key,
+}
+
pub struct Synchronizer {
- peers: Vec<DatPeer>,
- registers: Vec<SleepDirRegister>,
+ peers: HashMap<u64, DatPeerThread>,
+ registers: Vec<RegisterStatus>,
mode: SyncMode,
- wanted: Bitfield,
- inflight: Vec<Vec<u64>>,
+ local_id: [u8; 32],
+ dir: Option<PathBuf>,
+ unified_peers_tx: chan::Sender<Result<PeerMsg>>,
+ unified_peers_rx: chan::Receiver<Result<PeerMsg>>,
}
impl Synchronizer {
- pub fn next_wanted(&mut self, reg: u64) -> Option<(u64, u64)> {
- // XXX
- None
+ pub fn new_downloader(key: Key, mode: SyncMode, dir: &Path) -> Result<Synchronizer> {
+
+ let mut metadata_reg = SleepDirRegister::create(dir.as_ref(), "metadata")?;
+
+ let (unified_peers_tx, unified_peers_rx) = chan::async();
+
+ let metadata_status = RegisterStatus {
+ id: 0,
+ register: metadata_reg,
+ inflight: vec![],
+ wanted: BitVec::new(),
+ key,
+ };
+
+ let mut rng = OsRng::new()?;
+ let mut local_id = [0; 32];
+ rng.fill_bytes(&mut local_id);
+
+ let s = Synchronizer {
+ peers: HashMap::new(),
+ mode,
+ local_id,
+ dir: Some(dir.to_path_buf()),
+ registers: vec![metadata_status],
+ unified_peers_tx,
+ unified_peers_rx,
+ };
+ Ok(s)
+ }
+
+ pub fn run(&mut self) -> Result<()> {
+
+ let meta_key = &self.registers[0].key;
+ let peers = discover_peers_dns(&meta_key[0..32])?;
+ let mut rng = OsRng::new()?;
+ for p in peers {
+ let handle = rng.gen::<u64>();
+ let pt = DatPeerThread::connect(p, meta_key.clone(), handle, false, Some(&self.local_id), self.unified_peers_tx.clone())?;
+ self.peers.insert(handle, pt);
+ // TODO: send a large want? or wait for haves?
+ };
+
+ // bug in chan_select!() breaking `self` reference?
+ let unified_peers_rx = self.unified_peers_rx.clone();
+
+ loop {
+ chan_select! {
+ unified_peers_rx.recv() -> val => { },
+ };
+ }
}
- pub fn tick(&mut self) -> Result<()> {
+ /*
+ match val {
+ Some(Ok(pm)) => {
+ self.handle_msg(&pm);
+ },
+ Some(Err(e)) => {
+ // TODO: don't bail here
+ bail!("Got a client error: {}", e);
+ },
+ _ => { unimplemented!() },
+ };
+ */
+
+ fn handle_msg(&mut self, pm: &PeerMsg) -> Result<()> {
+
+ match &pm.msg {
+ &DatNetMessage::Feed(_) => { unimplemented!() },
+ &DatNetMessage::Handshake(_) => { unimplemented!() },
+ &DatNetMessage::Info(_) => { unimplemented!() },
+ &DatNetMessage::Have(ref have) => {
+ // TODO: bulk-add haves to peer status
+ },
+ &DatNetMessage::Unhave(_) => {}, // PASS
+ &DatNetMessage::Want(_) => {}, // PASS
+ &DatNetMessage::Unwant(_) => {}, // PASS
+ &DatNetMessage::Request(_) => {}, // PASS
+ &DatNetMessage::Cancel(_) => {}, // PASS
+ &DatNetMessage::Data(_) => {
+ if pm.feed_index as usize >= self.registers.len() {
+ // TODO: invalid feed! drop connection
+ }
+ // TODO: insert into feed
+ },
+ }
Ok(())
}
}
@@ -72,7 +169,8 @@ pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegist
bail!("Register already had data in it (expected empty for naive clone)");
}
- let mut dc = DatConnection::connect(host_port, key, false)?;
+ let key = Key::from_slice(key).unwrap();
+ let mut dc = DatConnection::connect(host_port, &key, false, None)?;
// Info: downloading, not uploading
let mut im = Info::new();