From 29beb39d94269d4fee50db353d5f03e55d6652b7 Mon Sep 17 00:00:00 2001
From: Adam Ierymenko
Date: Mon, 11 Apr 2022 16:32:58 -0400
Subject: [PATCH] commit changes before merge

---
 .nova/Configuration.json   |   2 +-
 syncwhole/src/datastore.rs |  62 ++++++------
 syncwhole/src/iblt.rs      |  29 ++++--
 syncwhole/src/lib.rs       |   2 +-
 syncwhole/src/main.rs      |  58 ++++-------
 syncwhole/src/node.rs      | 195 +++++--------------------------------
 syncwhole/src/protocol.rs  |  91 +++--------------
 7 files changed, 112 insertions(+), 327 deletions(-)

diff --git a/.nova/Configuration.json b/.nova/Configuration.json
index 2d92ea5ac..725a9749e 100644
--- a/.nova/Configuration.json
+++ b/.nova/Configuration.json
@@ -1,3 +1,3 @@
 {
-  "workspace.name" : "ZeroTier"
+  "workspace.name" : "tetanus"
 }
diff --git a/syncwhole/src/datastore.rs b/syncwhole/src/datastore.rs
index acf4714b4..aa2a31207 100644
--- a/syncwhole/src/datastore.rs
+++ b/syncwhole/src/datastore.rs
@@ -11,12 +11,6 @@ use async_trait::async_trait;
 /// Size of keys, which is the size of a 512-bit hash. This is a protocol constant.
 pub const KEY_SIZE: usize = 64;
 
-/// Minimum possible value in a key range (all zero).
-pub const MIN_KEY: [u8; KEY_SIZE] = [0; KEY_SIZE];
-
-/// Maximum possible value in a key range (all 0xff).
-pub const MAX_KEY: [u8; KEY_SIZE] = [0xff; KEY_SIZE];
-
 /// Result returned by DataStore::store().
 pub enum StoreResult {
     /// Entry was accepted.
     Ok,
@@ -32,6 +26,18 @@ pub enum StoreResult {
     Rejected,
 }
 
+/// Convert a prefix into an inclusive range of keys.
+///
+/// This is a convenience function for implementing keys_under() with data stores that support
+/// straightforward range queries with full keys.
+pub fn prefix_to_range(prefix: u64, prefix_bits: u32) -> ([u8; KEY_SIZE], [u8; KEY_SIZE]) {
+    let mut a = [0_u8; KEY_SIZE];
+    a[0..8].copy_from_slice(&((prefix & 0xffffffffffffffff_u64.wrapping_shl(64 - prefix_bits)).to_be_bytes()));
+    let mut b = [0xff_u8; KEY_SIZE];
+    b[0..8].copy_from_slice(&((prefix | 0xffffffffffffffff_u64.wrapping_shr(prefix_bits)).to_be_bytes()));
+    (a, b)
+}
+
 /// API to be implemented by the data set we want to replicate.
 ///
 /// Keys as understood by syncwhole are SHA512 hashes of values. The user can of course
@@ -60,9 +66,6 @@ pub trait DataStore: Sync + Send {
     /// Maximum size of a value in bytes.
     const MAX_VALUE_SIZE: usize;
 
-    /// Get the current wall clock in milliseconds since Unix epoch.
-    fn clock(&self) -> i64;
-
     /// Get the domain of this data store.
     ///
     /// This is an arbitrary unique identifier that must be the same for all nodes that
@@ -70,15 +73,13 @@ pub trait DataStore: Sync + Send {
     /// data across data sets if this is not desired.
     fn domain(&self) -> &str;
 
-    /// Get an item if it exists as of a given reference time.
-    async fn load(&self, reference_time: i64, key: &[u8]) -> Option<Self::ValueRef>;
-
-    /// Check whether this data store contains a key.
+    /// Get the reference time that should be used on this side to query remote peers.
     ///
-    /// The default implementation just calls load(). Override if a faster version is possible.
-    async fn contains(&self, reference_time: i64, key: &[u8]) -> bool {
-        self.load(reference_time, key).await.is_some()
-    }
+    /// This is typically the local "wall clock" time in milliseconds since Unix epoch.
+    fn reference_time(&self) -> i64;
+
+    /// Get an item by identity hash key if it exists.
+    async fn load(&self, key: &[u8; KEY_SIZE]) -> Option<Self::ValueRef>;
 
     /// Store an item in the data store and return its status.
     ///
@@ -103,22 +104,19 @@ pub trait DataStore: Sync + Send {
     /// Rejected should only be returned if the value actually fails a validity check, signature
     /// verification, proof of work check, or some other required criteria. Ignored must be
     /// returned if the value is valid but is too old or was rejected for some other normal reason.
-    async fn store(&self, key: &[u8], value: &[u8]) -> StoreResult;
+    async fn store(&self, key: &[u8; KEY_SIZE], value: &[u8]) -> StoreResult;
 
-    /// Get the number of items in a range.
-    async fn count(&self, reference_time: i64, key_range_start: &[u8], key_range_end: &[u8]) -> u64;
-
-    /// Get the total number of records in this data store.
-    async fn total_count(&self) -> u64;
-
-    /// Iterate through a series of keys in a range (inclusive), stopping when function returns false.
+    /// Iterate through keys under a given key prefix.
     ///
-    /// The default implementation uses for_each() and just drops the value. Specialize if you can do it faster
-    /// by only retrieving keys.
-    async fn for_each_key<F: FnMut(&[u8]) -> bool>(&self, reference_time: i64, key_range_start: &[u8], key_range_end: &[u8], mut f: F) {
-        self.for_each(reference_time, key_range_start, key_range_end, |k, _| f(k)).await;
-    }
+    /// The prefix is a bit string up to 64 bits long. The implementation can technically interpret this
+    /// any way it wants, but usually this would be the first 64 bits of the key as a big-endian bit string.
+    ///
+    /// Keys MUST be output in ascending binary sort order.
+    async fn keys_under<F: FnMut(&[u8]) -> bool>(&self, reference_time: i64, prefix: u64, prefix_bits: u32, f: F);
 
-    /// Iterate through a series of entries in a range (inclusive), stopping when function returns false.
-    async fn for_each<F: FnMut(&[u8], &Self::ValueRef) -> bool>(&self, reference_time: i64, key_range_start: &[u8], key_range_end: &[u8], f: F);
+    /// Load all record values under a given key prefix.
+    ///
+    /// This should clear and fill the result, fetching up to the limit values under a given key prefix.
+    /// Values may be pushed into the vector in any order.
+    async fn values_under(&self, prefix: u64, prefix_bits: u32, result: &mut Vec<Self::ValueRef>, limit: usize);
 }
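As a sanity check on the new helper, here is a minimal standalone example of keys_under()-style range querying over an ordered map. The helper body is copied verbatim from the hunk above so the example compiles on its own; note the wrapping shifts assume 1 <= prefix_bits <= 63 (at prefix_bits = 64 the end mask wraps and widens the range).

```rust
use std::collections::BTreeMap;
use std::ops::Bound::Included;

const KEY_SIZE: usize = 64;

// Copied from the hunk above so this example runs standalone.
fn prefix_to_range(prefix: u64, prefix_bits: u32) -> ([u8; KEY_SIZE], [u8; KEY_SIZE]) {
    let mut a = [0_u8; KEY_SIZE];
    a[0..8].copy_from_slice(&((prefix & 0xffffffffffffffff_u64.wrapping_shl(64 - prefix_bits)).to_be_bytes()));
    let mut b = [0xff_u8; KEY_SIZE];
    b[0..8].copy_from_slice(&((prefix | 0xffffffffffffffff_u64.wrapping_shr(prefix_bits)).to_be_bytes()));
    (a, b)
}

fn main() {
    let mut db: BTreeMap<[u8; KEY_SIZE], Vec<u8>> = BTreeMap::new();
    let mut k = [0_u8; KEY_SIZE];
    k[0] = 0xab; // this key falls under the 8-bit prefix 0xab
    db.insert(k, b"example".to_vec());

    // The prefix occupies the high bits of the u64, so shift it into place.
    let (start, end) = prefix_to_range(0xab_u64 << 56, 8);
    assert_eq!(db.range((Included(start), Included(end))).count(), 1);
}
```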
diff --git a/syncwhole/src/iblt.rs b/syncwhole/src/iblt.rs
index 8c99b2d6a..eaf688e07 100644
--- a/syncwhole/src/iblt.rs
+++ b/syncwhole/src/iblt.rs
@@ -53,6 +53,7 @@ fn next_iteration_index(mut x: u64, hash_no: u64) -> u64 {
 /// The best value for HASHES seems to be 3 for an optimal fill of 80%.
 #[repr(C)]
 pub struct IBLT<const BUCKETS: usize, const HASHES: usize> {
+    total_count: i64, // always stored little-endian in memory
     key: [u64; BUCKETS],
     check_hash: [u32; BUCKETS],
     count: [i8; BUCKETS],
@@ -76,7 +77,7 @@ impl<const BUCKETS: usize, const HASHES: usize> IBLT<BUCKETS, HASHES> {
     pub const BUCKETS: usize = BUCKETS;
 
     /// Size of this IBLT in bytes.
-    pub const SIZE_BYTES: usize = BUCKETS * BUCKET_SIZE_BYTES;
+    pub const SIZE_BYTES: usize = 8 + (BUCKETS * BUCKET_SIZE_BYTES); // total_count + buckets
 
     /// Create a new zeroed IBLT.
     #[inline(always)]
@@ -126,7 +127,13 @@ impl<const BUCKETS: usize, const HASHES: usize> IBLT<BUCKETS, HASHES> {
         }
     }
 
-    fn ins_rem(&mut self, key: u64, delta: i8) {
+    /// Get the total number of set items that have been added to this IBLT.
+    pub fn count(&self) -> u64 {
+        i64::from_le(self.total_count).max(0) as u64
+    }
+
+    pub(crate) fn ins_rem(&mut self, key: u64, delta: i8) {
+        self.total_count = i64::from_le(self.total_count).wrapping_add(delta as i64).to_le();
         let check_hash = get_check_hash(key);
         let mut iteration_index = u64::from_le(key);
         for k in 0..(HASHES as u64) {
@@ -152,14 +159,20 @@ impl<const BUCKETS: usize, const HASHES: usize> IBLT<BUCKETS, HASHES> {
 
     /// Subtract another IBLT from this one to get a set difference.
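Since list()'s contract changed, a compact self-contained toy may help reviewers see the subtract-then-peel flow that total_count now bounds. This is deliberately not this file's implementation (no 32-bit check hashes, i8 counters, wire layout, or next_iteration_index(); buckets are partitioned per hash so a key never lands in the same bucket twice), just the shape of the algorithm:

```rust
// Toy IBLT over u64 keys: 3 hash positions, each mapping into its own
// disjoint region of the table.
const B: usize = 66; // divisible by the 3 hash positions
const REGION: u64 = (B / 3) as u64;

fn splitmix(mut x: u64) -> u64 {
    x = (x ^ (x >> 30)).wrapping_mul(0xBF58476D1CE4E5B9);
    x = (x ^ (x >> 27)).wrapping_mul(0x94D049BB133111EB);
    x ^ (x >> 31)
}

fn bucket(key: u64, hash_no: u64) -> usize {
    // hash_no selects a disjoint region of the table
    ((hash_no * REGION) + (splitmix(key.wrapping_add(hash_no)) % REGION)) as usize
}

fn checksum(key: u64) -> u64 {
    splitmix(key ^ 0xA5A5A5A5A5A5A5A5)
}

struct ToyIblt {
    total_count: i64, // what the patch tracks (little-endian) as total_count
    key: [u64; B],
    check: [u64; B],
    count: [i64; B],
}

impl ToyIblt {
    fn new() -> Self {
        ToyIblt { total_count: 0, key: [0; B], check: [0; B], count: [0; B] }
    }

    fn ins_rem(&mut self, key: u64, delta: i64) {
        self.total_count += delta;
        for h in 0..3 {
            let b = bucket(key, h);
            self.key[b] ^= key;
            self.check[b] ^= checksum(key);
            self.count[b] += delta;
        }
    }

    fn subtract(&mut self, other: &Self) {
        // Same clamping idea as the patch: negative totals are meaningless.
        self.total_count = (self.total_count - other.total_count.max(0)).max(0);
        for b in 0..B {
            self.key[b] ^= other.key[b];
            self.check[b] ^= other.check[b];
            self.count[b] -= other.count[b];
        }
    }

    fn list(mut self, out: &mut Vec<u64>) -> bool {
        let want = self.total_count;
        loop {
            let mut progressed = false;
            for b in 0..B {
                let c = self.count[b];
                // A "pure" bucket holds exactly one surviving key.
                if (c == 1 || c == -1) && self.check[b] == checksum(self.key[b]) {
                    let key = self.key[b];
                    out.push(key);
                    self.ins_rem(key, -c); // peel it out of all three buckets
                    progressed = true;
                }
            }
            if !progressed {
                break;
            }
        }
        out.len() as i64 >= want // did we list at least the expected count?
    }
}

fn main() {
    let (mut mine, mut theirs) = (ToyIblt::new(), ToyIblt::new());
    for k in 1..=40_u64 {
        mine.ins_rem(splitmix(k), 1);
    }
    for k in 1..=35_u64 {
        theirs.ins_rem(splitmix(k), 1); // their set is a subset of ours
    }
    mine.subtract(&theirs);
    let mut missing = Vec::new();
    // At this low load, peeling essentially always succeeds.
    assert!(mine.list(&mut missing));
    assert_eq!(missing.len(), 5); // exactly the 5 records they are missing
}
```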
     pub fn subtract(&mut self, other: &Self) {
+        self.total_count = i64::from_le(self.total_count).wrapping_sub(i64::from_le(other.total_count).max(0)).max(0).to_le();
         self.key.iter_mut().zip(other.key.iter()).for_each(|(a, b)| *a ^= *b);
         self.check_hash.iter_mut().zip(other.check_hash.iter()).for_each(|(a, b)| *a ^= *b);
         self.count.iter_mut().zip(other.count.iter()).for_each(|(a, b)| *a = a.wrapping_sub(*b));
     }
 
     /// List as many entries in this IBLT as can be extracted.
-    /// True is returned if extraction was 100% successful. False indicates that
-    /// some entries were not extractable.
+    ///
+    /// True is returned if the number of extracted items was exactly equal to the total number of items
+    /// in this set summary. A return of false indicates incomplete extraction or an invalid IBLT.
+    ///
+    /// Due to the small check hash sizes used in this IBLT there is a very small chance this will list
+    /// bogus items that were never added. This is not an issue with this protocol as it would just result
+    /// in an unsatisfied record request.
     pub fn list<F: FnMut(u64)>(mut self, mut f: F) -> bool {
         let mut queue: Vec<u32> = Vec::with_capacity(BUCKETS);
 
@@ -170,7 +183,10 @@ impl<const BUCKETS: usize, const HASHES: usize> IBLT<BUCKETS, HASHES> {
             }
         }
 
-        'list_main: loop {
+        let total_count = i64::from_le(self.total_count);
+        let mut listed = 0;
+
+        'list_main: while listed < total_count {
             let i = queue.pop();
             let i = if i.is_some() {
                 i.unwrap() as usize
@@ -182,6 +198,7 @@ impl<const BUCKETS: usize, const HASHES: usize> IBLT<BUCKETS, HASHES> {
             let check_hash = self.check_hash[i];
             let count = self.count[i];
             if (count == 1 || count == -1) && check_hash == get_check_hash(key) {
+                listed += 1;
                 f(key);
 
                 let mut iteration_index = u64::from_le(key);
@@ -205,7 +222,7 @@ impl<const BUCKETS: usize, const HASHES: usize> IBLT<BUCKETS, HASHES> {
             }
         }
 
-        self.count.iter().all(|x| *x == 0) && self.key.iter().all(|x| *x == 0)
+        listed == total_count
     }
 }
diff --git a/syncwhole/src/lib.rs b/syncwhole/src/lib.rs
index 030bf48c3..230ff8d74 100644
--- a/syncwhole/src/lib.rs
+++ b/syncwhole/src/lib.rs
@@ -8,11 +8,11 @@
 pub(crate) mod iblt;
 pub(crate) mod protocol;
+pub(crate) mod utils;
 pub(crate) mod varint;
 
 pub mod datastore;
 pub mod host;
 pub mod node;
-pub mod utils;
 
 pub use async_trait;
diff --git a/syncwhole/src/main.rs b/syncwhole/src/main.rs
index 1344a4744..35f3bfc4c 100644
--- a/syncwhole/src/main.rs
+++ b/syncwhole/src/main.rs
@@ -48,7 +48,7 @@ fn get_random_bytes(mut buf: &mut [u8]) {
 pub struct TestNodeHost {
     pub name: String,
     pub config: Config,
-    pub records: tokio::sync::Mutex<BTreeMap<[u8; 64], [u8; 64]>>,
+    pub records: tokio::sync::RwLock<BTreeMap<[u8; 64], [u8; 64]>>,
 }
 
 impl TestNodeHost {
@@ -63,7 +63,7 @@ impl TestNodeHost {
         Self {
             name: test_no.to_string(),
             config: Config::default(),
-            records: tokio::sync::Mutex::new(s),
+            records: tokio::sync::RwLock::new(s),
         }
     }
 }
@@ -105,54 +105,34 @@ impl DataStore for TestNodeHost {
         "test"
     }
 
-    async fn load(&self, _: i64, key: &[u8]) -> Option<[u8; 64]> {
-        let key = key.try_into();
-        if key.is_ok() {
-            let key: [u8; 64] = key.unwrap();
-            let records = self.records.lock().await;
-            let value = records.get(&key);
-            if value.is_some() {
-                return Some(value.unwrap().clone());
-            }
+    async fn load(&self, key: &[u8; 64]) -> Option<[u8; 64]> {
+        let records = self.records.read().await;
+        let value = records.get(key);
+        if value.is_some() {
+            Some(value.unwrap().clone())
+        } else {
+            None
         }
-        return None;
     }
-    async fn store(&self, key: &[u8], value: &[u8]) -> StoreResult {
-        let key = key.try_into();
-        if key.is_ok() && value.len() == 64 {
-            let key: [u8; 64] = key.unwrap();
-            let value: [u8; 64] = value.try_into().unwrap();
-            if key == Self::sha512(&[&value]) {
-                if self.records.lock().await.insert(key, value).is_none() {
-                    StoreResult::Ok
-                } else {
-                    StoreResult::Duplicate
-                }
-            } else {
-                StoreResult::Rejected
-            }
+    async fn store(&self, key: &[u8; 64], value: &[u8]) -> StoreResult {
+        let value: Result<[u8; 64], _> = value.try_into();
+        if let Ok(value) = value {
+            if self.records.write().await.insert(key.clone(), value).is_none() {
+                StoreResult::Ok
+            } else {
+                StoreResult::Duplicate
+            }
         } else {
             StoreResult::Rejected
         }
     }
 
-    async fn count(&self, _: i64, key_range_start: &[u8], key_range_end: &[u8]) -> u64 {
-        let start: [u8; 64] = key_range_start.try_into().unwrap();
-        let end: [u8; 64] = key_range_end.try_into().unwrap();
-        self.records.lock().await.range((Included(start), Included(end))).count() as u64
-    }
-
-    async fn total_count(&self) -> u64 {
-        self.records.lock().await.len() as u64
-    }
-
-    async fn for_each<F: FnMut(&[u8], &[u8]) -> bool>(&self, _reference_time: i64, key_range_start: &[u8], key_range_end: &[u8], mut f: F) {
-        let start: [u8; 64] = key_range_start.try_into().unwrap();
-        let end: [u8; 64] = key_range_end.try_into().unwrap();
-        let records = self.records.lock().await;
+    async fn keys_under<F: FnMut(&[u8]) -> bool>(&self, _reference_time: i64, prefix: u64, prefix_bits: u32, mut f: F) {
+        let (start, end) = prefix_to_range(prefix, prefix_bits);
+        let records = self.records.read().await;
         for (k, v) in records.range((Included(start), Included(end))) {
-            if !f(k, v) {
+            if !f(k) {
                 break;
             }
         }
     }
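This hunk drops for_each() without showing a matching values_under() for TestNodeHost. For reference, a sketch of what it could look like inside the same impl block; this is hypothetical and not part of the patch, assuming ValueRef = [u8; 64] to match load() above, and reusing prefix_to_range() and the Included import already in this file:

```rust
// Hypothetical values_under() for TestNodeHost, rounding out the new trait.
async fn values_under(&self, prefix: u64, prefix_bits: u32, result: &mut Vec<[u8; 64]>, limit: usize) {
    let (start, end) = prefix_to_range(prefix, prefix_bits);
    result.clear(); // the trait requires clear-and-fill semantics
    let records = self.records.read().await;
    for (_, v) in records.range((Included(start), Included(end))) {
        if result.len() >= limit {
            break;
        }
        result.push(v.clone());
    }
}
```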
diff --git a/syncwhole/src/node.rs b/syncwhole/src/node.rs
index 32f421b1d..d63424746 100644
--- a/syncwhole/src/node.rs
+++ b/syncwhole/src/node.rs
@@ -35,10 +35,6 @@ const HOUSEKEEPING_PERIOD: i64 = SYNC_STATUS_PERIOD;
 /// Inactivity timeout for connections in milliseconds.
 const CONNECTION_TIMEOUT: i64 = SYNC_STATUS_PERIOD * 4;
 
-/// Announce when we get records from peers if sync status estimate is more than this threshold.
-/// This is used to stop us from spamming with HaveRecords while catching up.
-const ANNOUNCE_IF_SYNCED_MORE_THAN: f64 = 0.95;
-
 /// Information about a remote node to which we are connected.
 #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct RemoteNodeInfo {
@@ -97,7 +93,6 @@ impl<D: DataStore + 'static, H: Host + 'static> Node<D, H> {
             announce_queue: Mutex::new(HashMap::with_capacity(256)),
             bind_address,
             starting_instant: Instant::now(),
-            sync_completeness_estimate: AtomicU64::new((0.0_f64).to_bits()),
         });
 
         Ok(Self {
@@ -118,13 +113,6 @@ impl<D: DataStore + 'static, H: Host + 'static> Node<D, H> {
         &self.internal.host
     }
 
-    /// Broadcast a new record to the world.
-    ///
-    /// This should be called when new records are added to the synchronized data store
-    /// that are created locally. If this isn't called it may take a while for normal
-    /// sync to pick up and propagate the record.
-    pub async fn broadcast_new_record(&self, key: &[u8], value: &[u8]) {}
-
     /// Attempt to connect to an explicitly specified TCP endpoint.
     pub async fn connect(&self, endpoint: &SocketAddr) -> std::io::Result<bool> {
         self.internal.clone().connect(endpoint, Instant::now().add(Duration::from_millis(CONNECTION_TIMEOUT as u64))).await
@@ -144,15 +132,6 @@ impl<D: DataStore + 'static, H: Host + 'static> Node<D, H> {
     pub async fn connection_count(&self) -> usize {
         self.internal.connections.lock().await.len()
     }
-
-    /// Get a value from 0.0 to 1.0 estimating how synchronized we are with the network.
-    ///
-    /// This is an inexact estimate since it's based on record counts and it's possible for
-    /// two nodes to have the same count but disjoint sets. It tends to be fairly good in
-    /// practice though unless you have been disconnected for a very long time.
-    pub async fn sync_completeness_estimate(&self) -> f64 {
-        f64::from_bits(self.internal.sync_completeness_estimate.load(Ordering::Relaxed))
-    }
 }
 
 impl<D: DataStore + 'static, H: Host + 'static> Drop for Node<D, H> {
@@ -197,9 +176,6 @@ pub struct NodeInternal<D: DataStore + 'static, H: Host + 'static> {
     // Instant this node started.
     starting_instant: Instant,
-
-    // Latest estimate of sync completeness.
-    sync_completeness_estimate: AtomicU64,
 }
 
 impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
@@ -221,26 +197,11 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
         connected_to_addresses.clear();
         let now = self.ms_monotonic();
 
-        // Drop dead connections, send SyncStatus, and populate counts for computing sync status.
-        let sync_status = Arc::new(
-            rmp_serde::encode::to_vec_named(&msg::SyncStatus {
-                record_count: self.datastore.total_count().await,
-                clock: self.datastore.clock() as u64,
-            })
-            .unwrap(),
-        );
         self.connections.lock().await.retain(|sa, c| {
             if !c.closed.load(Ordering::Relaxed) {
                 let cc = c.clone();
                 if (now - c.last_receive_time.load(Ordering::Relaxed)) < CONNECTION_TIMEOUT {
                     connected_to_addresses.insert(sa.clone());
-                    if c.info.lock().unwrap().initialized {
-                        counts.push(c.last_sync_status_record_count.load(Ordering::Relaxed));
-                        let ss2 = sync_status.clone();
-                        tasks.push(tokio::spawn(async move {
-                            let _ = tokio::time::timeout_at(sleep_until, cc.send_msg(MessageType::SyncStatus, ss2.as_slice(), now)).await;
-                        }));
-                    }
                     true // keep connection
                 } else {
                     let _ = c.read_task.lock().unwrap().take().map(|j| j.abort());
@@ -271,19 +232,6 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
             }
         });
 
-        let sync_completness_estimate = if !counts.is_empty() {
-            counts.sort_unstable();
-            let twothirds = if counts.len() > 3 { *counts.get((counts.len() / 3) * 2).unwrap() } else { *counts.last().unwrap() };
-            if twothirds > 0 {
-                ((self.datastore.total_count().await as f64) / (twothirds as f64)).min(1.0)
-            } else {
-                1.0
-            }
-        } else {
-            1.0
-        };
-        self.sync_completeness_estimate.store(sync_completness_estimate.to_bits(), Ordering::Relaxed);
-
         let config = self.host.node_config();
 
         // Always try to connect to anchor peers.
@@ -341,20 +289,22 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
         let now = self.ms_monotonic();
 
         for c in self.connections.lock().await.iter() {
-            let mut have_records: Vec<u8> = Vec::with_capacity((to_announce.len() * ANNOUNCE_KEY_LEN) + 4);
-            have_records.push(ANNOUNCE_KEY_LEN as u8);
-            for (key, already_has) in to_announce.iter() {
-                if !already_has.contains(c.0) {
-                    let _ = std::io::Write::write_all(&mut have_records, key);
-                }
-            }
-            if have_records.len() > 1 {
-                let c2 = c.1.clone();
-                background_tasks.spawn(async move {
-                    // If the connection dies this will either fail or time out in 1s. Usually these execute instantly due to
-                    // write buffering but a short timeout prevents them from building up too much.
-                    let _ = tokio::time::timeout(announce_timeout, c2.send_msg(MessageType::HaveRecords, have_records.as_slice(), now));
-                })
-            }
+            if c.1.announce_new_records.load(Ordering::Relaxed) {
+                let mut have_records: Vec<u8> = Vec::with_capacity((to_announce.len() * ANNOUNCE_KEY_LEN) + 4);
+                have_records.push(ANNOUNCE_KEY_LEN as u8);
+                for (key, already_has) in to_announce.iter() {
+                    if !already_has.contains(c.0) {
+                        let _ = std::io::Write::write_all(&mut have_records, key);
+                    }
+                }
+                if have_records.len() > 1 {
+                    let c2 = c.1.clone();
+                    background_tasks.spawn(async move {
+                        // If the connection dies this will either fail or time out in 1s. Usually these execute instantly due to
+                        // write buffering but a short timeout prevents them from building up too much.
+                        let _ = tokio::time::timeout(announce_timeout, c2.send_msg(MessageType::HaveRecords, have_records.as_slice(), now)).await;
+                    })
+                }
+            }
         }
@@ -401,7 +351,6 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
             writer: Mutex::new(writer),
             last_send_time: AtomicI64::new(now),
             last_receive_time: AtomicI64::new(now),
-            last_sync_status_record_count: AtomicU64::new(0),
             info: std::sync::Mutex::new(RemoteNodeInfo {
                 name: String::new(),
                 contact: String::new(),
@@ -413,6 +362,7 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                 initialized: false,
             }),
             read_task: std::sync::Mutex::new(None),
+            announce_new_records: AtomicBool::new(false),
             closed: AtomicBool::new(false),
         });
         let self2 = self.clone();
@@ -431,7 +381,6 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
         const BUF_CHUNK_SIZE: usize = 4096;
         const READ_BUF_INITIAL_SIZE: usize = 65536; // should be a multiple of BUF_CHUNK_SIZE
 
-        let background_tasks = AsyncTaskReaper::new();
         let mut write_buffer: Vec<u8> = Vec::with_capacity(BUF_CHUNK_SIZE);
         let mut read_buffer: Vec<u8> = Vec::new();
         read_buffer.resize(READ_BUF_INITIAL_SIZE, 0);
@@ -581,102 +530,19 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                 }
 
                 match message_type {
-                    MessageType::HaveRecords => {
-                        if message.len() > 1 {
-                            let clock = self.datastore.clock();
-                            let mut announce_queue_key = [0_u8; ANNOUNCE_KEY_LEN];
-                            let mut start = [0_u8; KEY_SIZE];
-                            let mut end = [0xff_u8; KEY_SIZE];
-                            let key_prefix_len = message[0] as usize;
-                            message = &message[1..];
-                            if key_prefix_len > 0 && key_prefix_len <= KEY_SIZE {
-                                write_buffer.clear();
-                                write_buffer.push(key_prefix_len as u8);
-                                while message.len() >= key_prefix_len {
-                                    let key_prefix = &message[..key_prefix_len];
-
-                                    if key_prefix_len >= ANNOUNCE_KEY_LEN {
-                                        // If the key prefix is appropriately sized, look up and add this remote endpoint
-                                        // to the list of endpoints that already have this record if it's in the announce
-                                        // queue. We don't add a new entry to the announce queue if one doesn't already
-                                        // exist because we did not just receive the actual record. This just avoids announcing
-                                        // to peers that just told us they have it.
-                                        announce_queue_key.copy_from_slice(&key_prefix[..ANNOUNCE_KEY_LEN]);
-                                        self.announce_queue.lock().await.get_mut(&announce_queue_key).map(|already_has| {
-                                            if !already_has.contains(&remote_address) {
-                                                already_has.push(remote_address.clone());
-                                            }
-                                        });
-                                    }
-
-                                    if if key_prefix_len < KEY_SIZE {
-                                        (&mut start[..key_prefix_len]).copy_from_slice(key_prefix);
-                                        (&mut end[..key_prefix_len]).copy_from_slice(key_prefix);
-                                        self.datastore.count(clock, &start, &end).await == 0
-                                    } else {
-                                        !self.datastore.contains(clock, key_prefix).await
-                                    } {
-                                        let _ = std::io::Write::write_all(&mut write_buffer, key_prefix);
-                                    }
-
-                                    message = &message[key_prefix_len..];
-                                }
-                                if write_buffer.len() > 1 {
-                                    let _ = connection.send_msg(MessageType::GetRecords, write_buffer.as_slice(), now).await?;
-                                }
-                            }
-                        }
-                    }
+                    MessageType::HaveRecords => {}
 
-                    MessageType::GetRecords => {
-                        if message.len() > 1 {
-                            let mut start = [0_u8; KEY_SIZE];
-                            let mut end = [0xff_u8; KEY_SIZE];
-                            let key_prefix_len = message[0] as usize;
-                            message = &message[1..];
-                            if key_prefix_len > 0 && key_prefix_len <= KEY_SIZE {
-                                while message.len() >= key_prefix_len {
-                                    let key_prefix = &message[..key_prefix_len];
-
-                                    if key_prefix_len < KEY_SIZE {
-                                        (&mut start[..key_prefix_len]).copy_from_slice(key_prefix);
-                                        (&mut end[..key_prefix_len]).copy_from_slice(key_prefix);
-                                        self.datastore
-                                            .for_each(0, &start, &end, |_, v| {
-                                                let v2 = v.clone();
-                                                let c2 = connection.clone();
-                                                background_tasks.spawn(async move {
-                                                    let _ = c2.send_msg(MessageType::Record, v2.as_ref(), now).await;
-                                                });
-                                                true
-                                            })
-                                            .await;
-                                    } else {
-                                        let record = self.datastore.load(0, key_prefix).await;
-                                        if record.is_some() {
-                                            let record = record.unwrap();
-                                            let v: &[u8] = record.as_ref();
-                                            let _ = connection.send_msg(MessageType::Record, v, now).await?;
-                                        }
-                                    }
-
-                                    message = &message[key_prefix_len..];
-                                }
-                            }
-                        }
-                    }
+                    MessageType::GetRecords => {}
 
                     MessageType::Record => {
                         let key = H::sha512(&[message]);
                         match self.datastore.store(&key, message).await {
                             StoreResult::Ok => {
-                                if f64::from_bits(self.sync_completeness_estimate.load(Ordering::Relaxed)) >= ANNOUNCE_IF_SYNCED_MORE_THAN {
-                                    let announce_key: [u8; ANNOUNCE_KEY_LEN] = (&key[..ANNOUNCE_KEY_LEN]).try_into().unwrap();
-                                    let mut q = self.announce_queue.lock().await;
-                                    let ql = q.entry(announce_key).or_insert_with(|| Vec::with_capacity(2));
-                                    if !ql.contains(&remote_address) {
-                                        ql.push(remote_address.clone());
-                                    }
+                                let announce_key: [u8; ANNOUNCE_KEY_LEN] = (&key[..ANNOUNCE_KEY_LEN]).try_into().unwrap();
+                                let mut q = self.announce_queue.lock().await;
+                                let ql = q.entry(announce_key).or_insert_with(|| Vec::with_capacity(2));
+                                if !ql.contains(&remote_address) {
+                                    ql.push(remote_address.clone());
                                 }
                             }
                             StoreResult::Rejected => {
@@ -686,17 +552,8 @@
                             }
                         }
 
-                    MessageType::SyncStatus => {
-                        let msg: msg::SyncStatus = decode_msgpack(message)?;
-                        connection.last_sync_status_record_count.store(msg.record_count, Ordering::Relaxed);
-                    }
-
-                    MessageType::SyncRequest => {
-                        let msg: msg::SyncRequest = decode_msgpack(message)?;
-                    }
-
-                    MessageType::SyncResponse => {
-                        let msg: msg::SyncResponse = decode_msgpack(message)?;
+                    MessageType::Sync => {
+                        let msg: msg::Sync = decode_msgpack(message)?;
                     }
 
                     _ => {}
@@ -733,9 +590,9 @@ struct Connection {
     writer: Mutex<OwnedWriteHalf>,
     last_send_time: AtomicI64,
     last_receive_time: AtomicI64,
-    last_sync_status_record_count: AtomicU64,
     info: std::sync::Mutex<RemoteNodeInfo>,
     read_task: std::sync::Mutex<Option<JoinHandle<std::io::Result<()>>>>,
+    announce_new_records: AtomicBool,
     closed: AtomicBool,
 }
diff --git a/syncwhole/src/protocol.rs b/syncwhole/src/protocol.rs
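The Sync arm above is still a stub (the decoded msg::Sync is unused). For orientation, a hedged sketch of what a responder could do with it, assuming the IBLT generics reconstructed in iblt.rs plus hypothetical insert() and from_bytes() helpers and table parameters that this patch does not define:

```rust
// Hypothetical responder flow for MessageType::Sync. The IBLT parameters
// (2048 buckets, 3 hashes) and the insert()/from_bytes() helpers are
// assumptions for illustration; only new(), subtract(), list() and count()
// appear in this patch.
async fn respond_to_sync<D: DataStore>(ds: &D, sync: &msg::Sync<'_>) -> Vec<u64> {
    // Summarize our own keys under the same prefix the peer summarized.
    let mut ours: IBLT<2048, 3> = IBLT::new();
    ds.keys_under(sync.reference_time as i64, sync.prefix, sync.prefix_bits as u32, |key| {
        // Fold the first 8 key bytes into the u64 the IBLT works on; both
        // sides must use the same convention.
        ours.insert(u64::from_be_bytes(key[..8].try_into().unwrap()));
        true // keep iterating
    })
    .await;

    // Subtract the peer's summary and peel. What survives approximates the
    // set difference, i.e. candidate keys to request or announce.
    let theirs: IBLT<2048, 3> = IBLT::from_bytes(sync.iblt).expect("length validated by caller");
    ours.subtract(&theirs);
    let mut difference = Vec::new();
    let complete = ours.list(|k| difference.push(k));
    if !complete {
        // Incomplete extraction: a caller would typically narrow the query
        // (more prefix_bits) and retry rather than fail outright.
    }
    difference
}
```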
index 7b65ff4d9..2882f9a04 100644
--- a/syncwhole/src/protocol.rs
+++ b/syncwhole/src/protocol.rs
@@ -27,7 +27,7 @@ pub enum MessageType {
     /// msg::InitResponse (msgpack)
     InitResponse = 2_u8,
 
-    /// [...]
+    /// [...]
     HaveRecords = 3_u8,
 
@@ -36,14 +36,8 @@ pub enum MessageType {
     /// [...]
     GetRecords = 4_u8,
 
    ///
     Record = 5_u8,
 
-    /// msg::SyncStatus (msgpack)
-    SyncStatus = 6_u8,
-
-    /// msg::SyncRequest (msgpack)
-    SyncRequest = 7_u8,
-
-    /// msg::SyncResponse (msgpack)
-    SyncResponse = 8_u8,
+    /// msg::Sync (msgpack)
+    Sync = 7_u8,
 }
 
 impl From<u8> for MessageType {
@@ -68,9 +62,7 @@ impl MessageType {
             Self::HaveRecords => "HAVE_RECORDS",
             Self::GetRecords => "GET_RECORDS",
             Self::Record => "RECORD",
-            Self::SyncStatus => "SYNC_STATUS",
-            Self::SyncRequest => "SYNC_REQUEST",
-            Self::SyncResponse => "SYNC_RESPONSE",
+            Self::Sync => "SYNC",
         }
     }
 }
@@ -141,81 +133,22 @@ pub mod msg {
     }
 
     #[derive(Serialize, Deserialize)]
-    pub struct SyncStatus {
-        /// Total number of records this node has in its data store.
-        #[serde(rename = "c")]
-        pub record_count: u64,
+    pub struct Sync<'a> {
+        /// 64-bit prefix of record keys for this request
+        #[serde(rename = "p")]
+        pub prefix: u64,
 
-        /// Sending node's system clock.
-        #[serde(rename = "t")]
-        pub clock: u64,
-    }
-
-    #[derive(Serialize, Deserialize)]
-    pub struct SyncRequest<'a> {
-        /// Key range start (length: KEY_SIZE)
-        #[serde(with = "serde_bytes")]
-        #[serde(rename = "s")]
-        pub range_start: &'a [u8],
-
-        /// Key range end (length: KEY_SIZE)
-        #[serde(with = "serde_bytes")]
-        #[serde(rename = "e")]
-        pub range_end: &'a [u8],
-
-        /// Number of records requesting node already has under key range
-        #[serde(rename = "c")]
-        pub record_count: u64,
+        /// Number of bits in prefix that are meaningful
+        #[serde(rename = "b")]
+        pub prefix_bits: u8,
 
         /// Reference time for query
         #[serde(rename = "t")]
         pub reference_time: u64,
 
-        /// Random salt
-        #[serde(rename = "x")]
-        pub salt: &'a [u8],
-    }
-
-    #[derive(Serialize, Deserialize)]
-    pub struct SyncResponse<'a> {
-        /// Key range start (length: KEY_SIZE)
-        #[serde(rename = "s")]
-        pub range_start: &'a [u8],
-
-        /// Key range end (length: KEY_SIZE)
-        #[serde(rename = "e")]
-        pub range_end: &'a [u8],
-
-        /// Number of records responder has under key range
-        #[serde(rename = "c")]
-        pub record_count: u64,
-
-        /// Reference time for query
-        #[serde(rename = "t")]
-        pub reference_time: u64,
-
-        /// Random salt
-        #[serde(rename = "x")]
-        pub salt: &'a [u8],
-
-        /// IBLT set summary or empty if not included
-        ///
-        /// If an IBLT is omitted it means the sender determined it was
-        /// more efficient to just send keys. In that case keys[] should have
-        /// an explicit list.
+        /// Set summary for keys under prefix
         #[serde(with = "serde_bytes")]
         #[serde(rename = "i")]
         pub iblt: &'a [u8],
-
-        /// Explicit list of keys (full key length).
-        ///
-        /// This may still contain keys if an IBLT is present. In that case
-        /// keys included here will be any that have identical 64-bit prefixes
-        /// to keys already added to the IBLT and thus would collide. These
-        /// should be rare so it's most efficient to just explicitly name them.
-        /// Otherwise keys with identical 64-bit prefixes may never be synced.
-        #[serde(with = "serde_bytes")]
-        #[serde(rename = "k")]
-        pub keys: &'a [u8],
     }
 }
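For completeness, the new message round-trips through msgpack the same way the old ones did. A minimal standalone check with rmp-serde; the struct is duplicated here so it compiles on its own, serde, serde_bytes, and rmp-serde are assumed as dependencies, and decode_msgpack() in node.rs wraps the same decode path:

```rust
use serde::{Deserialize, Serialize};

// Duplicate of msg::Sync from the hunk above, for a standalone round-trip.
#[derive(Serialize, Deserialize)]
pub struct Sync<'a> {
    #[serde(rename = "p")]
    pub prefix: u64,
    #[serde(rename = "b")]
    pub prefix_bits: u8,
    #[serde(rename = "t")]
    pub reference_time: u64,
    #[serde(with = "serde_bytes")]
    #[serde(rename = "i")]
    pub iblt: &'a [u8],
}

fn main() {
    let iblt_bytes = vec![0_u8; 64]; // stand-in for a serialized IBLT
    let m = Sync { prefix: 0xab_u64 << 56, prefix_bits: 8, reference_time: 1_649_700_000_000, iblt: &iblt_bytes };

    // node.rs encodes with named fields, so the short serde rename keys
    // ("p", "b", "t", "i") are what actually go on the wire.
    let wire = rmp_serde::encode::to_vec_named(&m).unwrap();

    // Zero-copy decode: `back.iblt` borrows from `wire`.
    let back: Sync<'_> = rmp_serde::from_slice(&wire).unwrap();
    assert_eq!(back.prefix_bits, 8);
    assert_eq!(back.iblt.len(), 64);
}
```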