aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2018-07-31 22:45:14 -0700
committerBryan Newbold <bnewbold@robocracy.org>2018-07-31 22:46:27 -0700
commit7662f08f2ec0f0179d42a8f79bc5eeed70fd2dd8 (patch)
treeeec47c826555a822d67afc90be9c8393d0162917
parent7164f10355fcdc45d3be58bfcee2939937e01623 (diff)
downloadgeniza-7662f08f2ec0f0179d42a8f79bc5eeed70fd2dd8.tar.gz
geniza-7662f08f2ec0f0179d42a8f79bc5eeed70fd2dd8.zip
commit old WIPold-wip
-rw-r--r--src/blah92
-rw-r--r--src/peer.rs2
-rw-r--r--src/sleep_register.rs66
-rw-r--r--src/synchronizer.rs5
4 files changed, 164 insertions, 1 deletions
diff --git a/src/blah b/src/blah
new file mode 100644
index 0000000..0384070
--- /dev/null
+++ b/src/blah
@@ -0,0 +1,92 @@
+
+ /// hyperdrive-specific helper for discovering the public key for the "content" feed
+ /// from the "metadata" feed , and sending a Feed message to initialize on this connection.
+ pub fn init_data_feed(&mut self) -> Result<()> {
+
+ if self.feeds.len() > 1 {
+ return Ok(());
+ }
+
+ let data_key = self.get_drive_data_key()?;
+ self.add_feed(&data_key[0..32])
+ }
+
+
+ /// hyperdrive-specific helper for returning the "data" feed public key (aka, index=1)
+ pub fn get_drive_data_key(&mut self) -> Result<Key> {
+
+ if self.feeds.len() > 1 {
+ // we already have the key
+ let key = self.feeds[1].clone();
+ return Ok(key);
+ }
+
+ // Info: downloading, not uploading
+ let mut im = Info::new();
+ im.set_uploading(false);
+ im.set_downloading(true);
+ self.conn.send_msg(&DatNetMessage::Info(im), 0)?;
+
+ // Have: nothing (so far)
+ let mut hm = Have::new();
+ hm.set_start(0);
+ hm.set_length(0);
+ self.conn.send_msg(&DatNetMessage::Have(hm), 0)?;
+
+ // UnHave: still nothing
+ let mut uhm = Unhave::new();
+ uhm.set_start(0);
+ self.conn.send_msg(&DatNetMessage::Unhave(uhm), 0)?;
+
+ // Want: just the first element
+ let mut wm = Want::new();
+ wm.set_start(0);
+ wm.set_length(1);
+ self.conn.send_msg(&DatNetMessage::Want(wm), 0)?;
+
+ // listen for Have
+ loop {
+ let (msg, feed_index) = self.conn.recv_msg()?;
+ if feed_index == 1 {
+ continue;
+ }
+ if let DatNetMessage::Have(_) = msg {
+ break;
+ } else {
+ info!("Expected Have message, got: {:?}", &msg);
+ continue;
+ }
+ }
+
+ // Request
+ let mut rm = Request::new();
+ rm.set_index(0);
+ self.conn.send_msg(&DatNetMessage::Request(rm), 0)?;
+
+ loop {
+ let (msg, feed_index) = self.conn.recv_msg()?;
+ if feed_index == 1 {
+ info!("Expected other message channel");
+ continue;
+ }
+ if let DatNetMessage::Data(dm) = msg {
+ info!("Got metadata: {}", dm.get_index());
+ if dm.get_index() == 0 {
+ let index_msg = parse_from_bytes::<Index>(&mut dm.get_value())?;
+ if index_msg.get_field_type() == "hyperdrive" {
+ let data_key = index_msg.get_content();
+ if data_key.len() != 32 {
+ bail!("Received data key had wrong length: {}", data_key.len());
+ }
+ // TODO: ok_or(), but what?
+ return Ok(Key::from_slice(&data_key[0..32]).unwrap());
+ } else {
+ bail!("non-hyperdrive Index type: {}", index_msg.get_field_type());
+ }
+ }
+ } else {
+ info!("Expected Data message, got: {:?}", &msg);
+ continue;
+ }
+ }
+ }
diff --git a/src/peer.rs b/src/peer.rs
index a020a1a..0ec8f96 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -28,6 +28,7 @@ pub struct PeerMsg {
/// received messages into a channel back to the worker thread.
fn receiver_loop(mut dc: DatConnection, peer_rx: chan::Sender<Result<(DatNetMessage, u8)>>) {
loop {
+ println!("receiver_loop tick");
match dc.recv_msg() {
Ok((msg, feed_index)) => {
peer_rx.send(Ok((msg, feed_index)));
@@ -57,6 +58,7 @@ fn worker_thread(mut dc: DatConnection, handle: u64, outbound_chan: chan::Receiv
});
loop {
+ println!("worker_loop tick");
chan_select!{
outbound_chan.recv() -> val => {
if let Some((msg, feed_index)) = val {
diff --git a/src/sleep_register.rs b/src/sleep_register.rs
index e0ea5a4..dc3c2a8 100644
--- a/src/sleep_register.rs
+++ b/src/sleep_register.rs
@@ -14,6 +14,7 @@ use rand::{OsRng, Rng};
use errors::*;
use sleep_file::*;
use make_discovery_key;
+use network_msgs::Data;
/// Abstract access to Hypercore register
pub trait HyperRegister {
@@ -33,6 +34,9 @@ pub trait HyperRegister {
/// index written to.
fn append(&mut self, data: &[u8]) -> Result<u64>;
+ /// Inserts an entry (eg, as received from the network)
+ fn insert(&mut self, msg: &Data) -> Result<()>;
+
/// Count of data entries for this register. This is the total count (highest entry index plus
/// one); this particular store might be sparse.
fn len(&self) -> Result<u64>;
@@ -463,6 +467,68 @@ impl HyperRegister for SleepDirRegister {
Ok(index)
}
+/*
+ * For a given feed block at a rev,
+ */
+
+ fn get_data_msg(&self, rev: u64) -> Result<Data> {
+ if not self.has(rev) {
+ bail!("Don't have requested data block: {}", rev);
+ }
+
+ let data = self.get_data_entry(rev)?;
+ let root_sig = self.sign_sleep.get(rev)?;
+
+ let nodes = vec![];
+ for node_index in tree_root_nodes(rev) {
+ let node = Node::new();
+ node.set_index(node_index);
+ node.set_hash();
+ node.set_size();
+ nodes.push(node);
+ }
+
+ let msg = Data::new();
+ msg.set_index(rev);
+ msg.set_value(&data);
+ msg.set_nodes(nodes);
+ msg.set_signature(root_sig);
+ Ok(msg)
+ }
+
+ fn insert(&mut self, msg: &Data) -> Result<()> {
+ // index, value, nodes, signature
+ // "uncles" are included
+ let index = msg.get_index()?;
+ let data = msg.get_value()?;
+ let root_sig = msg.get_signature()?;
+ assert!(index % 2 == 0);
+
+ // 1. Get hash of the data chunk
+ let leaf_hash = HyperRegister::hash_leaf(data);
+
+ // 2. Insert tree chunks
+ for node in msg.get_nodes() {
+ let node_index = node.get_index();
+ let node_hash = node.get_hash();
+ let node_size = node.get_size();
+ }
+
+ // 3. Verify signature
+ let root_hash = HyperRegister::hash_roots(self, index)?;
+ ed25519::verify(&root_hash, &self.secret_key.clone().unwrap(), &root_sig);
+
+ // 4. Insert signature
+ self.sign_sleep.insert(index, &root_sig)?;
+
+ // 5. Insert bytes into data file
+ if let Some(ref mut df) = self.data_file {
+ df.seek(SeekFrom::Start(offset))?;
+ df.write_all(data)?;
+ df.sync_data()?;
+ }
+ }
+
fn len(&self) -> Result<u64> {
// Length in entry count.
let tree_len = self.tree_sleep.len()?;
diff --git a/src/synchronizer.rs b/src/synchronizer.rs
index cfc4cd3..a4c156b 100644
--- a/src/synchronizer.rs
+++ b/src/synchronizer.rs
@@ -125,6 +125,7 @@ impl Synchronizer {
let unified_peers_rx = self.unified_peers_rx.clone();
loop {
+ print!(".");
chan_select! {
unified_peers_rx.recv() -> val => {
if let Some(Ok(pm)) = val {
@@ -141,7 +142,9 @@ impl Synchronizer {
// mutable ref to PeerThread for this message
let pt = self.peers.get_mut(&pm.peer_handle).unwrap();
- // NB: this is the simplistic model of registers (only works up to 2x per peer?)
+ // XXX: debug, not warn
+ warn!("Got a message (feed={}): {:?}", pm.feed_index, pm.msg);
+
if pm.feed_index as usize >= self.registers.len() {
// Ignore feed channels we haven't registered yet
return Ok(());