sync work in progress

Adam Ierymenko 2022-04-19 09:57:47 -04:00
parent d8ea29319a
commit 1478053e43
5 changed files with 169 additions and 152 deletions

syncwhole/src/datastore.rs

@@ -26,32 +26,16 @@ pub enum StoreResult {
     Rejected,
 }
 
-/// Convert a prefix into an inclusive range of keys.
-///
-/// This is a convenience function for implementing keys_under() with data stores that support
-/// straightforward range queries with full keys.
-pub fn prefix_to_range(prefix: u64, prefix_bits: u32) -> ([u8; KEY_SIZE], [u8; KEY_SIZE]) {
-    let mut a = [0_u8; KEY_SIZE];
-    a[0..8].copy_from_slice(&((prefix & 0xffffffffffffffff_u64.wrapping_shl(64 - prefix_bits)).to_be_bytes()));
-    let mut b = [0xff_u8; KEY_SIZE];
-    b[0..8].copy_from_slice(&((prefix | 0xffffffffffffffff_u64.wrapping_shr(prefix_bits)).to_be_bytes()));
-    (a, b)
-}
-
 /// API to be implemented by the data set we want to replicate.
 ///
-/// Keys as understood by syncwhole are SHA512 hashes of values. The user can of course
-/// have their own concept of a "key" separate from this, but that would not be used
-/// for data set replication. Replication is content identity based.
+/// Keys as used in this API are SHA512 hashes of values.
 ///
-/// The API specified here supports temporally subjective data sets. These are data sets
-/// where the existence or non-existence of a record may depend on the (real world) time.
-/// A parameter for reference time allows a remote querying node to send its own "this is
-/// what time I think it is" value to be considered locally so that data can be replicated
-/// as of any given time.
-///
-/// In any call with a reference_time it should be ignored if it's zero. A zero reference
-/// time should mean include all data that we have.
+/// Range queries take an optional subset parameter. The format and interpretation of
+/// this is entirely up to the implementer of DataStore. It could contain a time, a SQL
+/// query, a set of certificates, anything. Its purpose is to select which items we want
+/// from remote nodes so that we can replicate only a subset of a larger set of data.
+/// Other nodes can also supply a subset to this one, so it's important that remote subset
+/// values supplied to the local data store be handled correctly.
 #[async_trait]
 pub trait DataStore: Sync + Send {
     /// Container for values returned by load().
@@ -66,17 +50,8 @@ pub trait DataStore: Sync + Send {
     /// Maximum size of a value in bytes.
     const MAX_VALUE_SIZE: usize;
 
-    /// Get the domain of this data store.
-    ///
-    /// This is an arbitrary unique identifier that must be the same for all nodes that
-    /// are replicating the same data. It's checked on connect to avoid trying to share
-    /// data across data sets if this is not desired.
-    fn domain(&self) -> &str;
-
-    /// Get the reference time that should be used on this side to query remote peers.
-    ///
-    /// This is typically the local "wall clock" time in milliseconds since Unix epoch.
-    fn reference_time(&self) -> i64;
+    /// Get the subset that should be sent to remote nodes in queries.
+    async fn local_subset(&self) -> Option<Self::ValueRef>;
 
     /// Get an item by identity hash key if it exists.
     async fn load(&self, key: &[u8; KEY_SIZE]) -> Option<Self::ValueRef>;
@@ -106,17 +81,13 @@ pub trait DataStore: Sync + Send {
     /// returned if the value is valid but is too old or was rejected for some other normal reason.
     async fn store(&self, key: &[u8; KEY_SIZE], value: &[u8]) -> StoreResult;
 
-    /// Iterate through keys under a given key prefix.
-    ///
-    /// The prefix is a bit string up to 64 bits long. The implementation can technically interpret this
-    /// any way it wants, but usually this would be the first 64 bits of the key as a big-endian bit string.
+    /// Iterate through keys in a range.
     ///
     /// Keys MUST be output in ascending binary sort order.
-    async fn keys_under<F: Send + FnMut(&[u8]) -> bool>(&self, reference_time: i64, prefix: u64, prefix_bits: u32, f: F);
+    async fn keys<F: Send + FnMut(&[u8]) -> bool>(&self, subset: Option<&[u8]>, range_start: &[u8; KEY_SIZE], range_end: &[u8; KEY_SIZE], f: F);
 
-    /// Load all record values under a given key prefix.
+    /// Iterate through values in a range.
     ///
-    /// This should clear and fill the result, fetching up to the limit values under a given key prefix.
-    /// Values may be pushed into the vector in any order.
-    async fn values_under(&self, prefix: u64, prefix_bits: u32, result: &mut Vec<Option<Self::ValueRef>>, limit: usize);
+    /// Entries MUST be output in ascending binary sort order.
+    async fn values<F: Send + FnMut(&[u8], &[u8]) -> bool>(&self, subset: Option<&[u8]>, range_start: &[u8; KEY_SIZE], range_end: &[u8; KEY_SIZE], f: F);
 }
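
With prefix_to_range() removed, implementers now receive explicit inclusive key ranges rather than bit prefixes. As a rough sketch of what the new keys() contract asks of a data store, here is how an ordered in-memory map could serve it; MemStore and keys_in_range are hypothetical names, the subset selector is ignored, and the async_trait plumbing is omitted:

    // Sketch only: serve an inclusive key range in ascending binary order,
    // which is what the new keys() method requires of implementations.
    use std::collections::BTreeMap;
    use std::sync::Mutex;

    const KEY_SIZE: usize = 64; // SHA512 output length, per the trait's key scheme

    struct MemStore {
        records: Mutex<BTreeMap<[u8; KEY_SIZE], Vec<u8>>>,
    }

    impl MemStore {
        // Mirrors keys(): visit keys in [range_start, range_end], stopping early
        // if the visitor returns false. BTreeMap::range() yields sorted order.
        fn keys_in_range<F: FnMut(&[u8]) -> bool>(&self, range_start: &[u8; KEY_SIZE], range_end: &[u8; KEY_SIZE], mut f: F) {
            for (key, _value) in self.records.lock().unwrap().range(*range_start..=*range_end) {
                if !f(key) {
                    break;
                }
            }
        }
    }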

syncwhole/src/host.rs

@@ -7,6 +7,7 @@
  */
 
 use std::net::SocketAddr;
+use std::sync::Arc;
 
 #[cfg(feature = "include_sha2_lib")]
 use sha2::digest::{Digest, FixedOutput};
@@ -22,8 +23,9 @@ pub struct Config {
     /// The library will try to maintain connectivity to these regardless of connection limits.
     pub anchors: Vec<SocketAddr>,
 
-    /// A list of peer addresses that we can try in order to achieve desired_connection_count.
-    pub seeds: Vec<SocketAddr>,
+    /// A list of other peer addresses that we can try in order to achieve desired_connection_count.
+    /// If this includes the anchors too there will be no effect since the anchors are tried first anyway.
+    pub peers: Vec<SocketAddr>,
 
     /// The maximum number of TCP connections we should allow.
     pub max_connection_count: usize,
@@ -31,6 +33,15 @@ pub struct Config {
     /// The desired number of peering links.
     pub desired_connection_count: usize,
 
+    /// Synchronization interval in milliseconds.
+    pub sync_interval: u64,
+
+    /// Connection inactivity timeout in milliseconds.
+    pub connection_timeout: u64,
+
+    /// An arbitrary name for this data set to avoid connecting to irrelevant nodes.
+    pub domain: String,
+
     /// An optional name for this node to advertise to other nodes.
     pub name: String,
@@ -43,9 +54,12 @@ impl Default for Config {
     fn default() -> Self {
         Self {
             anchors: Vec::new(),
-            seeds: Vec::new(),
+            peers: Vec::new(),
             max_connection_count: 128,
             desired_connection_count: 64,
+            sync_interval: 500,
+            connection_timeout: 500 * 10,
+            domain: String::new(),
             name: String::new(),
             contact: String::new(),
         }
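
For reference, a caller would now fill in the three new fields alongside the renamed peers list; the concrete values below are illustrative only:

    // Illustrative configuration; only the field names come from the diff above.
    let config = Config {
        domain: "my-data-set".to_string(), // must match on both ends of a link
        sync_interval: 500,                // attempt sync every 500 ms
        connection_timeout: 5000,          // drop links idle for 5 s
        peers: vec!["192.0.2.11:19993".parse().unwrap()],
        ..Default::default()
    };
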
@@ -54,14 +68,16 @@ impl Default for Config {
 /// A trait that users of syncwhole implement to provide configuration information and listen for events.
 pub trait Host: Sync + Send {
-    /// Get a copy of the current configuration for this syncwhole node.
-    fn node_config(&self) -> Config;
+    /// Get the current configuration for this node.
+    fn node_config(&self) -> Arc<Config>;
 
     /// Test whether an inbound connection should be allowed from an address.
     ///
     /// This is called on first incoming connection before any init is received. The authenticate()
     /// method is called once init has been received and is another decision point. The default
     /// implementation of this always returns true.
+    ///
+    /// This is not called for outbound connections.
     #[allow(unused_variables)]
     fn allow(&self, remote_address: &SocketAddr) -> bool {
         true
@@ -98,7 +114,7 @@ pub trait Host: Sync + Send {
     /// Fill a buffer with secure random bytes.
     ///
-    /// This is supplied to reduce inherent dependencies and allow the user to choose the implementation.
+    /// The implementer must call a secure random number generator or source to implement this.
     fn get_secure_random(&self, buf: &mut [u8]);
 
     /// Compute a SHA512 digest of the input.
@@ -106,7 +122,8 @@ pub trait Host: Sync + Send {
     /// Input can consist of one or more slices that will be processed in order.
     ///
     /// If the feature "include_sha2_lib" is enabled a default implementation in terms of the
-    /// Rust sha2 crate is generated. Otherwise the user must supply their own implementation.
+    /// Rust sha2 crate is generated. Otherwise the implementer must supply their own
+    /// SHA512 function.
     #[cfg(not(feature = "include_sha2_lib"))]
     fn sha512(msg: &[&[u8]]) -> [u8; 64];
     #[cfg(feature = "include_sha2_lib")]
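
Since node_config() now returns Arc<Config> rather than a copied Config, an implementer can build the configuration once and hand out cheap clones; a minimal sketch with a hypothetical holder type (the full Host impl is omitted):

    use std::sync::Arc;

    struct MyHost {
        config: Arc<Config>, // built once, shared from node_config()
    }

    impl MyHost {
        // Matches the new Host::node_config() signature: a refcount bump, no copy.
        fn node_config(&self) -> Arc<Config> {
            self.config.clone()
        }
    }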

syncwhole/src/node.rs

@@ -11,10 +11,13 @@ use std::io::IoSlice;
 use std::mem::MaybeUninit;
 use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
 use std::ops::Add;
-use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
 use std::sync::Arc;
 
+use iblt::IBLT;
 use serde::{Deserialize, Serialize};
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
 use tokio::net::{TcpListener, TcpSocket, TcpStream};
@@ -27,13 +30,9 @@ use crate::host::Host;
 use crate::protocol::*;
 use crate::utils::*;
 use crate::varint;
-use iblt::IBLT;
 
-/// Period for running main housekeeping pass.
-const HOUSEKEEPING_PERIOD: i64 = SYNC_STATUS_PERIOD;
-
-/// Inactivity timeout for connections in milliseconds.
-const CONNECTION_TIMEOUT: i64 = SYNC_STATUS_PERIOD * 4;
+// Interval for announcing queued HaveRecords items, in milliseconds.
+const ANNOUNCE_PERIOD: i64 = 100;
 
 /// Information about a remote node to which we are connected.
 #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -89,8 +88,9 @@ impl<D: DataStore + 'static, H: Host + 'static> Node<D, H> {
             },
             datastore: db.clone(),
             host: host.clone(),
-            connections: Mutex::new(HashMap::with_capacity(64)),
-            announce_queue: Mutex::new(HashMap::with_capacity(256)),
+            connections: Mutex::new(HashMap::new()),
+            connecting_to: Mutex::new(HashSet::new()),
+            announce_queue: Mutex::new(HashMap::new()),
             bind_address,
             starting_instant: Instant::now(),
         });
@@ -114,11 +114,18 @@ impl<D: DataStore + 'static, H: Host + 'static> Node<D, H> {
     }
 
     /// Attempt to connect to an explicitly specified TCP endpoint.
-    pub async fn connect(&self, endpoint: &SocketAddr) -> std::io::Result<bool> {
-        self.internal.clone().connect(endpoint, Instant::now().add(Duration::from_millis(CONNECTION_TIMEOUT as u64))).await
+    ///
+    /// Ok(true) is returned if a new connection was made. Ok(false) means there is already a connection
+    /// to the endpoint. An error is returned if the connection fails.
+    pub async fn connect(&self, address: &SocketAddr) -> std::io::Result<bool> {
+        if self.internal.connecting_to.lock().await.insert(address.clone()) {
+            self.internal.connect(address, Instant::now().add(Duration::from_millis(self.internal.host.node_config().connection_timeout))).await
+        } else {
+            Ok(false)
+        }
     }
 
-    /// Get open peer to peer connections.
+    /// Get a list of all open peer to peer connections.
     pub async fn list_connections(&self) -> Vec<RemoteNodeInfo> {
         let connections = self.internal.connections.lock().await;
         let mut cl: Vec<RemoteNodeInfo> = Vec::with_capacity(connections.len());
@@ -168,8 +175,11 @@ pub struct NodeInternal<D: DataStore + 'static, H: Host + 'static> {
     // Connections and their task join handles, by remote endpoint address.
     connections: Mutex<HashMap<SocketAddr, Arc<Connection>>>,
 
+    // Outgoing connections in progress.
+    connecting_to: Mutex<HashSet<SocketAddr>>,
+
     // Records received since last announce and the endpoints that we know already have them.
-    announce_queue: Mutex<HashMap<[u8; ANNOUNCE_KEY_LEN], Vec<SocketAddr>>>,
+    announce_queue: Mutex<HashMap<[u8; KEY_SIZE], Vec<SocketAddr>>>,
 
     // Local address to which this node is bound
     bind_address: SocketAddr,
@@ -184,38 +194,36 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
     }
 
     async fn housekeeping_task_main(self: Arc<Self>) {
-        let mut tasks: Vec<JoinHandle<()>> = Vec::new();
-        let mut counts: Vec<u64> = Vec::new();
-        let mut connected_to_addresses: HashSet<SocketAddr> = HashSet::new();
-        let mut sleep_until = Instant::now().add(Duration::from_millis(500));
+        let tasks = AsyncTaskReaper::new();
+        let mut sleep_for = Duration::from_millis(500);
         loop {
-            tokio::time::sleep_until(sleep_until).await;
-            sleep_until = sleep_until.add(Duration::from_millis(HOUSEKEEPING_PERIOD as u64));
+            tokio::time::sleep(sleep_for).await;
 
-            tasks.clear();
-            counts.clear();
-            connected_to_addresses.clear();
+            let config = self.host.node_config();
+            let mut connections = self.connections.lock().await;
+            let mut connecting_to = self.connecting_to.lock().await;
             let now = self.ms_monotonic();
 
-            self.connections.lock().await.retain(|sa, c| {
+            // Drop dead or timed out connections, and for live connections handle sending sync requests.
+            connections.retain(|_, c| {
                 if !c.closed.load(Ordering::Relaxed) {
                     let cc = c.clone();
-                    if (now - c.last_receive_time.load(Ordering::Relaxed)) < CONNECTION_TIMEOUT {
-                        connected_to_addresses.insert(sa.clone());
+                    if (now - c.last_receive_time.load(Ordering::Relaxed)) < (config.connection_timeout as i64) {
+                        // TODO: sync init if not waiting for a sync response
                         true // keep connection
                     } else {
                         let _ = c.read_task.lock().unwrap().take().map(|j| j.abort());
                         let host = self.host.clone();
-                        tasks.push(tokio::spawn(async move {
+                        tasks.spawn(async move {
                             host.on_connection_closed(&*cc.info.lock().unwrap(), "timeout".to_string());
-                        }));
+                        });
                         false // discard connection
                     }
                 } else {
                     let host = self.host.clone();
                     let cc = c.clone();
                     let j = c.read_task.lock().unwrap().take();
-                    tasks.push(tokio::spawn(async move {
+                    tasks.spawn(async move {
                         if j.is_some() {
                             let e = j.unwrap().await;
                             if e.is_ok() {
@@ -227,83 +235,73 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                             } else {
                                 host.on_connection_closed(&*cc.info.lock().unwrap(), "remote host closed connection".to_string());
                             }
-                    }));
+                    });
                     false // discard connection
                 }
             });
 
-            let config = self.host.node_config();
+            let connect_timeout_at = Instant::now().add(Duration::from_millis(config.connection_timeout));
 
             // Always try to connect to anchor peers.
             for sa in config.anchors.iter() {
-                if !connected_to_addresses.contains(sa) {
-                    let sa = sa.clone();
+                if !connections.contains_key(sa) && connecting_to.insert(sa.clone()) {
                     let self2 = self.clone();
+                    let sa = sa.clone();
-                    tasks.push(tokio::spawn(async move {
-                        let _ = self2.connect(&sa, sleep_until).await;
-                    }));
-                    connected_to_addresses.insert(sa.clone());
+                    tasks.spawn(async move {
+                        let _ = self2.connect(&sa, connect_timeout_at).await;
+                    });
                 }
             }
 
             // Try to connect to more peers until desired connection count is reached.
             let desired_connection_count = config.desired_connection_count.min(config.max_connection_count);
-            for sa in config.seeds.iter() {
-                if connected_to_addresses.len() >= desired_connection_count {
+            for sa in config.peers.iter() {
+                if (connections.len() + connecting_to.len()) >= desired_connection_count {
                     break;
                 }
-                if !connected_to_addresses.contains(sa) {
-                    connected_to_addresses.insert(sa.clone());
+                if !connections.contains_key(sa) && connecting_to.insert(sa.clone()) {
                     let self2 = self.clone();
                     let sa = sa.clone();
-                    tasks.push(tokio::spawn(async move {
-                        let _ = self2.connect(&sa, sleep_until).await;
-                    }));
+                    tasks.spawn(async move {
+                        let _ = self2.connect(&sa, connect_timeout_at).await;
+                    });
                 }
             }
 
-            // Wait for this iteration's batched background tasks to complete.
-            loop {
-                let s = tasks.pop();
-                if s.is_some() {
-                    let _ = s.unwrap().await;
-                } else {
-                    break;
-                }
-            }
+            sleep_for = Duration::from_millis(config.sync_interval.min(config.connection_timeout));
         }
     }
     async fn announce_task_main(self: Arc<Self>) {
-        let mut sleep_until = Instant::now().add(Duration::from_millis(ANNOUNCE_PERIOD as u64));
-        let mut to_announce: Vec<([u8; ANNOUNCE_KEY_LEN], Vec<SocketAddr>)> = Vec::with_capacity(256);
+        let sleep_for = Duration::from_millis(ANNOUNCE_PERIOD as u64);
+        let mut to_announce: Vec<([u8; KEY_SIZE], Vec<SocketAddr>)> = Vec::new();
         let background_tasks = AsyncTaskReaper::new();
-        let announce_timeout = Duration::from_millis(CONNECTION_TIMEOUT as u64);
+        let announce_timeout = Duration::from_millis(self.host.node_config().connection_timeout);
         loop {
-            tokio::time::sleep_until(sleep_until).await;
-            sleep_until = sleep_until.add(Duration::from_millis(ANNOUNCE_PERIOD as u64));
+            tokio::time::sleep(sleep_for).await;
 
             for (key, already_has) in self.announce_queue.lock().await.drain() {
                 to_announce.push((key, already_has));
             }
 
             let now = self.ms_monotonic();
+            let have_records_est_size = (to_announce.len() * KEY_SIZE) + 2;
+            let mut have_records: Vec<u8> = Vec::with_capacity(have_records_est_size);
             for c in self.connections.lock().await.iter() {
                 if c.1.announce_new_records.load(Ordering::Relaxed) {
-                    let mut have_records: Vec<u8> = Vec::with_capacity((to_announce.len() * ANNOUNCE_KEY_LEN) + 4);
-                    have_records.push(ANNOUNCE_KEY_LEN as u8);
                     for (key, already_has) in to_announce.iter() {
                         if !already_has.contains(c.0) {
                             let _ = std::io::Write::write_all(&mut have_records, key);
                         }
                     }
-                    if have_records.len() > 1 {
+                    if !have_records.is_empty() {
                         let c2 = c.1.clone();
                         background_tasks.spawn(async move {
                             // If the connection dies this will either fail or time out in 1s. Usually these execute instantly due to
                             // write buffering but a short timeout prevents them from building up too much.
                             let _ = tokio::time::timeout(announce_timeout, c2.send_msg(MessageType::HaveRecords, have_records.as_slice(), now));
-                        })
+                        });
+                        have_records = Vec::with_capacity(have_records_est_size);
                     }
                 }
             }
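
Note the wire format change here: a HaveRecords payload is now just full 64-byte keys back to back, with no leading key-length byte and no truncation to ANNOUNCE_KEY_LEN. A sketch of the receiving side under that assumption (parse_have_records is a hypothetical helper, not code from this commit):

    // Each exact 64-byte chunk is one announced key; a trailing partial chunk
    // would be malformed and is simply ignored by chunks_exact().
    fn parse_have_records(payload: &[u8]) -> impl Iterator<Item = &[u8]> + '_ {
        payload.chunks_exact(64) // 64 = KEY_SIZE
    }
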
@@ -327,19 +325,33 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
         }
     }
 
-    async fn connect(self: Arc<Self>, address: &SocketAddr, deadline: Instant) -> std::io::Result<bool> {
-        self.host.on_connect_attempt(address);
+    /// Internal connection method.
+    ///
+    /// Note that this does not add the address to connecting_to. Instead it's done by the caller
+    /// to avoid races and similar things. It is removed from connecting_to once the connection
+    /// either succeeds or fails.
+    async fn connect(self: &Arc<Self>, address: &SocketAddr, deadline: Instant) -> std::io::Result<bool> {
+        let f = async {
             let stream = if address.is_ipv4() { TcpSocket::new_v4() } else { TcpSocket::new_v6() }?;
             configure_tcp_socket(&stream)?;
             stream.bind(self.bind_address.clone())?;
-        let stream = tokio::time::timeout_at(deadline, stream.connect(address.clone())).await;
+
+            let stream = tokio::time::timeout_at(deadline, stream.connect(address.clone()));
+            self.host.on_connect_attempt(address);
+            let stream = stream.await;
+
             if stream.is_ok() {
                 Ok(self.connection_start(address.clone(), stream.unwrap()?, false).await)
             } else {
                 Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))
             }
+        };
+        let r = f.await;
+        let _ = self.connecting_to.lock().await.remove(address);
+        r
     }
 
+    /// Initialize and start a connection whether incoming or outgoing.
     async fn connection_start(self: &Arc<Self>, address: SocketAddr, stream: TcpStream, inbound: bool) -> bool {
         let mut ok = false;
         let _ = self.connections.lock().await.entry(address.clone()).or_insert_with(|| {
@@ -447,7 +459,7 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                     }
                 }
             }
-            let mut message = &read_buffer.as_slice()[header_size..total_size];
+            let message = &read_buffer.as_slice()[header_size..total_size];
             let now = self.ms_monotonic();
             connection.last_receive_time.store(now, Ordering::Relaxed);
@@ -480,7 +492,7 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                 }
                 let auth_challenge_response = auth_challenge_response.unwrap();
-                (H::hmac_sha512(&self.anti_loopback_secret, msg.anti_loopback_challenge), H::hmac_sha512(&H::sha512(&[self.datastore.domain().as_bytes()]), msg.domain_challenge), auth_challenge_response)
+                (H::hmac_sha512(&self.anti_loopback_secret, msg.anti_loopback_challenge), H::hmac_sha512(&H::sha512(&[self.host.node_config().domain.as_bytes()]), msg.domain_challenge), auth_challenge_response)
             };
 
             connection
@@ -508,7 +520,7 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                     if msg.anti_loopback_response.eq(&H::hmac_sha512(&self.anti_loopback_secret, &anti_loopback_challenge_sent)) {
                         return Err(std::io::Error::new(std::io::ErrorKind::Other, "rejected connection to self"));
                     }
-                    if !msg.domain_response.eq(&H::hmac_sha512(&H::sha512(&[self.datastore.domain().as_bytes()]), &domain_challenge_sent)) {
+                    if !msg.domain_response.eq(&H::hmac_sha512(&H::sha512(&[self.host.node_config().domain.as_bytes()]), &domain_challenge_sent)) {
                         return Err(std::io::Error::new(std::io::ErrorKind::Other, "domain mismatch"));
                     }
                     if !self.host.authenticate(&info, &auth_challenge_sent).map_or(false, |cr| msg.auth_response.eq(&cr)) {
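
Both HMAC checks above follow the same pattern: each side proves knowledge of a shared secret (the anti-loopback secret, or SHA512 of the configured domain, which now comes from Config rather than the DataStore) by keying an HMAC over the random challenge the peer sent, so the domain string itself never crosses the wire. A sketch of the domain side, assuming hmac_sha512 returns [u8; 64] as its use above suggests:

    // Compute the expected response to a peer's domain_challenge.
    fn domain_response<H: Host>(domain: &str, challenge: &[u8]) -> [u8; 64] {
        // Key = SHA512(domain); message = the peer's random challenge.
        H::hmac_sha512(&H::sha512(&[domain.as_bytes()]), challenge)
    }
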
@@ -516,7 +528,6 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                     }
 
                     initialized = true;
                     info.initialized = true;
-
                     let info = info.clone(); // also releases lock since info is replaced/destroyed
@@ -538,9 +549,8 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                     let key = H::sha512(&[message]);
                     match self.datastore.store(&key, message).await {
                         StoreResult::Ok => {
-                            let announce_key: [u8; ANNOUNCE_KEY_LEN] = (&key[..ANNOUNCE_KEY_LEN]).try_into().unwrap();
                             let mut q = self.announce_queue.lock().await;
-                            let ql = q.entry(announce_key).or_insert_with(|| Vec::with_capacity(2));
+                            let ql = q.entry(key).or_insert_with(|| Vec::with_capacity(2));
                             if !ql.contains(&remote_address) {
                                 ql.push(remote_address.clone());
                             }
@@ -552,6 +562,10 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                 }
             }
 
+            MessageType::SyncRequest => {
+                let msg: msg::SyncRequest = decode_msgpack(message)?;
+            }
+
             MessageType::Sync => {
                 let msg: msg::Sync = decode_msgpack(message)?;
             }

syncwhole/src/protocol.rs

@@ -6,15 +6,6 @@
  * https://www.zerotier.com/
  */
 
-/// Number of bytes of SHA512 to announce, should be high enough to make collisions virtually impossible.
-pub const ANNOUNCE_KEY_LEN: usize = 24;
-
-/// Send SyncStatus this frequently, in milliseconds.
-pub const SYNC_STATUS_PERIOD: i64 = 5000;
-
-/// Check for and announce that we "have" records this often in milliseconds.
-pub const ANNOUNCE_PERIOD: i64 = 100;
-
 #[derive(Clone, Copy, Eq, PartialEq)]
 #[repr(u8)]
 pub enum MessageType {
@@ -36,15 +27,20 @@ pub enum MessageType {
     /// <record>
     Record = 5_u8,
 
+    /// msg::SyncRequest (msgpack)
+    SyncRequest = 6_u8,
+
     /// msg::Sync (msgpack)
     Sync = 7_u8,
 }
 
+const MESSAGE_TYPE_MAX: u8 = 7;
+
 impl From<u8> for MessageType {
     /// Get a type from a byte, returning the Nop type if the byte is out of range.
     #[inline(always)]
     fn from(b: u8) -> Self {
-        if b <= 7 {
+        if b <= MESSAGE_TYPE_MAX {
             unsafe { std::mem::transmute(b) }
         } else {
             Self::Nop
@@ -62,6 +58,7 @@ impl MessageType {
             Self::HaveRecords => "HAVE_RECORDS",
             Self::GetRecords => "GET_RECORDS",
             Self::Record => "RECORD",
+            Self::SyncRequest => "SYNC_REQUEST",
             Self::Sync => "SYNC",
         }
     }
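
A quick sanity check of the byte mapping, assuming the match above lives in a method like fn name(&self) -> &'static str and that Nop maps to "NOP":

    assert_eq!(MessageType::from(6u8).name(), "SYNC_REQUEST"); // new in this commit
    assert_eq!(MessageType::from(8u8).name(), "NOP"); // past MESSAGE_TYPE_MAX falls back to Nop
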
@@ -134,30 +131,46 @@ pub mod msg {
     #[derive(Serialize, Deserialize)]
     pub struct SyncRequest<'a> {
-        /// 64-bit prefix of record keys for this request
-        #[serde(rename = "p")]
-        pub prefix: u64,
+        /// Starting range to query, padded with zeroes if shorter than KEY_SIZE.
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "s")]
+        pub range_start: &'a [u8],
 
-        /// Number of bits in prefix that are meaningful
-        #[serde(rename = "b")]
-        pub prefix_bits: u8,
+        /// Ending range to query, padded with 0xff if shorter than KEY_SIZE.
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "e")]
+        pub range_end: &'a [u8],
 
         /// Data-store-specific subset selector indicating what subset of items desired
-        pub subset: &'a [u8],
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "q")]
+        pub subset: Option<&'a [u8]>,
     }
 
     #[derive(Serialize, Deserialize)]
     pub struct Sync<'a> {
-        /// 64-bit prefix of record keys for this request
-        #[serde(rename = "p")]
-        pub prefix: u64,
+        /// Starting range summarized, padded with zeroes if shorter than KEY_SIZE.
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "s")]
+        pub range_start: &'a [u8],
 
-        /// Number of bits in prefix that are meaningful
-        #[serde(rename = "b")]
-        pub prefix_bits: u8,
+        /// Ending range summarized, padded with 0xff if shorter than KEY_SIZE.
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "e")]
+        pub range_end: &'a [u8],
 
         /// Data-store-specific subset selector indicating what subset of items were included
-        pub subset: &'a [u8],
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "q")]
+        pub subset: Option<&'a [u8]>,
 
+        /// Number of buckets in IBLT
+        #[serde(rename = "b")]
+        pub iblt_buckets: usize,
+
+        /// Number of bytes in each IBLT item (key prefix)
+        #[serde(rename = "l")]
+        pub iblt_item_bytes: usize,
 
         /// Set summary for keys under prefix within subset
         #[serde(with = "serde_bytes")]
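
For illustration, a whole-keyspace SyncRequest with no subset selector could be built and round-tripped like this; rmp-serde is an assumption here, standing in for the crate's own encode_msgpack/decode_msgpack helpers, which this diff does not show:

    // Zero padding for the start and 0xff padding for the end select everything.
    let req = SyncRequest {
        range_start: &[0u8; 64],  // KEY_SIZE zeroes: start of keyspace
        range_end: &[0xffu8; 64], // KEY_SIZE 0xff bytes: end of keyspace
        subset: None,             // no data-store-specific filtering
    };
    let bytes = rmp_serde::to_vec_named(&req).expect("serialize");
    let back: SyncRequest = rmp_serde::from_slice(&bytes).expect("deserialize");
    assert_eq!(back.range_start, &[0u8; 64][..]);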

syncwhole/src/utils.rs

@@ -50,6 +50,7 @@ pub fn splitmix64(mut x: u64) -> u64 {
     x
 }
 
+/*
 #[inline(always)]
 pub fn splitmix64_inverse(mut x: u64) -> u64 {
     x ^= x.wrapping_shr(31) ^ x.wrapping_shr(62);
@@ -59,6 +60,7 @@ pub fn splitmix64_inverse(mut x: u64) -> u64 {
     x ^= x.wrapping_shr(30) ^ x.wrapping_shr(60);
     x
 }
+*/
 
 static mut RANDOM_STATE_0: u64 = 0;
 static mut RANDOM_STATE_1: u64 = 0;
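
If the commented-out inverse is ever re-enabled, its defining property makes a cheap test; both functions are intended to be bijections on u64 (an assumed test sketch, not part of this commit):

    #[test]
    fn splitmix64_round_trip() {
        for x in [0u64, 1, 0xdead_beef, u64::MAX] {
            // The inverse must undo splitmix64 exactly for every input.
            assert_eq!(splitmix64_inverse(splitmix64(x)), x);
        }
    }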