aboutsummaryrefslogtreecommitdiffstats
path: root/src/synchronizer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/synchronizer.rs')
-rw-r--r--src/synchronizer.rs141
1 files changed, 118 insertions, 23 deletions
diff --git a/src/synchronizer.rs b/src/synchronizer.rs
index 3cfe57d..2327600 100644
--- a/src/synchronizer.rs
+++ b/src/synchronizer.rs
@@ -12,6 +12,9 @@ use sleep_register::SleepDirRegister;
use sodiumoxide::crypto::stream::Key;
use bit_vec::BitVec;
use discovery::discover_peers_dns;
+use protobuf::parse_from_bytes;
+use network_msgs::Data;
+use metadata_msgs::Index;
use chan;
pub enum SyncMode {
@@ -33,6 +36,7 @@ pub struct Synchronizer {
peers: HashMap<u64, DatPeerThread>,
registers: Vec<RegisterStatus>,
mode: SyncMode,
+ is_drive: bool,
local_id: [u8; 32],
dir: Option<PathBuf>,
unified_peers_tx: chan::Sender<Result<PeerMsg>>,
@@ -43,7 +47,7 @@ impl Synchronizer {
pub fn new_downloader(key: Key, mode: SyncMode, dir: &Path) -> Result<Synchronizer> {
- let mut metadata_reg = SleepDirRegister::create(dir.as_ref(), "metadata")?;
+ let metadata_reg = SleepDirRegister::create(dir.as_ref(), "metadata")?;
let (unified_peers_tx, unified_peers_rx) = chan::async();
@@ -63,6 +67,7 @@ impl Synchronizer {
peers: HashMap::new(),
mode,
local_id,
+ is_drive: true,
dir: Some(dir.to_path_buf()),
registers: vec![metadata_status],
unified_peers_tx,
@@ -73,64 +78,123 @@ impl Synchronizer {
pub fn run(&mut self) -> Result<()> {
- let meta_key = &self.registers[0].key;
+ let meta_key = &self.registers.get(0).unwrap().key.clone();
let peers = discover_peers_dns(&meta_key[0..32])?;
let mut rng = OsRng::new()?;
for p in peers {
let handle = rng.gen::<u64>();
let pt = DatPeerThread::connect(p, meta_key.clone(), handle, false, Some(&self.local_id), self.unified_peers_tx.clone())?;
self.peers.insert(handle, pt);
- // TODO: send a large want? or wait for haves?
+ let pt = self.peers.get_mut(&handle).unwrap();
+
+ match self.mode {
+ SyncMode::RxMax => {
+ init_want_everything(pt, 0)?;
+ },
+ SyncMode::RxEndless => unimplemented!(),
+ SyncMode::TxEndless => unimplemented!(),
+ SyncMode::RxTxEndless => unimplemented!(),
+ };
};
// bug in chan_select!() breaking `self` reference?
+ // "recursion limit reached while expanding the macro `chan_select`"
let unified_peers_rx = self.unified_peers_rx.clone();
loop {
chan_select! {
- unified_peers_rx.recv() -> val => { },
+ unified_peers_rx.recv() -> val => {
+ if let Some(Ok(pm)) = val {
+ self.handle_msg(&pm)?;
+ }
+ },
};
}
}
- /*
- match val {
- Some(Ok(pm)) => {
- self.handle_msg(&pm);
- },
- Some(Err(e)) => {
- // TODO: don't bail here
- bail!("Got a client error: {}", e);
- },
- _ => { unimplemented!() },
- };
- */
-
fn handle_msg(&mut self, pm: &PeerMsg) -> Result<()> {
+ // mutable ref to PeerThread for this message
+ let pt = self.peers.get_mut(&pm.peer_handle).unwrap();
+
+ // NB: this is the simplistic model of registers (only works up to 2x per peer?)
+ if pm.feed_index as usize >= self.registers.len() {
+ // XXX: invalid feed! drop connection
+ pt.close()?;
+ }
+
match &pm.msg {
&DatNetMessage::Feed(_) => { unimplemented!() },
&DatNetMessage::Handshake(_) => { unimplemented!() },
&DatNetMessage::Info(_) => { unimplemented!() },
- &DatNetMessage::Have(ref have) => {
- // TODO: bulk-add haves to peer status
+ &DatNetMessage::Have(ref msg) => {
+ // TODO: depending on mode...
+
+ //let peer_has = extract_bitfield(msg)?;
+ // TODO: remove bits we already have
+ // TODO: depending on mode, extend 'wanted' bits
+ // TODO: send a Request on this channel
+ // XXX: dummy for testing
+ let mut request = Request::new();
+ request.set_index(msg.get_start());
+ pt.send(DatNetMessage::Request(request), pm.feed_index)?;
},
&DatNetMessage::Unhave(_) => {}, // PASS
&DatNetMessage::Want(_) => {}, // PASS
&DatNetMessage::Unwant(_) => {}, // PASS
&DatNetMessage::Request(_) => {}, // PASS
&DatNetMessage::Cancel(_) => {}, // PASS
- &DatNetMessage::Data(_) => {
- if pm.feed_index as usize >= self.registers.len() {
- // TODO: invalid feed! drop connection
+ &DatNetMessage::Data(ref msg) => {
+
+ // TODO: feed indexing?
+ // Insert into local feed
+ // XXX self.registers[pm.feed_index].insert(msg);
+
+ // If a drive, and this is the first entry of metadata feed, it has the config for
+ // the content feed
+ if self.is_drive && pm.feed_index == 0 && msg.get_index() == 0 {
+ let data_key = parse_drive_data_key(msg)?;
+ pt.add_feed(&data_key)?;
+ init_want_everything(pt, 1)?;
+
+ // If we haven't already, create and save a local register
+ if self.registers.len() < 2 {
+
+ let dir = self.dir.clone().unwrap();
+ let content_reg = SleepDirRegister::create(&dir, "content")?;
+
+ let content_status = RegisterStatus {
+ id: 1,
+ register: content_reg,
+ inflight: vec![],
+ wanted: BitVec::new(),
+ key: data_key,
+ };
+
+ self.registers.push(content_status);
+ }
}
- // TODO: insert into feed
+
+ // TODO: send next wanted, or otherwise update state
},
}
Ok(())
}
}
+fn parse_drive_data_key(data_msg: &Data) -> Result<Key> {
+ let index_msg = parse_from_bytes::<Index>(&mut data_msg.get_value())?;
+ if index_msg.get_field_type() == "hyperdrive" {
+ let data_key = index_msg.get_content();
+ if data_key.len() != 32 {
+ bail!("Received data key had wrong length: {}", data_key.len());
+ }
+ return Ok(Key::from_slice(&data_key[0..32]).unwrap());
+ } else {
+ bail!("non-hyperdrive Index type: {}", index_msg.get_field_type());
+ }
+}
+
fn max_index(have_msg: &Have) -> Result<u64> {
if have_msg.has_length() {
@@ -162,6 +226,37 @@ fn test_max_index() {
assert_eq!(max_index(&hm).unwrap(), 5);
}
+fn init_want_everything(dpt: &mut DatPeerThread, reg_index: u8) -> Result<()> {
+
+ // Info: downloading, not uploading
+ let mut im = Info::new();
+ im.set_uploading(false);
+ im.set_downloading(true);
+ let im = DatNetMessage::Info(im);
+ dpt.send(im, reg_index)?;
+
+ // Have: nothing (so far)
+ let mut hm = Have::new();
+ hm.set_start(0);
+ hm.set_length(0);
+ let hm = DatNetMessage::Have(hm);
+ dpt.send(hm, reg_index)?;
+
+ // UnHave: still nothing
+ let mut uhm = Unhave::new();
+ uhm.set_start(0);
+ let uhm = DatNetMessage::Unhave(uhm);
+ dpt.send(uhm, reg_index)?;
+
+ // Want: everything
+ let mut wm = Want::new();
+ wm.set_start(0);
+ let wm = DatNetMessage::Want(wm);
+ dpt.send(wm, reg_index)?;
+
+ Ok(())
+}
+
/// Tries to connect to a single peer, pull register, and close.
pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegister, reg_index: u8) -> Result<()> {