From 1478053e433b42514b7e970ec1485f65c3b39edd Mon Sep 17 00:00:00 2001
From: Adam Ierymenko
Date: Tue, 19 Apr 2022 09:57:47 -0400
Subject: [PATCH] sync work in progress

---
 syncwhole/src/datastore.rs |  57 +++----------
 syncwhole/src/host.rs      |  31 +++++--
 syncwhole/src/node.rs      | 170 ++++++++++++++++++++-----------------
 syncwhole/src/protocol.rs  |  61 +++++++------
 syncwhole/src/utils.rs     |   2 +
 5 files changed, 169 insertions(+), 152 deletions(-)

diff --git a/syncwhole/src/datastore.rs b/syncwhole/src/datastore.rs
index aa2a31207..6326470f1 100644
--- a/syncwhole/src/datastore.rs
+++ b/syncwhole/src/datastore.rs
@@ -26,32 +26,16 @@ pub enum StoreResult {
     Rejected,
 }
 
-/// Convert a prefix into an inclusive range of keys.
-///
-/// This is a convenience function for implementing keys_under() with data stores that support
-/// straightforward range queries with full keys.
-pub fn prefix_to_range(prefix: u64, prefix_bits: u32) -> ([u8; KEY_SIZE], [u8; KEY_SIZE]) {
-    let mut a = [0_u8; KEY_SIZE];
-    a[0..8].copy_from_slice(&((prefix & 0xffffffffffffffff_u64.wrapping_shl(64 - prefix_bits)).to_be_bytes()));
-    let mut b = [0xff_u8; KEY_SIZE];
-    b[0..8].copy_from_slice(&((prefix | 0xffffffffffffffff_u64.wrapping_shr(prefix_bits)).to_be_bytes()));
-    (a, b)
-}
 
 /// API to be implemented by the data set we want to replicate.
 ///
-/// Keys as understood by syncwhole are SHA512 hashes of values. The user can of course
-/// have their own concept of a "key" separate from this, but that would not be used
-/// for data set replication. Replication is content identity based.
+/// Keys as used in this API are SHA512 hashes of values.
 ///
-/// The API specified here supports temporally subjective data sets. These are data sets
-/// where the existence or non-existence of a record may depend on the (real world) time.
-/// A parameter for reference time allows a remote querying node to send its own "this is
-/// what time I think it is" value to be considered locally so that data can be replicated
-/// as of any given time.
-///
-/// In any call with a reference_time it should be ignored if it's zero. A zero reference
-/// time should mean include all data that we have.
+/// Range queries take an optional subset parameter. The format and interpretation of
+/// this is entirely up to the implementer of DataStore. It could contain a time, a SQL
+/// query, a set of certificates, anything. Its purpose is to select which items we want
+/// from remote nodes so that we can replicate only a subset of a larger set of data.
+/// Remote nodes may also supply a subset of their own, so implementations must be
+/// prepared to interpret subset values received from other nodes as well.
 #[async_trait]
 pub trait DataStore: Sync + Send {
     /// Container for values returned by load().
@@ -66,17 +50,8 @@ pub trait DataStore: Sync + Send {
     /// Maximum size of a value in bytes.
     const MAX_VALUE_SIZE: usize;
 
-    /// Get the domain of this data store.
-    ///
-    /// This is an arbitrary unique identifier that must be the same for all nodes that
-    /// are replicating the same data. It's checked on connect to avoid trying to share
-    /// data across data sets if this is not desired.
-    fn domain(&self) -> &str;
-
-    /// Get the reference time that should be used on this side to query remote peers.
-    ///
-    /// This is typically the local "wall clock" time in milliseconds since Unix epoch.
-    fn reference_time(&self) -> i64;
+    /// Get the subset that should be sent to remote nodes in queries.
+    async fn local_subset(&self) -> Option<Self::ValueRef>;
 
     /// Get an item by identity hash key if it exists.
     async fn load(&self, key: &[u8; KEY_SIZE]) -> Option<Self::ValueRef>;
@@ -106,17 +81,13 @@ pub trait DataStore: Sync + Send {
     /// returned if the value is valid but is too old or was rejected for some other normal reason.
     async fn store(&self, key: &[u8; KEY_SIZE], value: &[u8]) -> StoreResult;
 
-    /// Iterate through keys under a given key prefix.
-    ///
-    /// The prefix is a bit string up to 64 bits long. The implementation can technically interpret this
-    /// any way it wants, but usually this would be the first 64 bits of the key as a big-endian bit string.
+    /// Iterate through keys in a range.
     ///
     /// Keys MUST be output in ascending binary sort order.
-    async fn keys_under<F: Send + FnMut(&[u8]) -> bool>(&self, reference_time: i64, prefix: u64, prefix_bits: u32, f: F);
+    async fn keys<F: Send + FnMut(&[u8]) -> bool>(&self, subset: Option<&[u8]>, range_start: &[u8; KEY_SIZE], range_end: &[u8; KEY_SIZE], f: F);
 
-    /// Load all record values under a given key prefix.
+    /// Iterate through values in a range.
     ///
-    /// This should clear and fill the result, fetching up to the limit values under a given key prefix.
-    /// Values may be pushed into the vector in any order.
-    async fn values_under(&self, prefix: u64, prefix_bits: u32, result: &mut Vec<Vec<u8>>, limit: usize);
+    /// Entries MUST be output in ascending binary sort order.
+    async fn values<F: Send + FnMut(&[u8], &[u8]) -> bool>(&self, subset: Option<&[u8]>, range_start: &[u8; KEY_SIZE], range_end: &[u8; KEY_SIZE], f: F);
 }
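For illustration, here is a minimal sketch of the core range scan that the new keys()/values() contract asks for, assuming a simple in-memory BTreeMap keyed by the 64-byte SHA512 hash. The helper below is hypothetical and not part of syncwhole; it only demonstrates the inclusive range plus early-stop callback pattern.

```rust
use std::collections::BTreeMap;
use std::ops::Bound::Included;

const KEY_SIZE: usize = 64; // SHA512 output size, as in datastore.rs

/// Visit all keys in the inclusive range [range_start, range_end] in ascending
/// binary order, stopping early if the visitor returns false. The caller is
/// expected to pass range_start <= range_end.
fn keys_in_range<F: FnMut(&[u8]) -> bool>(
    store: &BTreeMap<[u8; KEY_SIZE], Vec<u8>>,
    range_start: &[u8; KEY_SIZE],
    range_end: &[u8; KEY_SIZE],
    mut f: F,
) {
    for (k, _v) in store.range((Included(*range_start), Included(*range_end))) {
        if !f(&k[..]) {
            break;
        }
    }
}
```

Because BTreeMap iterates in ascending key order, the "ascending binary sort order" requirement comes for free here; a SQL-backed DataStore would get the same effect with a BETWEEN query ordered by the key column, applying its interpretation of the subset selector as an extra filter.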
diff --git a/syncwhole/src/host.rs b/syncwhole/src/host.rs
index b1b268e96..713f026ff 100644
--- a/syncwhole/src/host.rs
+++ b/syncwhole/src/host.rs
@@ -7,6 +7,7 @@
  */
 
 use std::net::SocketAddr;
+use std::sync::Arc;
 
 #[cfg(feature = "include_sha2_lib")]
 use sha2::digest::{Digest, FixedOutput};
@@ -22,8 +23,9 @@ pub struct Config {
     /// The library will try to maintain connectivity to these regardless of connection limits.
     pub anchors: Vec<SocketAddr>,
 
-    /// A list of peer addresses that we can try in order to achieve desired_connection_count.
-    pub seeds: Vec<SocketAddr>,
+    /// A list of other peer addresses that we can try in order to achieve desired_connection_count.
+    /// Listing anchors here as well has no effect, since anchors are always tried first anyway.
+    pub peers: Vec<SocketAddr>,
 
     /// The maximum number of TCP connections we should allow.
     pub max_connection_count: usize,
@@ -31,6 +33,15 @@ pub struct Config {
     /// The desired number of peering links.
     pub desired_connection_count: usize,
 
+    /// Synchronization interval in milliseconds.
+    pub sync_interval: u64,
+
+    /// Connection inactivity timeout in milliseconds.
+    pub connection_timeout: u64,
+
+    /// An arbitrary name for this data set to avoid connecting to irrelevant nodes.
+    pub domain: String,
+
     /// An optional name for this node to advertise to other nodes.
     pub name: String,
 
@@ -43,9 +54,12 @@ impl Default for Config {
     fn default() -> Self {
         Self {
             anchors: Vec::new(),
-            seeds: Vec::new(),
+            peers: Vec::new(),
             max_connection_count: 128,
             desired_connection_count: 64,
+            sync_interval: 500,
+            connection_timeout: 500 * 10,
+            domain: String::new(),
             name: String::new(),
             contact: String::new(),
         }
@@ -54,14 +68,16 @@ impl Default for Config {
 
 /// A trait that users of syncwhole implement to provide configuration information and listen for events.
 pub trait Host: Sync + Send {
-    /// Get a copy of the current configuration for this syncwhole node.
-    fn node_config(&self) -> Config;
+    /// Get the current configuration for this node.
+    fn node_config(&self) -> Arc<Config>;
 
     /// Test whether an inbound connection should be allowed from an address.
     ///
     /// This is called on first incoming connection before any init is received. The authenticate()
     /// method is called once init has been received and is another decision point. The default
     /// implementation of this always returns true.
+    ///
+    /// This is not called for outbound connections.
     #[allow(unused_variables)]
     fn allow(&self, remote_address: &SocketAddr) -> bool {
         true
@@ -98,7 +114,7 @@ pub trait Host: Sync + Send {
 
     /// Fill a buffer with secure random bytes.
     ///
-    /// This is supplied to reduce inherent dependencies and allow the user to choose the implementation.
+    /// The implementer must call a secure random number generator or source to implement this.
     fn get_secure_random(&self, buf: &mut [u8]);
 
     /// Compute a SHA512 digest of the input.
     ///
     /// Input can consist of one or more slices that will be processed in order.
     ///
     /// If the feature "include_sha2_lib" is enabled a default implementation in terms of the
-    /// Rust sha2 crate is generated. Otherwise the user must supply their own implementation.
+    /// Rust sha2 crate is generated. Otherwise the implementer must supply their own
+    /// SHA512 function.
     #[cfg(not(feature = "include_sha2_lib"))]
     fn sha512(msg: &[&[u8]]) -> [u8; 64];
     #[cfg(feature = "include_sha2_lib")]
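Since node_config() now returns Arc<Config>, a Host implementation can build one Config up front and hand out cheap clones of the Arc on every call. A small sketch of building such a configuration; the field names come from this diff, while the module path and the domain string are assumptions for illustration.

```rust
use std::sync::Arc;

use syncwhole::host::Config; // assumed public path; inside the crate this would be crate::host::Config

fn example_config() -> Arc<Config> {
    Arc::new(Config {
        // "domain" replaces the old DataStore::domain(); it must match on every
        // node replicating this data set or the connection is rejected.
        domain: "example.dataset".to_string(),
        sync_interval: 500,       // milliseconds between sync passes
        connection_timeout: 5000, // milliseconds of inactivity before a connection is dropped
        desired_connection_count: 8,
        ..Config::default()
    })
}
```

A Host implementation would typically store this Arc once and return a clone from node_config(), since the housekeeping task now reads the configuration on every pass.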
diff --git a/syncwhole/src/node.rs b/syncwhole/src/node.rs
index 20ef1cd14..1cd488c18 100644
--- a/syncwhole/src/node.rs
+++ b/syncwhole/src/node.rs
@@ -11,10 +11,13 @@ use std::io::IoSlice;
 use std::mem::MaybeUninit;
 use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
 use std::ops::Add;
-use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
 use std::sync::Arc;
 
+use iblt::IBLT;
+
 use serde::{Deserialize, Serialize};
+
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
 use tokio::net::{TcpListener, TcpSocket, TcpStream};
@@ -27,13 +30,9 @@ use crate::host::Host;
 use crate::protocol::*;
 use crate::utils::*;
 use crate::varint;
-use iblt::IBLT;
 
-/// Period for running main housekeeping pass.
-const HOUSEKEEPING_PERIOD: i64 = SYNC_STATUS_PERIOD;
-
-/// Inactivity timeout for connections in milliseconds.
-const CONNECTION_TIMEOUT: i64 = SYNC_STATUS_PERIOD * 4;
+// Interval for announcing queued HaveRecords items, in milliseconds.
+const ANNOUNCE_PERIOD: i64 = 100;
 
 /// Information about a remote node to which we are connected.
 #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -89,8 +88,9 @@ impl<D: DataStore + 'static, H: Host + 'static> Node<D, H> {
             },
             datastore: db.clone(),
             host: host.clone(),
-            connections: Mutex::new(HashMap::with_capacity(64)),
-            announce_queue: Mutex::new(HashMap::with_capacity(256)),
+            connections: Mutex::new(HashMap::new()),
+            connecting_to: Mutex::new(HashSet::new()),
+            announce_queue: Mutex::new(HashMap::new()),
             bind_address,
             starting_instant: Instant::now(),
         });
@@ -114,11 +114,18 @@
     }
 
     /// Attempt to connect to an explicitly specified TCP endpoint.
-    pub async fn connect(&self, endpoint: &SocketAddr) -> std::io::Result<bool> {
-        self.internal.clone().connect(endpoint, Instant::now().add(Duration::from_millis(CONNECTION_TIMEOUT as u64))).await
+    ///
+    /// Ok(true) is returned if a new connection was made. Ok(false) means there is already a connection
+    /// to the endpoint. An error is returned if the connection fails.
+    pub async fn connect(&self, address: &SocketAddr) -> std::io::Result<bool> {
+        if self.internal.connecting_to.lock().await.insert(address.clone()) {
+            self.internal.connect(address, Instant::now().add(Duration::from_millis(self.internal.host.node_config().connection_timeout))).await
+        } else {
+            Ok(false)
+        }
     }
 
-    /// Get open peer to peer connections.
+    /// Get a list of all open peer to peer connections.
     pub async fn list_connections(&self) -> Vec<RemoteNodeInfo> {
         let connections = self.internal.connections.lock().await;
         let mut cl: Vec<RemoteNodeInfo> = Vec::with_capacity(connections.len());
@@ -168,8 +175,11 @@ pub struct NodeInternal<D: DataStore + 'static, H: Host + 'static> {
     // Connections and their task join handles, by remote endpoint address.
     connections: Mutex<HashMap<SocketAddr, Arc<Connection>>>,
 
+    // Outgoing connections in progress.
+    connecting_to: Mutex<HashSet<SocketAddr>>,
+
     // Records received since last announce and the endpoints that we know already have them.
-    announce_queue: Mutex<HashMap<[u8; ANNOUNCE_KEY_LEN], Vec<SocketAddr>>>,
+    announce_queue: Mutex<HashMap<[u8; KEY_SIZE], Vec<SocketAddr>>>,
 
     // Local address to which this node is bound
     bind_address: SocketAddr,
@@ -184,38 +194,36 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
     }
 
     async fn housekeeping_task_main(self: Arc<Self>) {
-        let mut tasks: Vec<JoinHandle<()>> = Vec::new();
-        let mut counts: Vec<usize> = Vec::new();
-        let mut connected_to_addresses: HashSet<SocketAddr> = HashSet::new();
-        let mut sleep_until = Instant::now().add(Duration::from_millis(500));
+        let tasks = AsyncTaskReaper::new();
+        let mut sleep_for = Duration::from_millis(500);
         loop {
-            tokio::time::sleep_until(sleep_until).await;
-            sleep_until = sleep_until.add(Duration::from_millis(HOUSEKEEPING_PERIOD as u64));
+            tokio::time::sleep(sleep_for).await;
 
-            tasks.clear();
-            counts.clear();
-            connected_to_addresses.clear();
+            let config = self.host.node_config();
+            let mut connections = self.connections.lock().await;
+            let mut connecting_to = self.connecting_to.lock().await;
             let now = self.ms_monotonic();
 
-            self.connections.lock().await.retain(|sa, c| {
+            // Drop dead or timed out connections, and for live connections handle sending sync requests.
+            connections.retain(|_, c| {
                 if !c.closed.load(Ordering::Relaxed) {
                     let cc = c.clone();
-                    if (now - c.last_receive_time.load(Ordering::Relaxed)) < CONNECTION_TIMEOUT {
-                        connected_to_addresses.insert(sa.clone());
+                    if (now - c.last_receive_time.load(Ordering::Relaxed)) < (config.connection_timeout as i64) {
+                        // TODO: sync init if not waiting for a sync response
                         true // keep connection
                     } else {
                         let _ = c.read_task.lock().unwrap().take().map(|j| j.abort());
                         let host = self.host.clone();
-                        tasks.push(tokio::spawn(async move {
+                        tasks.spawn(async move {
                             host.on_connection_closed(&*cc.info.lock().unwrap(), "timeout".to_string());
-                        }));
+                        });
                         false // discard connection
                     }
                 } else {
                     let host = self.host.clone();
                     let cc = c.clone();
                     let j = c.read_task.lock().unwrap().take();
-                    tasks.push(tokio::spawn(async move {
+                    tasks.spawn(async move {
                         if j.is_some() {
                             let e = j.unwrap().await;
                             if e.is_ok() {
@@ -227,83 +235,73 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                             } else {
                                 host.on_connection_closed(&*cc.info.lock().unwrap(), "remote host closed connection".to_string());
                             }
-                    }));
+                    });
                     false // discard connection
                 }
             });
 
-            let config = self.host.node_config();
+            let connect_timeout_at = Instant::now().add(Duration::from_millis(config.connection_timeout));
 
             // Always try to connect to anchor peers.
             for sa in config.anchors.iter() {
-                if !connected_to_addresses.contains(sa) {
-                    let sa = sa.clone();
+                if !connections.contains_key(sa) && connecting_to.insert(sa.clone()) {
                     let self2 = self.clone();
-                    tasks.push(tokio::spawn(async move {
-                        let _ = self2.connect(&sa, sleep_until).await;
-                    }));
-                    connected_to_addresses.insert(sa.clone());
+                    let sa = sa.clone();
+                    tasks.spawn(async move {
+                        let _ = self2.connect(&sa, connect_timeout_at).await;
+                    });
                 }
             }
 
             // Try to connect to more peers until desired connection count is reached.
             let desired_connection_count = config.desired_connection_count.min(config.max_connection_count);
-            for sa in config.seeds.iter() {
-                if connected_to_addresses.len() >= desired_connection_count {
+            for sa in config.peers.iter() {
+                if (connections.len() + connecting_to.len()) >= desired_connection_count {
                     break;
                 }
-                if !connected_to_addresses.contains(sa) {
-                    connected_to_addresses.insert(sa.clone());
+                if !connections.contains_key(sa) && connecting_to.insert(sa.clone()) {
                     let self2 = self.clone();
                     let sa = sa.clone();
-                    tasks.push(tokio::spawn(async move {
-                        let _ = self2.connect(&sa, sleep_until).await;
-                    }));
+                    tasks.spawn(async move {
+                        let _ = self2.connect(&sa, connect_timeout_at).await;
+                    });
                 }
             }
 
-            // Wait for this iteration's batched background tasks to complete.
-            loop {
-                let s = tasks.pop();
-                if s.is_some() {
-                    let _ = s.unwrap().await;
-                } else {
-                    break;
-                }
-            }
+            sleep_for = Duration::from_millis(config.sync_interval.min(config.connection_timeout));
         }
     }
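The housekeeping pass above now hands its background work to an AsyncTaskReaper (defined in utils.rs, not shown in this diff) instead of collecting JoinHandles and awaiting them each iteration. The sketch below is an assumption about what such a reaper could look like; only the new()/spawn() surface is taken from the calls in this patch, and the real utils.rs implementation may differ.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use tokio::task::JoinHandle;

/// Hypothetical stand-in for utils::AsyncTaskReaper: fire-and-forget tasks that
/// clean up after themselves and are aborted when the reaper is dropped.
pub struct TaskReaper {
    handles: Arc<Mutex<HashMap<usize, JoinHandle<()>>>>,
    next_id: AtomicUsize,
}

impl TaskReaper {
    pub fn new() -> Self {
        Self { handles: Arc::new(Mutex::new(HashMap::new())), next_id: AtomicUsize::new(0) }
    }

    pub fn spawn<F: Future<Output = ()> + Send + 'static>(&self, f: F) {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let handles = self.handles.clone();
        let handle = tokio::spawn(async move {
            f.await;
            // Remove our own handle once the task has finished.
            handles.lock().unwrap().remove(&id);
        });
        self.handles.lock().unwrap().insert(id, handle);
    }
}

impl Drop for TaskReaper {
    fn drop(&mut self) {
        // Abort anything still running so background work cannot outlive its owner.
        for (_, handle) in self.handles.lock().unwrap().drain() {
            handle.abort();
        }
    }
}
```

The practical effect of this design change is that a slow on_connection_closed() callback no longer delays the next housekeeping pass: work is dropped into the reaper and simply aborted if it is still running when the reaper goes away.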
 
     async fn announce_task_main(self: Arc<Self>) {
-        let mut sleep_until = Instant::now().add(Duration::from_millis(ANNOUNCE_PERIOD as u64));
-        let mut to_announce: Vec<([u8; ANNOUNCE_KEY_LEN], Vec<SocketAddr>)> = Vec::with_capacity(256);
+        let sleep_for = Duration::from_millis(ANNOUNCE_PERIOD as u64);
+        let mut to_announce: Vec<([u8; KEY_SIZE], Vec<SocketAddr>)> = Vec::new();
         let background_tasks = AsyncTaskReaper::new();
-        let announce_timeout = Duration::from_millis(CONNECTION_TIMEOUT as u64);
+        let announce_timeout = Duration::from_millis(self.host.node_config().connection_timeout);
         loop {
-            tokio::time::sleep_until(sleep_until).await;
-            sleep_until = sleep_until.add(Duration::from_millis(ANNOUNCE_PERIOD as u64));
+            tokio::time::sleep(sleep_for).await;
 
             for (key, already_has) in self.announce_queue.lock().await.drain() {
                 to_announce.push((key, already_has));
             }
 
             let now = self.ms_monotonic();
+            let have_records_est_size = (to_announce.len() * KEY_SIZE) + 2;
+            let mut have_records: Vec<u8> = Vec::with_capacity(have_records_est_size);
             for c in self.connections.lock().await.iter() {
                 if c.1.announce_new_records.load(Ordering::Relaxed) {
-                    let mut have_records: Vec<u8> = Vec::with_capacity((to_announce.len() * ANNOUNCE_KEY_LEN) + 4);
-                    have_records.push(ANNOUNCE_KEY_LEN as u8);
                     for (key, already_has) in to_announce.iter() {
                         if !already_has.contains(c.0) {
                             let _ = std::io::Write::write_all(&mut have_records, key);
                         }
                     }
-                    if have_records.len() > 1 {
+                    if !have_records.is_empty() {
                         let c2 = c.1.clone();
                         background_tasks.spawn(async move {
                             // If the connection dies this will either fail or time out in 1s. Usually these execute instantly due to
                             // write buffering but a short timeout prevents them from building up too much.
                             let _ = tokio::time::timeout(announce_timeout, c2.send_msg(MessageType::HaveRecords, have_records.as_slice(), now));
-                        })
+                        });
+                        have_records = Vec::with_capacity(have_records_est_size);
                     }
                 }
             }
@@ -327,19 +325,33 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
         }
     }
 
-    async fn connect(self: Arc<Self>, address: &SocketAddr, deadline: Instant) -> std::io::Result<bool> {
-        self.host.on_connect_attempt(address);
-        let stream = if address.is_ipv4() { TcpSocket::new_v4() } else { TcpSocket::new_v6() }?;
-        configure_tcp_socket(&stream)?;
-        stream.bind(self.bind_address.clone())?;
-        let stream = tokio::time::timeout_at(deadline, stream.connect(address.clone())).await;
-        if stream.is_ok() {
-            Ok(self.connection_start(address.clone(), stream.unwrap()?, false).await)
-        } else {
-            Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))
-        }
+    /// Internal connection method.
+    ///
+    /// Note that this does not add the address to connecting_to. That is done by the caller
+    /// to avoid races. The address is removed from connecting_to once the connection attempt
+    /// either succeeds or fails.
+    async fn connect(self: &Arc<Self>, address: &SocketAddr, deadline: Instant) -> std::io::Result<bool> {
+        let f = async {
+            let stream = if address.is_ipv4() { TcpSocket::new_v4() } else { TcpSocket::new_v6() }?;
+            configure_tcp_socket(&stream)?;
+            stream.bind(self.bind_address.clone())?;
+
+            let stream = tokio::time::timeout_at(deadline, stream.connect(address.clone()));
+            self.host.on_connect_attempt(address);
+            let stream = stream.await;
+
+            if stream.is_ok() {
+                Ok(self.connection_start(address.clone(), stream.unwrap()?, false).await)
+            } else {
+                Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))
+            }
+        };
+        let r = f.await;
+        let _ = self.connecting_to.lock().await.remove(address);
+        r
     }
 
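The connect path above bounds the whole attempt with an absolute deadline rather than a per-call timeout, so every outgoing attempt started in one housekeeping pass shares the same time budget. A stripped-down sketch of the same pattern using plain tokio; the function and names here are illustrative only.

```rust
use std::net::SocketAddr;
use std::time::Duration;

use tokio::net::{TcpSocket, TcpStream};
use tokio::time::Instant;

/// Connect to `addr`, giving up at an absolute deadline computed from `timeout_ms`.
async fn connect_by_deadline(addr: SocketAddr, timeout_ms: u64) -> std::io::Result<TcpStream> {
    let deadline = Instant::now() + Duration::from_millis(timeout_ms);
    let socket = (if addr.is_ipv4() { TcpSocket::new_v4() } else { TcpSocket::new_v6() })?;
    match tokio::time::timeout_at(deadline, socket.connect(addr)).await {
        Ok(result) => result, // connected (or failed) before the deadline
        Err(_) => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out")),
    }
}
```

Unlike tokio::time::timeout, timeout_at takes a deadline, which is what lets housekeeping compute connect_timeout_at once and reuse it for every anchor and peer attempted in that pass.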
+    /// Initialize and start a connection whether incoming or outgoing.
     async fn connection_start(self: &Arc<Self>, address: SocketAddr, stream: TcpStream, inbound: bool) -> bool {
         let mut ok = false;
         let _ = self.connections.lock().await.entry(address.clone()).or_insert_with(|| {
@@ -447,7 +459,7 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                     }
                 }
             }
-            let mut message = &read_buffer.as_slice()[header_size..total_size];
+            let message = &read_buffer.as_slice()[header_size..total_size];
 
             let now = self.ms_monotonic();
             connection.last_receive_time.store(now, Ordering::Relaxed);
@@ -480,7 +492,7 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                         }
                         let auth_challenge_response = auth_challenge_response.unwrap();
 
-                        (H::hmac_sha512(&self.anti_loopback_secret, msg.anti_loopback_challenge), H::hmac_sha512(&H::sha512(&[self.datastore.domain().as_bytes()]), msg.domain_challenge), auth_challenge_response)
+                        (H::hmac_sha512(&self.anti_loopback_secret, msg.anti_loopback_challenge), H::hmac_sha512(&H::sha512(&[self.host.node_config().domain.as_bytes()]), msg.domain_challenge), auth_challenge_response)
                     };
 
                     connection
@@ -508,7 +520,7 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                     if msg.anti_loopback_response.eq(&H::hmac_sha512(&self.anti_loopback_secret, &anti_loopback_challenge_sent)) {
                         return Err(std::io::Error::new(std::io::ErrorKind::Other, "rejected connection to self"));
                     }
-                    if !msg.domain_response.eq(&H::hmac_sha512(&H::sha512(&[self.datastore.domain().as_bytes()]), &domain_challenge_sent)) {
+                    if !msg.domain_response.eq(&H::hmac_sha512(&H::sha512(&[self.host.node_config().domain.as_bytes()]), &domain_challenge_sent)) {
                         return Err(std::io::Error::new(std::io::ErrorKind::Other, "domain mismatch"));
                     }
                     if !self.host.authenticate(&info, &auth_challenge_sent).map_or(false, |cr| msg.auth_response.eq(&cr)) {
@@ -516,7 +528,6 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                     }
 
                     initialized = true;
-                    info.initialized = true;
 
                     let info = info.clone(); // also releases lock since info is replaced/destroyed
@@ -538,9 +549,8 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                     let key = H::sha512(&[message]);
                     match self.datastore.store(&key, message).await {
                         StoreResult::Ok => {
-                            let announce_key: [u8; ANNOUNCE_KEY_LEN] = (&key[..ANNOUNCE_KEY_LEN]).try_into().unwrap();
                             let mut q = self.announce_queue.lock().await;
-                            let ql = q.entry(announce_key).or_insert_with(|| Vec::with_capacity(2));
+                            let ql = q.entry(key).or_insert_with(|| Vec::with_capacity(2));
                             if !ql.contains(&remote_address) {
                                 ql.push(remote_address.clone());
                             }
@@ -552,6 +562,10 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                     }
                 }
 
+                MessageType::SyncRequest => {
+                    let msg: msg::SyncRequest = decode_msgpack(message)?;
+                }
+
                 MessageType::Sync => {
                     let msg: msg::Sync = decode_msgpack(message)?;
                 }
diff --git a/syncwhole/src/protocol.rs b/syncwhole/src/protocol.rs
index 3d841ab40..30f0315a9 100644
--- a/syncwhole/src/protocol.rs
+++ b/syncwhole/src/protocol.rs
@@ -6,15 +6,6 @@
  * https://www.zerotier.com/
  */
 
-/// Number of bytes of SHA512 to announce, should be high enough to make collisions virtually impossible.
-pub const ANNOUNCE_KEY_LEN: usize = 24;
-
-/// Send SyncStatus this frequently, in milliseconds.
-pub const SYNC_STATUS_PERIOD: i64 = 5000;
-
-/// Check for and announce that we "have" records this often in milliseconds.
-pub const ANNOUNCE_PERIOD: i64 = 100;
-
 #[derive(Clone, Copy, Eq, PartialEq)]
 #[repr(u8)]
 pub enum MessageType {
@@ -36,15 +27,20 @@ pub enum MessageType {
     ///
     Record = 5_u8,
 
+    /// msg::SyncRequest (msgpack)
+    SyncRequest = 6_u8,
+
     /// msg::Sync (msgpack)
     Sync = 7_u8,
 }
 
+const MESSAGE_TYPE_MAX: u8 = 7;
+
 impl From<u8> for MessageType {
     /// Get a type from a byte, returning the Nop type if the byte is out of range.
     #[inline(always)]
     fn from(b: u8) -> Self {
-        if b <= 7 {
+        if b <= MESSAGE_TYPE_MAX {
             unsafe { std::mem::transmute(b) }
         } else {
             Self::Nop
         }
     }
@@ -62,6 +58,7 @@ impl MessageType {
             Self::HaveRecords => "HAVE_RECORDS",
             Self::GetRecords => "GET_RECORDS",
             Self::Record => "RECORD",
+            Self::SyncRequest => "SYNC_REQUEST",
             Self::Sync => "SYNC",
         }
     }
@@ -134,30 +131,46 @@ pub mod msg {
 
     #[derive(Serialize, Deserialize)]
     pub struct SyncRequest<'a> {
-        /// 64-bit prefix of record keys for this request
-        #[serde(rename = "p")]
-        pub prefix: u64,
+        /// Starting range to query, padded with zeroes if shorter than KEY_SIZE.
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "s")]
+        pub range_start: &'a [u8],
 
-        /// Number of bits in prefix that are meaningful
-        #[serde(rename = "b")]
-        pub prefix_bits: u8,
+        /// Ending range to query, padded with 0xff if shorter than KEY_SIZE.
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "e")]
+        pub range_end: &'a [u8],
 
         /// Data-store-specific subset selector indicating what subset of items desired
-        pub subset: &'a [u8],
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "q")]
+        pub subset: Option<&'a [u8]>,
     }
 
     #[derive(Serialize, Deserialize)]
     pub struct Sync<'a> {
-        /// 64-bit prefix of record keys for this request
-        #[serde(rename = "p")]
-        pub prefix: u64,
+        /// Starting range summarized, padded with zeroes if shorter than KEY_SIZE.
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "s")]
+        pub range_start: &'a [u8],
 
-        /// Number of bits in prefix that are meaningful
-        #[serde(rename = "b")]
-        pub prefix_bits: u8,
+        /// Ending range summarized, padded with 0xff if shorter than KEY_SIZE.
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "e")]
+        pub range_end: &'a [u8],
 
         /// Data-store-specific subset selector indicating what subset of items were included
-        pub subset: &'a [u8],
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "q")]
+        pub subset: Option<&'a [u8]>,
+
+        /// Number of buckets in IBLT
+        #[serde(rename = "b")]
+        pub iblt_buckets: usize,
+
+        /// Number of bytes in each IBLT item (key prefix)
+        #[serde(rename = "l")]
+        pub iblt_item_bytes: usize,
 
         /// Set summary for keys under prefix within subset
         #[serde(with = "serde_bytes")]
diff --git a/syncwhole/src/utils.rs b/syncwhole/src/utils.rs
index 6b4eb4cac..9c72d5264 100644
--- a/syncwhole/src/utils.rs
+++ b/syncwhole/src/utils.rs
@@ -50,6 +50,7 @@ pub fn splitmix64(mut x: u64) -> u64 {
     x
 }
 
+/*
 #[inline(always)]
 pub fn splitmix64_inverse(mut x: u64) -> u64 {
     x ^= x.wrapping_shr(31) ^ x.wrapping_shr(62);
@@ -59,6 +60,7 @@ pub fn splitmix64_inverse(mut x: u64) -> u64 {
     x ^= x.wrapping_shr(30) ^ x.wrapping_shr(60);
     x
 }
+*/
 
 static mut RANDOM_STATE_0: u64 = 0;
 static mut RANDOM_STATE_1: u64 = 0;
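The reworked SyncRequest in protocol.rs above replaces the old 64-bit prefix/prefix_bits pair with explicit range bounds plus an optional subset selector. A hypothetical usage sketch, written as if from inside the crate; the field names are the ones added by this diff, while the full-range helper and constants are illustrative assumptions.

```rust
// Hypothetical in-crate usage of the new wire struct from protocol.rs.
use crate::protocol::msg::SyncRequest;

const KEY_SIZE: usize = 64; // SHA512, as in datastore.rs

/// Ask a peer for everything it has: the full key space and no subset filter.
fn full_range_request<'a>(start: &'a [u8; KEY_SIZE], end: &'a [u8; KEY_SIZE]) -> SyncRequest<'a> {
    SyncRequest {
        range_start: &start[..], // all zeroes for the lowest possible key
        range_end: &end[..],     // all 0xff for the highest possible key
        subset: None,            // no data-store-specific filtering
    }
}

// Caller side (for example, when initiating a sync in node.rs):
//   let start = [0u8; KEY_SIZE];
//   let end = [0xffu8; KEY_SIZE];
//   let req = full_range_request(&start, &end);
//   // ...then msgpack-encode `req` and send it as MessageType::SyncRequest.
```

The responding node would answer with a Sync message whose IBLT parameters (iblt_buckets, iblt_item_bytes) describe the set summary it computed over the same range and subset.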