1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
use errors::*;
use network_msgs::*;
use bitfield::*;
use protocol::{DatNetMessage, DatConnection};
use sleep_register::HyperRegister;
// Synchronizer
// register_keys
// peers: vec
// registers: HyperRegisters
// mode: enum
// state: enum
// wanted: bitfield
// requested: vec
//
// fn next_wanted() -> Option((reg, u64))
// fn tick()
fn max_index(have_msg: &Have) -> Result<u64> {
if have_msg.has_length() {
return Ok(have_msg.get_start() + have_msg.get_length());
} else if have_msg.has_bitfield() {
let raw_bf = have_msg.get_bitfield();
let bf = decode_bitfield(raw_bf)?;
trace!("decoded bitfield: {:?}", bf);
return Ok(max_high_bit(&bf));
} else {
return Ok(have_msg.get_start() + 1);
}
}
#[test]
fn test_max_index() {
let mut hm = Have::new();
hm.set_start(0);
hm.set_bitfield(vec![7,2,128]);
assert_eq!(max_index(&hm).unwrap(), 8);
hm.set_bitfield(vec![2, 207]);
assert_eq!(max_index(&hm).unwrap(), 7);
// Alphabet test dat
hm.set_bitfield(vec![2, 254]);
assert_eq!(max_index(&hm).unwrap(), 6);
hm.set_bitfield(vec![2, 252]);
assert_eq!(max_index(&hm).unwrap(), 5);
}
/// Tries to connect to a single peer, pull register, and close.
pub fn node_simple_clone(host_port: &str, key: &[u8], register: &mut HyperRegister, is_content: bool) -> Result<()> {
if register.len()? > 0 {
bail!("Register already had data in it (expected empty for naive clone)");
}
let mut dc = DatConnection::connect(host_port, key, false)?;
// Info: downloading, not uploading
let mut im = Info::new();
im.set_uploading(false);
im.set_downloading(true);
let im = DatNetMessage::Info(im);
dc.send_msg(&im, is_content)?;
// Have: nothing (so far)
let mut hm = Have::new();
hm.set_start(0);
hm.set_length(0);
let hm = DatNetMessage::Have(hm);
dc.send_msg(&hm, is_content)?;
// UnHave: still nothing
let mut uhm = Unhave::new();
uhm.set_start(0);
let uhm = DatNetMessage::Unhave(uhm);
dc.send_msg(&uhm, is_content)?;
// Want: everything
let mut wm = Want::new();
wm.set_start(0);
let wm = DatNetMessage::Want(wm);
dc.send_msg(&wm, is_content)?;
let last_entry: u64;
// Receive Have messages to determine lengths
loop {
let (was_content, msg) = dc.recv_msg()?;
match msg {
DatNetMessage::Have(dh) => {
info!("is_content={}; {:?}; bitfield={:?}", was_content, dh, dh.get_bitfield());
last_entry = max_index(&dh)?;
break;
},
_ => {
info!("Other message: {:?}", &msg);
}
}
}
info!("last_entry={}", last_entry);
// Request / Data loops
for i in 0..(last_entry+1) {
let mut rm = Request::new();
rm.set_index(i);
info!("Sending request: {:?}", rm);
dc.send_msg(&DatNetMessage::Request(rm), false)?;
let (was_content, msg) = dc.recv_msg()?;
assert!(!was_content);
match msg {
DatNetMessage::Data(dm) => {
info!("Got data: index={}", dm.get_index());
assert!(dm.has_value());
assert!(dm.get_index() == i);
register.append(dm.get_value())?;
},
_ => {
info!("Other message: {:?}", &msg);
}
}
}
Ok(())
}
|