diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2018-07-31 22:45:14 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-07-31 22:46:27 -0700 |
commit | 7662f08f2ec0f0179d42a8f79bc5eeed70fd2dd8 (patch) | |
tree | eec47c826555a822d67afc90be9c8393d0162917 /src | |
parent | 7164f10355fcdc45d3be58bfcee2939937e01623 (diff) | |
download | geniza-old-wip.tar.gz geniza-old-wip.zip |
commit old WIPold-wip
Diffstat (limited to 'src')
-rw-r--r-- | src/blah | 92 | ||||
-rw-r--r-- | src/peer.rs | 2 | ||||
-rw-r--r-- | src/sleep_register.rs | 66 | ||||
-rw-r--r-- | src/synchronizer.rs | 5 |
4 files changed, 164 insertions, 1 deletions
diff --git a/src/blah b/src/blah new file mode 100644 index 0000000..0384070 --- /dev/null +++ b/src/blah @@ -0,0 +1,92 @@ + + /// hyperdrive-specific helper for discovering the public key for the "content" feed + /// from the "metadata" feed , and sending a Feed message to initialize on this connection. + pub fn init_data_feed(&mut self) -> Result<()> { + + if self.feeds.len() > 1 { + return Ok(()); + } + + let data_key = self.get_drive_data_key()?; + self.add_feed(&data_key[0..32]) + } + + + /// hyperdrive-specific helper for returning the "data" feed public key (aka, index=1) + pub fn get_drive_data_key(&mut self) -> Result<Key> { + + if self.feeds.len() > 1 { + // we already have the key + let key = self.feeds[1].clone(); + return Ok(key); + } + + // Info: downloading, not uploading + let mut im = Info::new(); + im.set_uploading(false); + im.set_downloading(true); + self.conn.send_msg(&DatNetMessage::Info(im), 0)?; + + // Have: nothing (so far) + let mut hm = Have::new(); + hm.set_start(0); + hm.set_length(0); + self.conn.send_msg(&DatNetMessage::Have(hm), 0)?; + + // UnHave: still nothing + let mut uhm = Unhave::new(); + uhm.set_start(0); + self.conn.send_msg(&DatNetMessage::Unhave(uhm), 0)?; + + // Want: just the first element + let mut wm = Want::new(); + wm.set_start(0); + wm.set_length(1); + self.conn.send_msg(&DatNetMessage::Want(wm), 0)?; + + // listen for Have + loop { + let (msg, feed_index) = self.conn.recv_msg()?; + if feed_index == 1 { + continue; + } + if let DatNetMessage::Have(_) = msg { + break; + } else { + info!("Expected Have message, got: {:?}", &msg); + continue; + } + } + + // Request + let mut rm = Request::new(); + rm.set_index(0); + self.conn.send_msg(&DatNetMessage::Request(rm), 0)?; + + loop { + let (msg, feed_index) = self.conn.recv_msg()?; + if feed_index == 1 { + info!("Expected other message channel"); + continue; + } + if let DatNetMessage::Data(dm) = msg { + info!("Got metadata: {}", dm.get_index()); + if dm.get_index() == 0 { + let index_msg = parse_from_bytes::<Index>(&mut dm.get_value())?; + if index_msg.get_field_type() == "hyperdrive" { + let data_key = index_msg.get_content(); + if data_key.len() != 32 { + bail!("Received data key had wrong length: {}", data_key.len()); + } + // TODO: ok_or(), but what? + return Ok(Key::from_slice(&data_key[0..32]).unwrap()); + } else { + bail!("non-hyperdrive Index type: {}", index_msg.get_field_type()); + } + } + } else { + info!("Expected Data message, got: {:?}", &msg); + continue; + } + } + } diff --git a/src/peer.rs b/src/peer.rs index a020a1a..0ec8f96 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -28,6 +28,7 @@ pub struct PeerMsg { /// received messages into a channel back to the worker thread. fn receiver_loop(mut dc: DatConnection, peer_rx: chan::Sender<Result<(DatNetMessage, u8)>>) { loop { + println!("receiver_loop tick"); match dc.recv_msg() { Ok((msg, feed_index)) => { peer_rx.send(Ok((msg, feed_index))); @@ -57,6 +58,7 @@ fn worker_thread(mut dc: DatConnection, handle: u64, outbound_chan: chan::Receiv }); loop { + println!("worker_loop tick"); chan_select!{ outbound_chan.recv() -> val => { if let Some((msg, feed_index)) = val { diff --git a/src/sleep_register.rs b/src/sleep_register.rs index e0ea5a4..dc3c2a8 100644 --- a/src/sleep_register.rs +++ b/src/sleep_register.rs @@ -14,6 +14,7 @@ use rand::{OsRng, Rng}; use errors::*; use sleep_file::*; use make_discovery_key; +use network_msgs::Data; /// Abstract access to Hypercore register pub trait HyperRegister { @@ -33,6 +34,9 @@ pub trait HyperRegister { /// index written to. fn append(&mut self, data: &[u8]) -> Result<u64>; + /// Inserts an entry (eg, as received from the network) + fn insert(&mut self, msg: &Data) -> Result<()>; + /// Count of data entries for this register. This is the total count (highest entry index plus /// one); this particular store might be sparse. fn len(&self) -> Result<u64>; @@ -463,6 +467,68 @@ impl HyperRegister for SleepDirRegister { Ok(index) } +/* + * For a given feed block at a rev, + */ + + fn get_data_msg(&self, rev: u64) -> Result<Data> { + if not self.has(rev) { + bail!("Don't have requested data block: {}", rev); + } + + let data = self.get_data_entry(rev)?; + let root_sig = self.sign_sleep.get(rev)?; + + let nodes = vec![]; + for node_index in tree_root_nodes(rev) { + let node = Node::new(); + node.set_index(node_index); + node.set_hash(); + node.set_size(); + nodes.push(node); + } + + let msg = Data::new(); + msg.set_index(rev); + msg.set_value(&data); + msg.set_nodes(nodes); + msg.set_signature(root_sig); + Ok(msg) + } + + fn insert(&mut self, msg: &Data) -> Result<()> { + // index, value, nodes, signature + // "uncles" are included + let index = msg.get_index()?; + let data = msg.get_value()?; + let root_sig = msg.get_signature()?; + assert!(index % 2 == 0); + + // 1. Get hash of the data chunk + let leaf_hash = HyperRegister::hash_leaf(data); + + // 2. Insert tree chunks + for node in msg.get_nodes() { + let node_index = node.get_index(); + let node_hash = node.get_hash(); + let node_size = node.get_size(); + } + + // 3. Verify signature + let root_hash = HyperRegister::hash_roots(self, index)?; + ed25519::verify(&root_hash, &self.secret_key.clone().unwrap(), &root_sig); + + // 4. Insert signature + self.sign_sleep.insert(index, &root_sig)?; + + // 5. Insert bytes into data file + if let Some(ref mut df) = self.data_file { + df.seek(SeekFrom::Start(offset))?; + df.write_all(data)?; + df.sync_data()?; + } + } + fn len(&self) -> Result<u64> { // Length in entry count. let tree_len = self.tree_sleep.len()?; diff --git a/src/synchronizer.rs b/src/synchronizer.rs index cfc4cd3..a4c156b 100644 --- a/src/synchronizer.rs +++ b/src/synchronizer.rs @@ -125,6 +125,7 @@ impl Synchronizer { let unified_peers_rx = self.unified_peers_rx.clone(); loop { + print!("."); chan_select! { unified_peers_rx.recv() -> val => { if let Some(Ok(pm)) = val { @@ -141,7 +142,9 @@ impl Synchronizer { // mutable ref to PeerThread for this message let pt = self.peers.get_mut(&pm.peer_handle).unwrap(); - // NB: this is the simplistic model of registers (only works up to 2x per peer?) + // XXX: debug, not warn + warn!("Got a message (feed={}): {:?}", pm.feed_index, pm.msg); + if pm.feed_index as usize >= self.registers.len() { // Ignore feed channels we haven't registered yet return Ok(()); |