diff --git a/rustfmt.toml b/rustfmt.toml
index bf8d97024..1a8eb92da 100644
--- a/rustfmt.toml
+++ b/rustfmt.toml
@@ -6,3 +6,5 @@ control_brace_style = "AlwaysSameLine"
 edition = "2021"
 imports_granularity = "Crate"
 group_imports = "StdExternalCrate"
+fn_single_line = true
+struct_lit_width = 60
diff --git a/syncwhole/Cargo.lock b/syncwhole/Cargo.lock
index eea0b6200..70e24fb53 100644
--- a/syncwhole/Cargo.lock
+++ b/syncwhole/Cargo.lock
@@ -2,6 +2,17 @@
 # It is not intended for manual editing.
 version = 3
 
+[[package]]
+name = "async-trait"
+version = "0.1.53"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "autocfg"
 version = "1.1.0"
@@ -16,9 +27,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
 
 [[package]]
 name = "block-buffer"
-version = "0.9.0"
+version = "0.10.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
+checksum = "0bf7fe51849ea569fd452f37822f606a5cabb684dc918707a0193fd4664ff324"
 dependencies = [
  "generic-array",
 ]
@@ -43,22 +54,39 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
 [[package]]
 name = "cpufeatures"
-version = "0.2.1"
+version = "0.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469"
+checksum = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"
 dependencies = [
  "libc",
 ]
 
 [[package]]
-name = "digest"
-version = "0.9.0"
+name = "crypto-common"
+version = "0.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
+checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8"
 dependencies = [
  "generic-array",
+ "typenum",
 ]
 
+[[package]]
+name = "digest"
+version = "0.10.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506"
+dependencies = [
+ "block-buffer",
+ "crypto-common",
+]
+
+[[package]]
+name = "futures-core"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
+
 [[package]]
 name = "generic-array"
 version = "0.14.5"
@@ -80,24 +108,25 @@ dependencies = [
 
 [[package]]
 name = "libc"
-version = "0.2.119"
+version = "0.2.121"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1bf2e165bb3457c8e098ea76f3e3bc9db55f87aa90d52d0e6be741470916aaa4"
+checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f"
 
 [[package]]
 name = "lock_api"
-version = "0.4.6"
+version = "0.4.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "88943dd7ef4a2e5a4bfa2753aaab3013e34ce2533d1996fb18ef591e315e2b3b"
+checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53"
 dependencies = [
+ "autocfg",
  "scopeguard",
 ]
 
 [[package]]
 name = "log"
-version = "0.4.14"
+version = "0.4.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
+checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8"
 dependencies = [
  "cfg-if",
 ]
 
@@ -110,14 +139,15 @@ checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
 
 [[package]]
 name = "mio"
-version = "0.8.0"
+version = "0.8.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2"
+checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9"
 dependencies = [
  "libc",
  "log",
  "miow",
  "ntapi",
+ "wasi",
  "winapi",
 ]
 
@@ -158,12 +188,6 @@ dependencies = [
  "libc",
 ]
 
-[[package]]
-name = "opaque-debug"
-version = "0.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
-
 [[package]]
 name = "parking_lot"
 version = "0.12.0"
@@ -176,9 +200,9 @@ dependencies = [
 
 [[package]]
 name = "parking_lot_core"
-version = "0.9.1"
+version = "0.9.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954"
+checksum = "995f667a6c822200b0433ac218e05582f0e2efa1b922a3fd2fbaadc5f87bab37"
 dependencies = [
  "cfg-if",
  "libc",
@@ -204,18 +228,18 @@ dependencies = [
 
 [[package]]
 name = "quote"
-version = "1.0.15"
+version = "1.0.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "864d3e96a899863136fc6e99f3d7cae289dafe43bf2c5ac19b70df7210c0a145"
+checksum = "632d02bff7f874a36f33ea8bb416cd484b90cc66c1194b1a1110d067a7013f58"
 dependencies = [
  "proc-macro2",
 ]
 
 [[package]]
 name = "redox_syscall"
-version = "0.2.10"
+version = "0.2.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
+checksum = "62f25bc4c7e55e0b0b7a1d43fb893f4fa1361d0abe38b9ce4f323c2adfe6ef42"
 dependencies = [
  "bitflags",
 ]
@@ -278,15 +302,13 @@ dependencies = [
 
 [[package]]
 name = "sha2"
-version = "0.9.9"
+version = "0.10.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800"
+checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676"
 dependencies = [
- "block-buffer",
  "cfg-if",
  "cpufeatures",
  "digest",
- "opaque-debug",
 ]
 
 [[package]]
@@ -307,9 +329,9 @@ dependencies = [
 
 [[package]]
 name = "syn"
-version = "1.0.86"
+version = "1.0.90"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b"
+checksum = "704df27628939572cd88d33f171cd6f896f4eaca85252c6e0a72d8d8287ee86f"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -320,6 +342,8 @@ dependencies = [
 name = "syncwhole"
 version = "0.1.0"
 dependencies = [
+ "async-trait",
+ "futures-core",
  "rmp",
  "rmp-serde",
  "serde",
@@ -363,6 +387,12 @@ version = "0.9.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
 
+[[package]]
+name = "wasi"
+version = "0.11.0+wasi-snapshot-preview1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
+
 [[package]]
 name = "winapi"
 version = "0.3.9"
@@ -387,9 +417,9 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
 
 [[package]]
 name = "windows-sys"
-version = "0.32.0"
+version = "0.34.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6"
+checksum = "5acdd78cb4ba54c0045ac14f62d8f94a03d10047904ae2a40afa1e99d8f70825"
 dependencies = [
  "windows_aarch64_msvc",
  "windows_i686_gnu",
@@ -400,30 +430,30 @@ dependencies = [
 
 [[package]]
 name = "windows_aarch64_msvc"
-version = "0.32.0"
+version = "0.34.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5"
+checksum = "17cffbe740121affb56fad0fc0e421804adf0ae00891205213b5cecd30db881d"
 
 [[package]]
 name = "windows_i686_gnu"
-version = "0.32.0"
+version = "0.34.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615"
+checksum = "2564fde759adb79129d9b4f54be42b32c89970c18ebf93124ca8870a498688ed"
 
 [[package]]
 name = "windows_i686_msvc"
-version = "0.32.0"
+version = "0.34.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172"
+checksum = "9cd9d32ba70453522332c14d38814bceeb747d80b3958676007acadd7e166956"
 
 [[package]]
 name = "windows_x86_64_gnu"
-version = "0.32.0"
+version = "0.34.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc"
+checksum = "cfce6deae227ee8d356d19effc141a509cc503dfd1f850622ec4b0f84428e1f4"
 
 [[package]]
 name = "windows_x86_64_msvc"
-version = "0.32.0"
+version = "0.34.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316"
+checksum = "d19538ccc21819d01deaf88d6a17eae6596a12e9aafdbb97916fb49896d89de9"
diff --git a/syncwhole/Cargo.toml b/syncwhole/Cargo.toml
index 849afe136..1e7e19350 100644
--- a/syncwhole/Cargo.toml
+++ b/syncwhole/Cargo.toml
@@ -29,6 +29,8 @@ serde_bytes = "^0"
 rmp = "^0"
 rmp-serde = "^1"
 sha2 = { version = "^0", optional = true }
+async-trait = "^0"
+futures-core = "^0"
 
 [features]
 include_sha2_lib = ["sha2"]
diff --git a/syncwhole/src/datastore.rs b/syncwhole/src/datastore.rs
index 0890b513a..acf4714b4 100644
--- a/syncwhole/src/datastore.rs
+++ b/syncwhole/src/datastore.rs
@@ -6,49 +6,17 @@
  * https://www.zerotier.com/
  */
 
+use async_trait::async_trait;
+
 /// Size of keys, which is the size of a 512-bit hash. This is a protocol constant.
 pub const KEY_SIZE: usize = 64;
 
-/// Minimum possible key (all zero).
+/// Minimum possible value in a key range (all zero).
 pub const MIN_KEY: [u8; KEY_SIZE] = [0; KEY_SIZE];
 
-/// Maximum possible key (all 0xff).
+/// Maximum possible value in a key range (all 0xff).
 pub const MAX_KEY: [u8; KEY_SIZE] = [0xff; KEY_SIZE];
 
-/// Generate a range of SHA512 hashes from a prefix and a number of bits.
-/// The range will be inclusive and cover all keys under the prefix.
-pub fn range_from_prefix(prefix: &[u8], prefix_bits: usize) -> Option<([u8; KEY_SIZE], [u8; KEY_SIZE])> {
-    let mut start = [0_u8; KEY_SIZE];
-    let mut end = [0xff_u8; KEY_SIZE];
-    if prefix_bits > (KEY_SIZE * 8) {
-        return None;
-    }
-    let whole_bytes = prefix_bits / 8;
-    let remaining_bits = prefix_bits % 8;
-    if prefix.len() < (whole_bytes + ((remaining_bits != 0) as usize)) {
-        return None;
-    }
-    start[0..whole_bytes].copy_from_slice(&prefix[0..whole_bytes]);
-    end[0..whole_bytes].copy_from_slice(&prefix[0..whole_bytes]);
-    if remaining_bits != 0 {
-        start[whole_bytes] |= prefix[whole_bytes];
-        end[whole_bytes] &= prefix[whole_bytes] | ((0xff_u8).wrapping_shr(remaining_bits as u32));
-    }
-    return Some((start, end));
-}
-
-/// Result returned by DataStore::load().
-pub enum LoadResult<V: AsRef<[u8]> + Send> {
-    /// Object was found.
-    Ok(V),
-
-    /// Object was not found, including the case of being excluded due to the value of reference_time.
-    NotFound,
-
-    /// Supplied reference_time is outside what is available (usually too old).
-    TimeNotAvailable,
-}
-
 /// Result returned by DataStore::store().
 pub enum StoreResult {
     /// Entry was accepted.
@@ -76,13 +44,15 @@ pub enum StoreResult {
 /// what time I think it is" value to be considered locally so that data can be replicated
 /// as of any given time.
 ///
-/// The implementation must be thread safe and may be called concurrently.
+/// In any call that takes a reference_time, a value of zero should be ignored and
+/// treated as meaning: include all of the data we have.
+#[async_trait]
 pub trait DataStore: Sync + Send {
-    /// Type to be enclosed in the Ok() enum value in LoadResult.
+    /// Container for values returned by load().
     ///
-    /// Allowing this type to be defined lets you use any type that makes sense with
-    /// your implementation. Examples include Box<[u8]>, Arc<[u8]>, Vec<u8>, etc.
-    type LoadResultValueType: AsRef<[u8]> + Send;
+    /// Making this a trait-defined type lets you use shared types like Arc<[u8]> as well
+    /// as obvious ones like Box<[u8]> and Vec<u8>.
+    type ValueRef: AsRef<[u8]> + Sync + Send + Clone;
 
     /// Key hash size, always 64 for SHA512.
     const KEY_SIZE: usize = KEY_SIZE;
@@ -101,17 +71,13 @@ pub trait DataStore: Sync + Send {
     fn domain(&self) -> &str;
 
     /// Get an item if it exists as of a given reference time.
-    fn load(&self, reference_time: i64, key: &[u8]) -> LoadResult<Self::LoadResultValueType>;
+    async fn load(&self, reference_time: i64, key: &[u8]) -> Option<Self::ValueRef>;
 
     /// Check whether this data store contains a key.
     ///
-    /// The default implementation just uses load(). Override if you can provide a faster
-    /// version.
-    fn contains(&self, reference_time: i64, key: &[u8]) -> bool {
-        match self.load(reference_time, key) {
-            LoadResult::Ok(_) => true,
-            _ => false,
-        }
+    /// The default implementation just calls load(). Override if a faster version is possible.
+    async fn contains(&self, reference_time: i64, key: &[u8]) -> bool {
+        self.load(reference_time, key).await.is_some()
     }
 
     /// Store an item in the data store and return its status.
@@ -137,22 +103,22 @@ pub trait DataStore: Sync + Send {
     /// Rejected should only be returned if the value actually fails a validity check, signature
     /// verification, proof of work check, or some other required criteria. Ignored must be
     /// returned if the value is valid but is too old or was rejected for some other normal reason.
-    fn store(&self, key: &[u8], value: &[u8]) -> StoreResult;
+    async fn store(&self, key: &[u8], value: &[u8]) -> StoreResult;
 
     /// Get the number of items in a range.
-    fn count(&self, reference_time: i64, key_range_start: &[u8], key_range_end: &[u8]) -> u64;
+    async fn count(&self, reference_time: i64, key_range_start: &[u8], key_range_end: &[u8]) -> u64;
 
     /// Get the total number of records in this data store.
-    fn total_count(&self) -> u64;
+    async fn total_count(&self) -> u64;
 
-    /// Iterate through keys, stopping if the function returns false.
+    /// Iterate through a series of keys in a range (inclusive), stopping when the function returns false.
     ///
-    /// The default implementation uses for_each(). This can be specialized if it's faster to
-    /// only load keys.
-    fn for_each_key<F: FnMut(&[u8]) -> bool>(&self, reference_time: i64, key_range_start: &[u8], key_range_end: &[u8], mut f: F) {
-        self.for_each(reference_time, key_range_start, key_range_end, |k, _| f(k));
+    /// The default implementation uses for_each() and just drops the value. Specialize if it's
+    /// faster to retrieve only keys.
+    async fn for_each_key<F: Send + FnMut(&[u8]) -> bool>(&self, reference_time: i64, key_range_start: &[u8], key_range_end: &[u8], mut f: F) {
+        self.for_each(reference_time, key_range_start, key_range_end, |k, _| f(k)).await;
     }
 
-    /// Iterate through keys and values, stopping if the function returns false.
-    fn for_each<F: FnMut(&[u8], &[u8]) -> bool>(&self, reference_time: i64, key_range_start: &[u8], key_range_end: &[u8], f: F);
+    /// Iterate through a series of entries in a range (inclusive), stopping when the function returns false.
+    async fn for_each<F: Send + FnMut(&[u8], &Self::ValueRef) -> bool>(&self, reference_time: i64, key_range_start: &[u8], key_range_end: &[u8], f: F);
 }
diff --git a/syncwhole/src/host.rs b/syncwhole/src/host.rs
index eb5445589..b1b268e96 100644
--- a/syncwhole/src/host.rs
+++ b/syncwhole/src/host.rs
@@ -39,6 +39,19 @@ pub struct Config {
     pub contact: String,
 }
 
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            anchors: Vec::new(),
+            seeds: Vec::new(),
+            max_connection_count: 128,
+            desired_connection_count: 64,
+            name: String::new(),
+            contact: String::new(),
+        }
+    }
+}
+
 /// A trait that users of syncwhole implement to provide configuration information and listen for events.
 pub trait Host: Sync + Send {
     /// Get a copy of the current configuration for this syncwhole node.
@@ -102,7 +115,7 @@ pub trait Host: Sync + Send {
         for b in msg.iter() {
             h.update(*b);
         }
-        h.finalize_fixed().as_ref().try_into().unwrap()
+        h.finalize_fixed().try_into().unwrap()
     }
 
     /// Compute HMAC-SHA512 using key and input.
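Since Config now implements Default, a host can start from the stock limits (128 max connections, 64 desired) and override only what it needs. A minimal sketch, not part of the patch, with a placeholder address and name:

    let mut cfg = Config::default();
    cfg.anchors.push("127.0.0.1:4448".parse().unwrap());
    cfg.name = "example-node".to_string();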
diff --git a/syncwhole/src/iblt.rs b/syncwhole/src/iblt.rs
index f787fd5ad..8c99b2d6a 100644
--- a/syncwhole/src/iblt.rs
+++ b/syncwhole/src/iblt.rs
@@ -6,13 +6,26 @@
  * https://www.zerotier.com/
  */
 
-use crate::utils::*;
+use std::borrow::Cow;
+
+/// Total memory overhead of each bucket in bytes.
+const BUCKET_SIZE_BYTES: usize = 13; // u64 key + u32 check + i8 count
+
+/// Based on xorshift64 with endian conversion for BE systems.
+#[inline(always)]
+fn get_check_hash(mut x: u64) -> u32 {
+    x = u64::from_le(x);
+    x ^= x.wrapping_shl(13);
+    x ^= x.wrapping_shr(7);
+    x ^= x.wrapping_shl(17);
+    x.wrapping_add(x.wrapping_shr(32)).to_le() as u32
+}
 
 /// Called to get the next iteration index for each KEY_MAPPING_ITERATIONS table lookup.
-/// (See IBLT papers, etc.)
+/// A series of these implements the "series of different hashes" construct in IBLT.
 #[inline(always)]
-fn next_iteration_index(mut x: u64) -> u64 {
-    x = x.wrapping_add(1);
+fn next_iteration_index(mut x: u64, hash_no: u64) -> u64 {
+    x = x.wrapping_add(hash_no);
     x ^= x.wrapping_shr(30);
     x = x.wrapping_mul(0xbf58476d1ce4e5b9);
     x ^= x.wrapping_shr(27);
@@ -26,60 +39,82 @@ fn next_iteration_index(mut x: u64) -> u64 {
 /// An Invertible Bloom Lookup Table for set reconciliation.
 ///
 /// Usage inspired by this paper:
 ///
 /// https://dash.harvard.edu/bitstream/handle/1/14398536/GENTILI-SENIORTHESIS-2015.pdf
-#[repr(C, packed)]
-pub struct IBLT<const BUCKETS: usize> {
+///
+/// Note that an 8-bit counter that wraps is used instead of a much wider counter that
+/// would probably never overflow. This saves space on the wire but means that there is
+/// a very small (roughly 1/2^64) chance that this will list a value that is invalid in that
+/// it was never added on either side. In our protocol that should not cause a problem as
+/// it would just result in one key in a GetRecords query not being fulfilled, and in any
+/// case it should be an extremely rare event.
+///
+/// BUCKETS is the maximum capacity in buckets, while HASHES is the number of
+/// "different" (differently seeded) hash functions to use.
+///
+/// The best value for HASHES seems to be 3 for an optimal fill of 80%.
+#[repr(C)]
+pub struct IBLT<const BUCKETS: usize, const HASHES: usize> {
     key: [u64; BUCKETS],
-    check_hash: [u64; BUCKETS],
+    check_hash: [u32; BUCKETS],
     count: [i8; BUCKETS],
 }
 
-impl<const BUCKETS: usize> IBLT<BUCKETS> {
-    /// This was determined to be effective via empirical testing with random keys. This
-    /// is a protocol constant that can't be changed without upgrading all nodes in a domain.
-    const KEY_MAPPING_ITERATIONS: usize = 2;
+impl<const BUCKETS: usize, const HASHES: usize> Clone for IBLT<BUCKETS, HASHES> {
+    #[inline(always)]
+    fn clone(&self) -> Self {
+        // NOTE: clone() is manually implemented here so it's tolerant of unaligned access on architectures not supporting it.
+        unsafe {
+            let mut tmp: Self = std::mem::MaybeUninit::uninit().assume_init();
+            std::ptr::copy_nonoverlapping((self as *const Self).cast::<u8>(), (&mut tmp as *mut Self).cast::<u8>(), Self::SIZE_BYTES);
+            tmp
+        }
+    }
+}
 
+impl<const BUCKETS: usize, const HASHES: usize> IBLT<BUCKETS, HASHES> {
     /// Number of buckets in this IBLT.
+    #[allow(unused)]
     pub const BUCKETS: usize = BUCKETS;
 
     /// Size of this IBLT in bytes.
-    pub const SIZE_BYTES: usize = BUCKETS * (8 + 8 + 1);
-
-    #[inline(always)]
-    fn is_singular(&self, i: usize) -> bool {
-        let c = self.count[i];
-        if c == 1 || c == -1 {
-            xorshift64(self.key[i]) == self.check_hash[i]
-        } else {
-            false
-        }
-    }
+    pub const SIZE_BYTES: usize = BUCKETS * BUCKET_SIZE_BYTES;
 
     /// Create a new zeroed IBLT.
+    #[inline(always)]
     pub fn new() -> Self {
-        assert_eq!(Self::SIZE_BYTES, std::mem::size_of::<Self>());
+        assert!(Self::SIZE_BYTES <= std::mem::size_of::<Self>());
         assert!(BUCKETS < (i32::MAX as usize));
         unsafe { std::mem::zeroed() }
     }
 
-    /// Cast a byte array to an IBLT if it is of the correct size.
-    pub fn ref_from_bytes(b: &[u8]) -> Option<&Self> {
-        if b.len() == Self::SIZE_BYTES {
-            Some(unsafe { &*b.as_ptr().cast() })
-        } else {
-            None
-        }
+    /// Get this IBLT as a byte slice (free cast operation).
+    #[inline(always)]
+    pub fn as_bytes(&self) -> &[u8] {
+        unsafe { &*std::ptr::slice_from_raw_parts((self as *const Self).cast::<u8>(), Self::SIZE_BYTES) }
     }
 
-    /// Compute the IBLT size in buckets to reconcile a given set difference, or return 0 if no advantage.
-    /// This returns zero if an IBLT would take up as much or more space than just sending local_set_size
-    /// hashes of hash_size_bytes.
+    /// Obtain an IBLT from bytes in memory.
+    ///
+    /// If the architecture supports unaligned memory access or the memory is aligned, this returns a borrowed
+    /// Cow to 'b' that is just a cast. If re-alignment is necessary it returns an owned Cow containing a properly
+    /// aligned copy. This makes conversion a nearly free cast when alignment adjustment isn't needed.
     #[inline(always)]
-    pub fn calc_iblt_parameters(hash_size_bytes: usize, local_set_size: u64, difference_size: u64) -> usize {
-        let b = (difference_size as f64) * 1.8; // factor determined experimentally for best bytes/item, can be tuned
-        if b > 64.0 && (b * (8.0 + 8.0 + 1.0)) < ((hash_size_bytes as f64) * (local_set_size as f64)) {
-            b.round() as usize
+    pub fn from_bytes<'a>(b: &'a [u8]) -> Option<Cow<'a, Self>> {
+        if b.len() == Self::SIZE_BYTES {
+            #[cfg(not(any(target_arch = "x86_64", target_arch = "x86", target_arch = "powerpc64", target_arch = "aarch64")))]
+            {
+                if b.as_ptr().align_offset(8) == 0 {
+                    Some(Cow::Borrowed(unsafe { &*b.as_ptr().cast() }))
+                } else {
+                    // NOTE: clone() is implemented above using a raw copy so that alignment doesn't matter.
+                    Some(Cow::Owned(unsafe { &*b.as_ptr().cast::<Self>() }.clone()))
+                }
+            }
+            #[cfg(any(target_arch = "x86_64", target_arch = "x86", target_arch = "powerpc64", target_arch = "aarch64"))]
+            {
+                Some(Cow::Borrowed(unsafe { &*b.as_ptr().cast() }))
+            }
         } else {
-            0
+            None
         }
     }
 
@@ -91,17 +126,11 @@ impl<const BUCKETS: usize> IBLT<BUCKETS> {
         }
     }
 
-    /// Get this IBLT as a byte slice in place.
-    #[inline(always)]
-    pub fn as_bytes(&self) -> &[u8] {
-        unsafe { &*std::ptr::slice_from_raw_parts((self as *const Self).cast::<u8>(), std::mem::size_of::<Self>()) }
-    }
-
     fn ins_rem(&mut self, key: u64, delta: i8) {
-        let check_hash = xorshift64(key);
+        let check_hash = get_check_hash(key);
         let mut iteration_index = u64::from_le(key);
-        for _ in 0..Self::KEY_MAPPING_ITERATIONS {
-            iteration_index = next_iteration_index(iteration_index);
+        for k in 0..(HASHES as u64) {
+            iteration_index = next_iteration_index(iteration_index, k);
             let i = (iteration_index as usize) % BUCKETS;
             self.key[i] ^= key;
             self.check_hash[i] ^= check_hash;
@@ -123,83 +152,84 @@ impl<const BUCKETS: usize> IBLT<BUCKETS> {
 
     /// Subtract another IBLT from this one to get a set difference.
     pub fn subtract(&mut self, other: &Self) {
-        for i in 0..BUCKETS {
-            self.key[i] ^= other.key[i];
-        }
-        for i in 0..BUCKETS {
-            self.check_hash[i] ^= other.check_hash[i];
-        }
-        for i in 0..BUCKETS {
-            self.count[i] = self.count[i].wrapping_sub(other.count[i]);
-        }
+        self.key.iter_mut().zip(other.key.iter()).for_each(|(a, b)| *a ^= *b);
+        self.check_hash.iter_mut().zip(other.check_hash.iter()).for_each(|(a, b)| *a ^= *b);
+        self.count.iter_mut().zip(other.count.iter()).for_each(|(a, b)| *a = a.wrapping_sub(*b));
     }
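At 13 bytes per bucket (8-byte key, 4-byte check hash, 1-byte signed count), a table's wire size is simply BUCKETS x 13: the 16384-bucket IBLT used in the merge_sets test below serializes to 16384 x 13 = 212,992 bytes, down from 16384 x 17 with the old u64 check hash.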
     /// List as many entries in this IBLT as can be extracted.
-    pub fn list<F: FnMut(u64)>(mut self, mut f: F) {
+    /// True is returned if extraction was 100% successful. False indicates that
+    /// some entries were not extractable.
+    pub fn list<F: FnMut(u64)>(mut self, mut f: F) -> bool {
         let mut queue: Vec<u32> = Vec::with_capacity(BUCKETS);
 
         for i in 0..BUCKETS {
-            if self.is_singular(i) {
+            let count = self.count[i];
+            if (count == 1 || count == -1) && get_check_hash(self.key[i]) == self.check_hash[i] {
                 queue.push(i as u32);
             }
         }
 
-        loop {
+        'list_main: loop {
             let i = queue.pop();
             let i = if i.is_some() {
                 i.unwrap() as usize
             } else {
-                break;
+                break 'list_main;
             };
-            if self.is_singular(i) {
-                let key = self.key[i];
-
+            let key = self.key[i];
+            let check_hash = self.check_hash[i];
+            let count = self.count[i];
+            if (count == 1 || count == -1) && check_hash == get_check_hash(key) {
                 f(key);
-                let check_hash = xorshift64(key);
                 let mut iteration_index = u64::from_le(key);
-                for _ in 0..Self::KEY_MAPPING_ITERATIONS {
-                    iteration_index = next_iteration_index(iteration_index);
+                for k in 0..(HASHES as u64) {
+                    iteration_index = next_iteration_index(iteration_index, k);
                     let i = (iteration_index as usize) % BUCKETS;
-                    self.key[i] ^= key;
-                    self.check_hash[i] ^= check_hash;
-                    self.count[i] = self.count[i].wrapping_sub(1);
-                    if self.is_singular(i) {
+                    let key2 = self.key[i] ^ key;
+                    let check_hash2 = self.check_hash[i] ^ check_hash;
+                    let count2 = self.count[i].wrapping_sub(count);
+                    self.key[i] = key2;
+                    self.check_hash[i] = check_hash2;
+                    self.count[i] = count2;
+                    if (count2 == 1 || count2 == -1) && check_hash2 == get_check_hash(key2) {
                         if queue.len() > BUCKETS {
                             // sanity check, should be impossible
-                            return;
+                            break 'list_main;
                         }
                         queue.push(i as u32);
                     }
                 }
             }
        }
+
+        self.count.iter().all(|x| *x == 0) && self.key.iter().all(|x| *x == 0)
     }
 }
 
 #[cfg(test)]
 mod tests {
     use std::collections::HashSet;
+    #[allow(unused_imports)]
     use std::time::SystemTime;
 
     use crate::iblt::*;
+    #[allow(unused_imports)]
+    use crate::utils::{splitmix64, xorshift64};
 
-    #[test]
-    fn splitmix_is_invertiblex() {
-        for i in 1..2000_u64 {
-            assert_eq!(i, splitmix64_inverse(splitmix64(i)))
-        }
-    }
+    const HASHES: usize = 3;
 
     #[test]
     fn fill_list_performance() {
-        let mut rn = xorshift64(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64);
+        const CAPACITY: usize = 4096;
+        //let mut rn = xorshift64(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64);
+        let mut rn = 31337;
         let mut expected: HashSet<u64> = HashSet::with_capacity(4096);
         let mut count = 64;
-        const CAPACITY: usize = 4096;
         while count <= CAPACITY {
-            let mut test = IBLT::<CAPACITY>::new();
+            let mut test = IBLT::<CAPACITY, HASHES>::new();
             expected.clear();
 
             for _ in 0..count {
@@ -215,43 +245,50 @@ mod tests {
                 assert!(expected.contains(&x));
             });
 
-            //println!("inserted: {}\tlisted: {}\tcapacity: {}\tscore: {:.4}\tfill: {:.4}", count, list_count, CAPACITY, (list_count as f64) / (count as f64), (count as f64) / (CAPACITY as f64));
+            println!("inserted: {}\tlisted: {}\tcapacity: {}\tscore: {:.4}\tfill: {:.4}", count, list_count, CAPACITY, (list_count as f64) / (count as f64), (count as f64) / (CAPACITY as f64));
 
             count += 64;
         }
     }
 
     #[test]
     fn merge_sets() {
-        let mut rn = xorshift64(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64);
         const CAPACITY: usize = 16384;
-        const REMOTE_SIZE: usize = 1024 * 1024;
+        const REMOTE_SIZE: usize = 1024 * 1024 * 2;
         const STEP: usize = 1024;
+        //let mut rn = xorshift64(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64);
+        let mut rn = 31337;
         let mut missing_count = 1024;
-        let mut missing: HashSet<u64> = HashSet::with_capacity(CAPACITY);
+        let mut missing: HashSet<u64> = HashSet::with_capacity(CAPACITY * 2);
+        let mut all: HashSet<u64> = HashSet::with_capacity(REMOTE_SIZE);
         while missing_count <= CAPACITY {
             missing.clear();
-            let mut local = IBLT::<CAPACITY>::new();
-            let mut remote = IBLT::<CAPACITY>::new();
+            all.clear();
+            let mut local = IBLT::<CAPACITY, HASHES>::new();
+            let mut remote = IBLT::<CAPACITY, HASHES>::new();
 
-            for k in 0..REMOTE_SIZE {
-                if k >= missing_count {
-                    local.insert(rn);
-                } else {
-                    missing.insert(rn);
+            let mut k = 0;
+            while k < REMOTE_SIZE {
+                if all.insert(rn) {
+                    if k >= missing_count {
+                        local.insert(rn);
+                    } else {
+                        missing.insert(rn);
+                    }
+                    remote.insert(rn);
+                    k += 1;
                 }
-                remote.insert(rn);
                 rn = splitmix64(rn);
             }
 
             local.subtract(&mut remote);
             let bytes = local.as_bytes().len();
             let mut cnt = 0;
-            local.list(|k| {
+            let all_success = local.list(|k| {
                 assert!(missing.contains(&k));
                 cnt += 1;
             });
 
-            println!("total: {} missing: {:5} recovered: {:5} size: {:5} score: {:.4} bytes/item: {:.2}", REMOTE_SIZE, missing.len(), cnt, bytes, (cnt as f64) / (missing.len() as f64), (bytes as f64) / (cnt as f64));
+            println!("total: {} missing: {:5} recovered: {:5} size: {:5} score: {:.4} bytes/item: {:.2} extract(fill): {:.4} 100%: {}", REMOTE_SIZE, missing.len(), cnt, bytes, (cnt as f64) / (missing.len() as f64), (bytes as f64) / (cnt as f64), (cnt as f64) / (CAPACITY as f64), all_success);
 
             missing_count += STEP;
         }
     }
diff --git a/syncwhole/src/lib.rs b/syncwhole/src/lib.rs
index 0da564aee..030bf48c3 100644
--- a/syncwhole/src/lib.rs
+++ b/syncwhole/src/lib.rs
@@ -6,11 +6,13 @@
  * https://www.zerotier.com/
  */
 
-pub(crate) mod varint;
-pub(crate) mod protocol;
 pub(crate) mod iblt;
+pub(crate) mod protocol;
+pub(crate) mod varint;
 
 pub mod datastore;
-pub mod node;
 pub mod host;
+pub mod node;
 pub mod utils;
+
+pub use async_trait;
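A sketch of the reconciliation round trip the tests above exercise, assuming two sides with mostly-overlapping 64-bit key sets (the 16384/3 const parameters mirror merge_sets):

    let mut local = IBLT::<16384, 3>::new();
    let mut remote = IBLT::<16384, 3>::new();
    // ...each side inserts the u64 prefixes of its record keys...
    local.subtract(&remote);           // local now encodes only the symmetric difference
    let complete = local.list(|key| {
        println!("only one side has {:016x}", key);
    });
    // complete == false means the difference overfilled the table
    // (best achievable fill is ~80% of BUCKETS with 3 hashes)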
diff --git a/syncwhole/src/main.rs b/syncwhole/src/main.rs
index e8325a1e9..1344a4744 100644
--- a/syncwhole/src/main.rs
+++ b/syncwhole/src/main.rs
@@ -1,18 +1,27 @@
-extern crate core;
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/.
+ *
+ * (c)2022 ZeroTier, Inc.
+ * https://www.zerotier.com/
+ */
 
 use std::collections::BTreeMap;
 use std::io::{stdout, Write};
 use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
 use std::ops::Bound::Included;
-use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
 use std::time::{Duration, Instant, SystemTime};
 
+use async_trait::async_trait;
+
 use sha2::digest::Digest;
 use sha2::Sha512;
 
-use syncwhole::datastore::{DataStore, LoadResult, StoreResult};
-use syncwhole::host::Host;
-use syncwhole::node::{Node, RemoteNodeInfo};
+use syncwhole::datastore::*;
+use syncwhole::host::*;
+use syncwhole::node::*;
 use syncwhole::utils::*;
 
 const TEST_NODE_COUNT: usize = 8;
@@ -36,16 +45,33 @@ fn get_random_bytes(mut buf: &mut [u8]) {
     unsafe { RANDOM_CTR = ctr };
 }
 
-struct TestNodeHost {
-    name: String,
-    peers: Vec<SocketAddr>,
-    db: Mutex<BTreeMap<[u8; 64], Arc<[u8]>>>,
+pub struct TestNodeHost {
+    pub name: String,
+    pub config: Config,
+    pub records: tokio::sync::Mutex<BTreeMap<[u8; 64], [u8; 64]>>,
+}
+
+impl TestNodeHost {
+    pub fn new_random(test_no: usize) -> Self {
+        let mut s = BTreeMap::new();
+        for _ in 0..TEST_STARTING_RECORDS_PER_NODE {
+            let mut v = [0_u8; 64];
+            get_random_bytes(&mut v);
+            let k = Self::sha512(&[&v]);
+            s.insert(k, v);
+        }
+        Self {
+            name: test_no.to_string(),
+            config: Config::default(),
+            records: tokio::sync::Mutex::new(s),
+        }
+    }
 }
 
 impl Host for TestNodeHost {
-    fn fixed_peers(&self) -> &[SocketAddr] { self.peers.as_slice() }
-
-    fn name(&self) -> Option<&str> { Some(self.name.as_str()) }
+    fn node_config(&self) -> Config {
+        self.config.clone()
+    }
 
     fn on_connect_attempt(&self, _address: &SocketAddr) {
         //println!("{:5}: connecting to {}", self.name, _address.to_string());
@@ -56,7 +82,7 @@ impl Host for TestNodeHost {
     }
 
     fn on_connection_closed(&self, info: &RemoteNodeInfo, reason: String) {
-        println!("{:5}: closed connection to {}: {} ({}, {})", self.name, info.remote_address.to_string(), reason, if info.inbound { "inbound" } else { "outbound" }, if info.initialized { "initialized" } else { "not initialized" });
+        //println!("{:5}: closed connection to {}: {} ({}, {})", self.name, info.remote_address.to_string(), reason, if info.inbound { "inbound" } else { "outbound" }, if info.initialized { "initialized" } else { "not initialized" });
     }
 
     fn get_secure_random(&self, buf: &mut [u8]) {
@@ -65,47 +91,68 @@ impl Host for TestNodeHost {
     }
 }
 
+#[async_trait]
 impl DataStore for TestNodeHost {
-    type LoadResultValueType = Arc<[u8]>;
+    type ValueRef = [u8; 64];
 
     const MAX_VALUE_SIZE: usize = 1024;
 
-    fn clock(&self) -> i64 { ms_since_epoch() }
-
-    fn domain(&self) -> &str { "test" }
-
-    fn load(&self, _: i64, key: &[u8]) -> LoadResult<Self::LoadResultValueType> {
-        self.db.lock().unwrap().get(key).map_or(LoadResult::NotFound, |r| LoadResult::Ok(r.clone()))
+    fn clock(&self) -> i64 {
+        ms_since_epoch()
     }
 
-    fn store(&self, key: &[u8], value: &[u8]) -> StoreResult {
-        assert_eq!(key.len(), 64);
-        let mut res = StoreResult::Ok(0);
-        self.db.lock().unwrap().entry(key.try_into().unwrap()).and_modify(|e| {
-            if e.as_ref().eq(value) {
-                res = StoreResult::Duplicate;
-            } else {
-                *e = Arc::from(value)
-            }
-        }).or_insert_with(|| {
-            Arc::from(value)
-        });
-        res
+    fn domain(&self) -> &str {
+        "test"
     }
 
-    fn count(&self, _: i64, key_range_start: &[u8], key_range_end: &[u8]) -> u64 {
-        let s: [u8; 64] = key_range_start.try_into().unwrap();
-        let e: [u8; 64] = key_range_end.try_into().unwrap();
-        self.db.lock().unwrap().range((Included(s), Included(e))).count() as u64
+    async fn load(&self, _: i64, key: &[u8]) -> Option<Self::ValueRef> {
+        let key = key.try_into();
+        if key.is_ok() {
+            let key: [u8; 64] = key.unwrap();
+            let records = self.records.lock().await;
+            let value = records.get(&key);
+            if value.is_some() {
+                return Some(value.unwrap().clone());
+            }
+        }
+        return None;
     }
 
-    fn total_count(&self) -> u64 { self.db.lock().unwrap().len() as u64 }
+    async fn store(&self, key: &[u8], value: &[u8]) -> StoreResult {
+        let key = key.try_into();
+        if key.is_ok() && value.len() == 64 {
+            let key: [u8; 64] = key.unwrap();
+            let value: [u8; 64] = value.try_into().unwrap();
+            if key == Self::sha512(&[&value]) {
+                if self.records.lock().await.insert(key, value).is_none() {
+                    StoreResult::Ok
+                } else {
+                    StoreResult::Duplicate
+                }
+            } else {
+                StoreResult::Rejected
+            }
+        } else {
+            StoreResult::Rejected
+        }
+    }
 
-    fn for_each<F: FnMut(&[u8], &[u8]) -> bool>(&self, _: i64, key_range_start: &[u8], key_range_end: &[u8], mut f: F) {
-        let s: [u8; 64] = key_range_start.try_into().unwrap();
-        let e: [u8; 64] = key_range_end.try_into().unwrap();
-        for (k, v) in self.db.lock().unwrap().range((Included(s), Included(e))) {
-            if !f(k, v.as_ref()) {
+    async fn count(&self, _: i64, key_range_start: &[u8], key_range_end: &[u8]) -> u64 {
+        let start: [u8; 64] = key_range_start.try_into().unwrap();
+        let end: [u8; 64] = key_range_end.try_into().unwrap();
+        self.records.lock().await.range((Included(start), Included(end))).count() as u64
+    }
+
+    async fn total_count(&self) -> u64 {
+        self.records.lock().await.len() as u64
+    }
+
+    async fn for_each<F: Send + FnMut(&[u8], &Self::ValueRef) -> bool>(&self, _reference_time: i64, key_range_start: &[u8], key_range_end: &[u8], mut f: F) {
+        let start: [u8; 64] = key_range_start.try_into().unwrap();
+        let end: [u8; 64] = key_range_end.try_into().unwrap();
+        let records = self.records.lock().await;
+        for (k, v) in records.range((Included(start), Included(end))) {
+            if !f(k, v) {
                 break;
             }
         }
@@ -126,11 +173,10 @@ fn main() {
                 peers.push(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port2)));
             }
         }
-        let nh = Arc::new(TestNodeHost {
-            name: format!("{}", port),
-            peers,
-            db: Mutex::new(BTreeMap::new())
-        });
+        let mut th = TestNodeHost::new_random(port as usize);
+        th.config.anchors = peers;
+        th.config.name = port.to_string();
+        let nh = Arc::new(th);
         //println!("Starting node on 127.0.0.1:{}...", port, nh.db.lock().unwrap().len());
         nodes.push(Node::new(nh.clone(), nh.clone(), SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port))).await.unwrap());
     }
@@ -152,20 +198,6 @@ fn main() {
         }
     }
 
-    println!("Populating maps with data to be synchronized between nodes...");
-    let mut all_records = BTreeMap::new();
-    for n in nodes.iter_mut() {
-        for _ in 0..TEST_STARTING_RECORDS_PER_NODE {
-            let mut k = [0_u8; 64];
-            let mut v = [0_u8; 32];
-            get_random_bytes(&mut k);
-            get_random_bytes(&mut v);
-            let v: Arc<[u8]> = Arc::from(v);
-            all_records.insert(k.clone(), v.clone());
-            n.datastore().db.lock().unwrap().insert(k, v);
-        }
-    }
-
     loop {
         tokio::time::sleep(Duration::from_secs(1)).await;
     }
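TestNodeHost's store() makes the test data store content-addressed: a record is accepted only if its key is the SHA-512 hash of its value. A sketch of what a compliant writer does, mirroring new_random() above (`host` is a TestNodeHost instance):

    let mut value = [0_u8; 64];
    get_random_bytes(&mut value);
    let key = TestNodeHost::sha512(&[&value]); // key must equal H(value) or store() rejects it
    match host.store(&key, &value).await {
        StoreResult::Ok | StoreResult::Duplicate => {}
        _ => panic!("should not be rejected"),
    }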
diff --git a/syncwhole/src/node.rs b/syncwhole/src/node.rs
index a42894fd4..32f421b1d 100644
--- a/syncwhole/src/node.rs
+++ b/syncwhole/src/node.rs
@@ -29,11 +29,15 @@ use crate::protocol::*;
 use crate::utils::*;
 use crate::varint;
 
+/// Period for running main housekeeping pass.
+const HOUSEKEEPING_PERIOD: i64 = SYNC_STATUS_PERIOD;
+
 /// Inactivity timeout for connections in milliseconds.
 const CONNECTION_TIMEOUT: i64 = SYNC_STATUS_PERIOD * 4;
 
-/// How often to run the housekeeping task's loop in milliseconds.
-const HOUSEKEEPING_INTERVAL: i64 = SYNC_STATUS_PERIOD;
+/// Announce records received from peers only if our sync status estimate is above this
+/// threshold. This keeps us from spamming the network with HaveRecords messages while
+/// we are still catching up.
+const ANNOUNCE_IF_SYNCED_MORE_THAN: f64 = 0.95;
 
 /// Information about a remote node to which we are connected.
 #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -63,19 +67,6 @@ pub struct RemoteNodeInfo {
     pub initialized: bool,
 }
 
-fn configure_tcp_socket(socket: &TcpSocket) -> std::io::Result<()> {
-    let _ = socket.set_linger(None);
-    if socket.set_reuseport(true).is_ok() {
-        Ok(())
-    } else {
-        socket.set_reuseaddr(true)
-    }
-}
-
-fn decode_msgpack<'a, T: Deserialize<'a>>(b: &'a [u8]) -> std::io::Result<T> {
-    rmp_serde::from_slice(b).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, format!("invalid msgpack object: {}", e.to_string())))
-}
-
 /// An instance of the syncwhole data set synchronization engine.
 ///
 /// This holds a number of async tasks that are terminated or aborted if this object
@@ -83,6 +74,7 @@ fn decode_msgpack<'a, T: Deserialize<'a>>(b: &'a [u8]) -> std::io::Result<T> {
 pub struct Node<D: DataStore + 'static, H: Host + 'static> {
     internal: Arc<NodeInternal<D, H>>,
     housekeeping_task: JoinHandle<()>,
+    announce_task: JoinHandle<()>,
     listener_task: JoinHandle<()>,
 }
 
@@ -102,11 +94,18 @@ impl<D: DataStore + 'static, H: Host + 'static> Node<D, H> {
             datastore: db.clone(),
             host: host.clone(),
             connections: Mutex::new(HashMap::with_capacity(64)),
+            announce_queue: Mutex::new(HashMap::with_capacity(256)),
             bind_address,
             starting_instant: Instant::now(),
+            sync_completeness_estimate: AtomicU64::new((0.0_f64).to_bits()),
         });
 
-        Ok(Self { internal: internal.clone(), housekeeping_task: tokio::spawn(internal.clone().housekeeping_task_main()), listener_task: tokio::spawn(internal.listener_task_main(listener)) })
+        Ok(Self {
+            internal: internal.clone(),
+            housekeeping_task: tokio::spawn(internal.clone().housekeeping_task_main()),
+            announce_task: tokio::spawn(internal.clone().announce_task_main()),
+            listener_task: tokio::spawn(internal.listener_task_main(listener)),
+        })
     }
 
     #[inline(always)]
@@ -119,31 +118,66 @@ impl<D: DataStore + 'static, H: Host + 'static> Node<D, H> {
         &self.internal.host
     }
 
+    /// Broadcast a new record to the world.
+    ///
+    /// This should be called when locally created records are added to the synchronized
+    /// data store. If it isn't called the record will still spread, but it may take
+    /// considerably longer for normal background sync to pick it up and propagate it.
+    pub async fn broadcast_new_record(&self, _key: &[u8], _value: &[u8]) {}
+
+    /// Attempt to connect to an explicitly specified TCP endpoint.
     pub async fn connect(&self, endpoint: &SocketAddr) -> std::io::Result<bool> {
         self.internal.clone().connect(endpoint, Instant::now().add(Duration::from_millis(CONNECTION_TIMEOUT as u64))).await
     }
 
+    /// Get open peer to peer connections.
     pub async fn list_connections(&self) -> Vec<RemoteNodeInfo> {
         let connections = self.internal.connections.lock().await;
         let mut cl: Vec<RemoteNodeInfo> = Vec::with_capacity(connections.len());
         for (_, c) in connections.iter() {
-            cl.push(c.info.lock().await.clone());
+            cl.push(c.info.lock().unwrap().clone());
         }
         cl
     }
 
+    /// Get the number of open peer to peer connections.
     pub async fn connection_count(&self) -> usize {
         self.internal.connections.lock().await.len()
     }
+
+    /// Get a value from 0.0 to 1.0 estimating how synchronized we are with the network.
+    ///
+    /// This is an inexact estimate since it's based on record counts, and it's possible for
+    /// two nodes to have the same count but disjoint sets. It tends to be fairly good in
+    /// practice, though, unless this node has been disconnected for a very long time.
+    pub async fn sync_completeness_estimate(&self) -> f64 {
+        f64::from_bits(self.internal.sync_completeness_estimate.load(Ordering::Relaxed))
+    }
 }
 
 impl<D: DataStore + 'static, H: Host + 'static> Drop for Node<D, H> {
     fn drop(&mut self) {
         self.housekeeping_task.abort();
+        self.announce_task.abort();
         self.listener_task.abort();
     }
 }
 
+/********************************************************************************************************************/
+
+fn configure_tcp_socket(socket: &TcpSocket) -> std::io::Result<()> {
+    let _ = socket.set_linger(None);
+    if socket.set_reuseport(true).is_ok() {
+        Ok(())
+    } else {
+        socket.set_reuseaddr(true)
+    }
+}
+
+fn decode_msgpack<'a, T: Deserialize<'a>>(b: &'a [u8]) -> std::io::Result<T> {
+    rmp_serde::from_slice(b).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, format!("invalid msgpack object: {}", e.to_string())))
+}
+
 pub struct NodeInternal<D: DataStore + 'static, H: Host + 'static> {
     // Secret used to perform HMAC to detect and drop loopback connections to self.
     anti_loopback_secret: [u8; 64],
@@ -155,11 +189,17 @@ pub struct NodeInternal<D: DataStore + 'static, H: Host + 'static> {
     // Connections and their task join handles, by remote endpoint address.
     connections: Mutex<HashMap<SocketAddr, Arc<Connection>>>,
 
+    // Records received since the last announce pass and the endpoints known to already have them.
+    announce_queue: Mutex<HashMap<[u8; ANNOUNCE_KEY_LEN], Vec<SocketAddr>>>,
+
     // Local address to which this node is bound
     bind_address: SocketAddr,
 
     // Instant this node started.
     starting_instant: Instant,
+
+    // Latest estimate of sync completeness.
+    sync_completeness_estimate: AtomicU64,
 }
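A hypothetical use of the new estimate, gating work on being mostly caught up (0.95 matches the ANNOUNCE_IF_SYNCED_MORE_THAN threshold above):

    if node.sync_completeness_estimate().await >= 0.95 {
        // treat local query results as reasonably representative of the network
    }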
 impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
     fn ms_monotonic(&self) -> i64 {
         Instant::now().duration_since(self.starting_instant).as_millis() as i64
     }
 
-    /// Loop that constantly runs in the background to do cleanup and service things.
     async fn housekeeping_task_main(self: Arc<Self>) {
         let mut tasks: Vec<JoinHandle<()>> = Vec::new();
+        let mut counts: Vec<u64> = Vec::new();
         let mut connected_to_addresses: HashSet<SocketAddr> = HashSet::new();
         let mut sleep_until = Instant::now().add(Duration::from_millis(500));
         loop {
             tokio::time::sleep_until(sleep_until).await;
-            sleep_until = sleep_until.add(Duration::from_millis(HOUSEKEEPING_INTERVAL as u64));
+            sleep_until = sleep_until.add(Duration::from_millis(HOUSEKEEPING_PERIOD as u64));
 
             tasks.clear();
+            counts.clear();
             connected_to_addresses.clear();
             let now = self.ms_monotonic();
 
+            // Drop dead connections, send SyncStatus, and populate counts for computing sync status.
+            let sync_status = Arc::new(
+                rmp_serde::encode::to_vec_named(&msg::SyncStatus {
+                    record_count: self.datastore.total_count().await,
+                    clock: self.datastore.clock() as u64,
+                })
+                .unwrap(),
+            );
             self.connections.lock().await.retain(|sa, c| {
                 if !c.closed.load(Ordering::Relaxed) {
+                    let cc = c.clone();
                     if (now - c.last_receive_time.load(Ordering::Relaxed)) < CONNECTION_TIMEOUT {
                         connected_to_addresses.insert(sa.clone());
+                        if c.info.lock().unwrap().initialized {
+                            counts.push(c.last_sync_status_record_count.load(Ordering::Relaxed));
+                            let ss2 = sync_status.clone();
+                            tasks.push(tokio::spawn(async move {
+                                let _ = tokio::time::timeout_at(sleep_until, cc.send_msg(MessageType::SyncStatus, ss2.as_slice(), now)).await;
+                            }));
+                        }
                         true // keep connection
                     } else {
                         let _ = c.read_task.lock().unwrap().take().map(|j| j.abort());
                         let host = self.host.clone();
-                        let cc = c.clone();
                         tasks.push(tokio::spawn(async move {
-                            host.on_connection_closed(&*cc.info.lock().await, "timeout".to_string());
+                            host.on_connection_closed(&*cc.info.lock().unwrap(), "timeout".to_string());
                         }));
                         false // discard connection
                     }
@@ -203,18 +259,31 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                             let e = j.unwrap().await;
                             if e.is_ok() {
                                 let e = e.unwrap();
-                                host.on_connection_closed(&*cc.info.lock().await, e.map_or_else(|e| e.to_string(), |_| "unknown error".to_string()));
+                                host.on_connection_closed(&*cc.info.lock().unwrap(), e.map_or_else(|e| e.to_string(), |_| "unknown error".to_string()));
                             } else {
-                                host.on_connection_closed(&*cc.info.lock().await, "remote host closed connection".to_string());
+                                host.on_connection_closed(&*cc.info.lock().unwrap(), "remote host closed connection".to_string());
                             }
                         } else {
-                            host.on_connection_closed(&*cc.info.lock().await, "remote host closed connection".to_string());
+                            host.on_connection_closed(&*cc.info.lock().unwrap(), "remote host closed connection".to_string());
                         }
                     }));
                     false // discard connection
                 }
             });
 
+            let sync_completeness_estimate = if !counts.is_empty() {
+                counts.sort_unstable();
+                let twothirds = if counts.len() > 3 { *counts.get((counts.len() / 3) * 2).unwrap() } else { *counts.last().unwrap() };
+                if twothirds > 0 {
+                    ((self.datastore.total_count().await as f64) / (twothirds as f64)).min(1.0)
+                } else {
+                    1.0
+                }
+            } else {
+                1.0
+            };
+            self.sync_completeness_estimate.store(sync_completeness_estimate.to_bits(), Ordering::Relaxed);
+
             let config = self.host.node_config();
 
             // Always try to connect to anchor peers.
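Worked example of the estimate computed at the end of the housekeeping pass: with sorted peer record counts [900, 1000, 1100, 1200], (counts.len() / 3) * 2 = 2, so twothirds = 1100; a node holding 1000 records stores 1000 / 1100 ≈ 0.91. Using a two-thirds percentile rather than the maximum keeps a single peer reporting an inflated count from permanently depressing the estimate.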
@@ -257,7 +326,42 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
         }
     }
 
-    /// Incoming TCP acceptor task.
+    async fn announce_task_main(self: Arc<Self>) {
+        let mut sleep_until = Instant::now().add(Duration::from_millis(ANNOUNCE_PERIOD as u64));
+        let mut to_announce: Vec<([u8; ANNOUNCE_KEY_LEN], Vec<SocketAddr>)> = Vec::with_capacity(256);
+        let background_tasks = AsyncTaskReaper::new();
+        let announce_timeout = Duration::from_millis(CONNECTION_TIMEOUT as u64);
+        loop {
+            tokio::time::sleep_until(sleep_until).await;
+            sleep_until = sleep_until.add(Duration::from_millis(ANNOUNCE_PERIOD as u64));
+
+            for (key, already_has) in self.announce_queue.lock().await.drain() {
+                to_announce.push((key, already_has));
+            }
+
+            let now = self.ms_monotonic();
+            for c in self.connections.lock().await.iter() {
+                let mut have_records: Vec<u8> = Vec::with_capacity((to_announce.len() * ANNOUNCE_KEY_LEN) + 4);
+                have_records.push(ANNOUNCE_KEY_LEN as u8);
+                for (key, already_has) in to_announce.iter() {
+                    if !already_has.contains(c.0) {
+                        let _ = std::io::Write::write_all(&mut have_records, key);
+                    }
+                }
+                if have_records.len() > 1 {
+                    let c2 = c.1.clone();
+                    background_tasks.spawn(async move {
+                        // If the connection dies this will either fail or time out. Usually these sends complete
+                        // instantly due to write buffering, but the timeout keeps them from piling up.
+                        let _ = tokio::time::timeout(announce_timeout, c2.send_msg(MessageType::HaveRecords, have_records.as_slice(), now)).await;
+                    })
+                }
+            }
+
+            to_announce.clear();
+        }
+    }
+
     async fn listener_task_main(self: Arc<Self>, listener: TcpListener) {
         loop {
             let socket = listener.accept().await;
@@ -273,7 +377,6 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
         }
     }
 
-    /// Initiate an outgoing connection with a deadline based timeout.
     async fn connect(self: Arc<Self>, address: &SocketAddr, deadline: Instant) -> std::io::Result<bool> {
         self.host.on_connect_attempt(address);
         let stream = if address.is_ipv4() { TcpSocket::new_v4() } else { TcpSocket::new_v6() }?;
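The HaveRecords payload built in the announce loop above is one length byte followed by a packed run of fixed-size key prefixes. A sketch of both directions (`keys_to_announce` is hypothetical):

    // build: [prefix length: u8][prefix][prefix]...
    let mut payload: Vec<u8> = vec![ANNOUNCE_KEY_LEN as u8];
    for key in keys_to_announce.iter() {
        payload.extend_from_slice(&key[..ANNOUNCE_KEY_LEN]);
    }
    // parse (as the HaveRecords handler below does):
    let prefix_len = payload[0] as usize;
    let mut body = &payload[1..];
    while body.len() >= prefix_len {
        let prefix = &body[..prefix_len];
        // ...check the store and request unknown prefixes via GetRecords...
        body = &body[prefix_len..];
    }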
@@ -287,28 +390,35 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
         }
     }
 
-    /// Sets up and spawns the task for a new TCP connection whether inbound or outbound.
     async fn connection_start(self: &Arc<Self>, address: SocketAddr, stream: TcpStream, inbound: bool) -> bool {
         let mut ok = false;
         let _ = self.connections.lock().await.entry(address.clone()).or_insert_with(|| {
             ok = true;
-            //let _ = stream.set_nodelay(true);
+            let _ = stream.set_nodelay(false);
             let (reader, writer) = stream.into_split();
             let now = self.ms_monotonic();
             let connection = Arc::new(Connection {
                 writer: Mutex::new(writer),
                 last_send_time: AtomicI64::new(now),
                 last_receive_time: AtomicI64::new(now),
-                bytes_sent: AtomicU64::new(0),
-                bytes_received: AtomicU64::new(0),
-                info: Mutex::new(RemoteNodeInfo { name: String::new(), contact: String::new(), remote_address: address.clone(), explicit_addresses: Vec::new(), connect_time: ms_since_epoch(), connect_instant: now, inbound, initialized: false }),
+                last_sync_status_record_count: AtomicU64::new(0),
+                info: std::sync::Mutex::new(RemoteNodeInfo {
+                    name: String::new(),
+                    contact: String::new(),
+                    remote_address: address.clone(),
+                    explicit_addresses: Vec::new(),
+                    connect_time: ms_since_epoch(),
+                    connect_instant: now,
+                    inbound,
+                    initialized: false,
+                }),
                 read_task: std::sync::Mutex::new(None),
                 closed: AtomicBool::new(false),
             });
             let self2 = self.clone();
             let c2 = connection.clone();
             connection.read_task.lock().unwrap().replace(tokio::spawn(async move {
-                let result = self2.connection_io_task_main(&c2, reader).await;
+                let result = self2.connection_io_task_main(&c2, address, reader).await;
                 c2.closed.store(true, Ordering::Relaxed);
                 result
             }));
             connection
         });
         ok
     }
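Init/InitResponse double as a loopback detector: each side answers the peer's random challenge with an HMAC keyed by its own anti_loopback_secret, so a node that has accidentally connected to itself sees an HMAC it can reproduce. A sketch of the response-side check (`challenge_we_sent` is hypothetical; the real handler's logic is equivalent):

    let ours = H::hmac_sha512(&self.anti_loopback_secret, &challenge_we_sent);
    if msg.anti_loopback_response.eq(&ours) {
        return Err(std::io::Error::new(std::io::ErrorKind::Other, "loopback connection to self"));
    }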
-    /// Main I/O task launched for each connection.
-    ///
-    /// This handles reading from the connection and reacting to what it sends. Killing this
-    /// task is done when the connection is closed.
-    async fn connection_io_task_main(self: Arc<Self>, connection: &Arc<Connection>, mut reader: OwnedReadHalf) -> std::io::Result<()> {
+    async fn connection_io_task_main(self: Arc<Self>, connection: &Arc<Connection>, remote_address: SocketAddr, mut reader: OwnedReadHalf) -> std::io::Result<()> {
         const BUF_CHUNK_SIZE: usize = 4096;
         const READ_BUF_INITIAL_SIZE: usize = 65536; // should be a multiple of BUF_CHUNK_SIZE
@@ -392,7 +498,7 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                     }
                 }
             }
-            let message = &read_buffer.as_slice()[header_size..total_size];
+            let mut message = &read_buffer.as_slice()[header_size..total_size];
             let now = self.ms_monotonic();
 
             connection.last_receive_time.store(now, Ordering::Relaxed);
@@ -408,7 +514,7 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                     let msg: msg::Init = decode_msgpack(message)?;
                     let (anti_loopback_response, domain_challenge_response, auth_challenge_response) = {
-                        let mut info = connection.info.lock().await;
+                        let mut info = connection.info.lock().unwrap();
                         info.name = msg.node_name.to_string();
                         info.contact = msg.node_contact.to_string();
                         let _ = msg.explicit_ipv4.map(|pv4| {
@@ -417,22 +523,35 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                         let _ = msg.explicit_ipv6.map(|pv6| {
                             info.explicit_addresses.push(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(pv6.ip), pv6.port, 0, 0)));
                         });
+                        let info = info.clone();
 
                         let auth_challenge_response = self.host.authenticate(&info, msg.auth_challenge);
                         if auth_challenge_response.is_none() {
                             return Err(std::io::Error::new(std::io::ErrorKind::Other, "authenticate() returned None, connection dropped"));
                         }
+                        let auth_challenge_response = auth_challenge_response.unwrap();
 
-                        (H::hmac_sha512(&self.anti_loopback_secret, msg.anti_loopback_challenge), H::hmac_sha512(&H::sha512(&[self.datastore.domain().as_bytes()]), msg.domain_challenge), auth_challenge_response.unwrap())
+                        (H::hmac_sha512(&self.anti_loopback_secret, msg.anti_loopback_challenge), H::hmac_sha512(&H::sha512(&[self.datastore.domain().as_bytes()]), msg.domain_challenge), auth_challenge_response)
                     };
 
-                    connection.send_obj(&mut write_buffer, MessageType::InitResponse, &msg::InitResponse { anti_loopback_response: &anti_loopback_response, domain_response: &domain_challenge_response, auth_response: &auth_challenge_response }, now).await?;
+                    connection
+                        .send_obj(
+                            &mut write_buffer,
+                            MessageType::InitResponse,
+                            &msg::InitResponse {
+                                anti_loopback_response: &anti_loopback_response,
+                                domain_response: &domain_challenge_response,
+                                auth_response: &auth_challenge_response,
+                            },
+                            now,
+                        )
+                        .await?;
                 }
 
                 MessageType::InitResponse => {
                     let msg: msg::InitResponse = decode_msgpack(message)?;
+                    let mut info = connection.info.lock().unwrap();
 
-                    let mut info = connection.info.lock().await;
                     if info.initialized {
                         return Err(std::io::Error::new(std::io::ErrorKind::Other, "duplicate init response"));
                     }
@@ -447,10 +566,11 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                         return Err(std::io::Error::new(std::io::ErrorKind::Other, "challenge/response authentication failed"));
                     }
 
-                    info.initialized = true;
                     initialized = true;
 
-                    let info = info.clone();
+                    info.initialized = true;
+
+                    let info = info.clone(); // also releases lock since info is replaced/destroyed
                     self.host.on_connect(&info);
                 }
@@ -461,35 +581,103 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
                 }
 
             match message_type {
-                _ => {}
-
                 MessageType::HaveRecords => {
-                    let msg: msg::HaveRecords = decode_msgpack(message)?;
+                    if message.len() > 1 {
+                        let clock = self.datastore.clock();
+                        let mut announce_queue_key = [0_u8; ANNOUNCE_KEY_LEN];
+                        let mut start = [0_u8; KEY_SIZE];
+                        let mut end = [0xff_u8; KEY_SIZE];
+                        let key_prefix_len = message[0] as usize;
+                        message = &message[1..];
+                        if key_prefix_len > 0 && key_prefix_len <= KEY_SIZE {
+                            write_buffer.clear();
+                            write_buffer.push(key_prefix_len as u8);
+                            while message.len() >= key_prefix_len {
+                                let key_prefix = &message[..key_prefix_len];
+
+                                if key_prefix_len >= ANNOUNCE_KEY_LEN {
+                                    // If the key prefix is appropriately sized, look up and add this remote endpoint
+                                    // to the list of endpoints that already have this record if it's in the announce
+                                    // queue. We don't add a new entry to the announce queue if one doesn't already
+                                    // exist because we did not just receive the actual record. This just avoids announcing
+                                    // to peers that just told us they have it.
+                                    announce_queue_key.copy_from_slice(&key_prefix[..ANNOUNCE_KEY_LEN]);
+                                    self.announce_queue.lock().await.get_mut(&announce_queue_key).map(|already_has| {
+                                        if !already_has.contains(&remote_address) {
+                                            already_has.push(remote_address.clone());
+                                        }
+                                    });
+                                }
+
+                                if if key_prefix_len < KEY_SIZE {
+                                    (&mut start[..key_prefix_len]).copy_from_slice(key_prefix);
+                                    (&mut end[..key_prefix_len]).copy_from_slice(key_prefix);
+                                    self.datastore.count(clock, &start, &end).await == 0
+                                } else {
+                                    !self.datastore.contains(clock, key_prefix).await
+                                } {
+                                    let _ = std::io::Write::write_all(&mut write_buffer, key_prefix);
+                                }
+
+                                message = &message[key_prefix_len..];
+                            }
+                            if write_buffer.len() > 1 {
+                                let _ = connection.send_msg(MessageType::GetRecords, write_buffer.as_slice(), now).await?;
+                            }
+                        }
+                    }
                 }
 
                 MessageType::GetRecords => {
-                    let msg: msg::GetRecords = decode_msgpack(message)?;
+                    if message.len() > 1 {
+                        let mut start = [0_u8; KEY_SIZE];
+                        let mut end = [0xff_u8; KEY_SIZE];
+                        let key_prefix_len = message[0] as usize;
+                        message = &message[1..];
+                        if key_prefix_len > 0 && key_prefix_len <= KEY_SIZE {
+                            while message.len() >= key_prefix_len {
+                                let key_prefix = &message[..key_prefix_len];
+
+                                if key_prefix_len < KEY_SIZE {
+                                    (&mut start[..key_prefix_len]).copy_from_slice(key_prefix);
+                                    (&mut end[..key_prefix_len]).copy_from_slice(key_prefix);
+                                    self.datastore
+                                        .for_each(0, &start, &end, |_, v| {
+                                            let v2 = v.clone();
+                                            let c2 = connection.clone();
+                                            background_tasks.spawn(async move {
+                                                let _ = c2.send_msg(MessageType::Record, v2.as_ref(), now).await;
+                                            });
+                                            true
+                                        })
+                                        .await;
+                                } else {
+                                    let record = self.datastore.load(0, key_prefix).await;
+                                    if record.is_some() {
+                                        let record = record.unwrap();
+                                        let v: &[u8] = record.as_ref();
+                                        let _ = connection.send_msg(MessageType::Record, v, now).await?;
+                                    }
+                                }
+
+                                message = &message[key_prefix_len..];
+                            }
+                        }
+                    }
                 }
 
                 MessageType::Record => {
                     let key = H::sha512(&[message]);
-                    match self.datastore.store(&key, message) {
+                    match self.datastore.store(&key, message).await {
                         StoreResult::Ok => {
-                            // TODO: probably should not announce if way out of sync
-                            let connections = self.connections.lock().await;
-                            let mut announce_to: Vec<Arc<Connection>> = Vec::with_capacity(connections.len());
-                            for (_, c) in connections.iter() {
-                                if !Arc::ptr_eq(&connection, c) {
-                                    announce_to.push(c.clone());
+                            if f64::from_bits(self.sync_completeness_estimate.load(Ordering::Relaxed)) >= ANNOUNCE_IF_SYNCED_MORE_THAN {
+                                let announce_key: [u8; ANNOUNCE_KEY_LEN] = (&key[..ANNOUNCE_KEY_LEN]).try_into().unwrap();
+                                let mut q = self.announce_queue.lock().await;
+                                let ql = q.entry(announce_key).or_insert_with(|| Vec::with_capacity(2));
+                                if !ql.contains(&remote_address) {
+                                    ql.push(remote_address.clone());
                                 }
                             }
-                            drop(connections); // release lock
-
-                            background_tasks.spawn(async move {
-                                for c in announce_to.iter() {
-                                    let _ = c.send_msg(MessageType::HaveRecord, &key[0..ANNOUNCE_KEY_LEN], now).await;
-                                }
-                            });
                         }
                         StoreResult::Rejected => {
                             return Err(std::io::Error::new(std::io::ErrorKind::Other, format!("record rejected by data store: {}", to_hex_string(&key))));
@@ -500,6 +688,7 @@ impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
 
                 MessageType::SyncStatus => {
                     let msg: msg::SyncStatus = decode_msgpack(message)?;
+                    connection.last_sync_status_record_count.store(msg.record_count, Ordering::Relaxed);
                 }
 
                 MessageType::SyncRequest => {
                     let msg: msg::SyncRequest = decode_msgpack(message)?;
                 }
 
                 MessageType::SyncResponse => {
                     let msg: msg::SyncResponse = decode_msgpack(message)?;
                 }
+
+                _ => {}
                 }
             }
 
             read_buffer.copy_within(total_size..buffer_fill, 0);
             buffer_fill -= total_size;
-
-            connection.bytes_received.fetch_add(total_size as u64, Ordering::Relaxed);
         }
     }
 }
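Both handlers above expand a short key prefix into the inclusive range passed to count()/for_each() by padding with 0x00 and 0xff. Isolated as a helper for illustration (hypothetical; the patch inlines this logic):

    fn prefix_to_range(prefix: &[u8]) -> ([u8; KEY_SIZE], [u8; KEY_SIZE]) {
        let mut start = [0x00_u8; KEY_SIZE];
        let mut end = [0xff_u8; KEY_SIZE];
        start[..prefix.len()].copy_from_slice(prefix);
        end[..prefix.len()].copy_from_slice(prefix);
        (start, end) // every key beginning with `prefix`, inclusive on both ends
    }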
@@ -544,9 +733,8 @@ struct Connection {
     writer: Mutex<OwnedWriteHalf>,
     last_send_time: AtomicI64,
     last_receive_time: AtomicI64,
-    bytes_sent: AtomicU64,
-    bytes_received: AtomicU64,
-    info: Mutex<RemoteNodeInfo>,
+    last_sync_status_record_count: AtomicU64,
+    info: std::sync::Mutex<RemoteNodeInfo>,
     read_task: std::sync::Mutex<Option<JoinHandle<std::io::Result<()>>>>,
     closed: AtomicBool,
 }
@@ -558,7 +746,6 @@ impl Connection {
         let header_size = 1 + varint::encode(&mut header[1..], data.len() as u64);
         if self.writer.lock().await.write_vectored(&[IoSlice::new(&header[0..header_size]), IoSlice::new(data)]).await? == (data.len() + header_size) {
             self.last_send_time.store(now, Ordering::Relaxed);
-            self.bytes_sent.fetch_add((header_size + data.len()) as u64, Ordering::Relaxed);
             Ok(())
         } else {
             Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "write error"))
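As send_msg() above shows, every message on the wire is one type byte, a varint payload length, then the payload. A sketch of building a frame (`payload` is hypothetical):

    let mut header = [0_u8; 1 + varint::VARINT_MAX_SIZE_BYTES];
    header[0] = MessageType::Record as u8;
    let header_size = 1 + varint::encode(&mut header[1..], payload.len() as u64);
    // write header[..header_size] followed by the payload bytes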
diff --git a/syncwhole/src/protocol.rs b/syncwhole/src/protocol.rs
index 355c99e5a..7b65ff4d9 100644
--- a/syncwhole/src/protocol.rs
+++ b/syncwhole/src/protocol.rs
@@ -12,19 +12,38 @@ pub const ANNOUNCE_KEY_LEN: usize = 24;
 /// Send SyncStatus this frequently, in milliseconds.
 pub const SYNC_STATUS_PERIOD: i64 = 5000;
 
+/// Check for and announce that we "have" records this often, in milliseconds.
+pub const ANNOUNCE_PERIOD: i64 = 100;
+
 #[derive(Clone, Copy, Eq, PartialEq)]
 #[repr(u8)]
 pub enum MessageType {
+    /// No operation, payload ignored.
     Nop = 0_u8,
+
+    /// msg::Init (msgpack)
     Init = 1_u8,
+
+    /// msg::InitResponse (msgpack)
     InitResponse = 2_u8,
-    HaveRecord = 3_u8,
-    HaveRecords = 4_u8,
-    GetRecords = 5_u8,
-    Record = 6_u8,
-    SyncStatus = 7_u8,
-    SyncRequest = 8_u8,
-    SyncResponse = 9_u8,
+
+    /// <u8 length of each key prefix in bytes>[<key prefix>...]
+    HaveRecords = 3_u8,
+
+    /// <u8 length of each key prefix in bytes>[<key prefix>...]
+    GetRecords = 4_u8,
+
+    /// <record>
+    Record = 5_u8,
+
+    /// msg::SyncStatus (msgpack)
+    SyncStatus = 6_u8,
+
+    /// msg::SyncRequest (msgpack)
+    SyncRequest = 7_u8,
+
+    /// msg::SyncResponse (msgpack)
+    SyncResponse = 8_u8,
 }
 
 impl From<u8> for MessageType {
@@ -40,12 +59,12 @@ impl From<u8> for MessageType {
 }
 
 impl MessageType {
+    #[allow(unused)]
     pub fn name(&self) -> &'static str {
         match *self {
             Self::Nop => "NOP",
             Self::Init => "INIT",
             Self::InitResponse => "INIT_RESPONSE",
-            Self::HaveRecord => "HAVE_RECORD",
             Self::HaveRecords => "HAVE_RECORDS",
             Self::GetRecords => "GET_RECORDS",
             Self::Record => "RECORD",
@@ -56,45 +75,8 @@ impl MessageType {
     }
 }
 
-#[derive(Clone, Copy, Eq, PartialEq)]
-#[repr(u8)]
-pub enum SyncResponseType {
-    /// No response, do nothing.
-    None = 0_u8,
-
-    /// Response is a msgpack-encoded HaveRecords message.
-    HaveRecords = 1_u8,
-
-    /// Response is a series of records prefixed by varint record sizes.
-    Records = 2_u8,
-
-    /// Response is an IBLT set summary.
-    IBLT = 3_u8,
-}
-
-impl From<u8> for SyncResponseType {
-    /// Get response type from a byte, returning None if the byte is out of range.
-    #[inline(always)]
-    fn from(b: u8) -> Self {
-        if b <= 3 {
-            unsafe { std::mem::transmute(b) }
-        } else {
-            Self::None
-        }
-    }
-}
-
-impl SyncResponseType {
-    pub fn as_str(&self) -> &'static str {
-        match *self {
-            SyncResponseType::None => "NONE",
-            SyncResponseType::HaveRecords => "HAVE_RECORDS",
-            SyncResponseType::Records => "RECORDS",
-            SyncResponseType::IBLT => "IBLT",
-        }
-    }
-}
-
+/// Msgpack-serializable message types.
+/// Fields in frequently transferred messages use shortened names to save bandwidth.
 pub mod msg {
     use serde::{Deserialize, Serialize};
 
@@ -158,30 +140,6 @@ pub mod msg {
         pub auth_response: &'a [u8],
     }
 
-    #[derive(Serialize, Deserialize)]
-    pub struct HaveRecords<'a> {
-        /// Length of each key, chosen to ensure uniqueness.
-        #[serde(rename = "l")]
-        pub key_length: usize,
-
-        /// Keys whose existence is being announced, of 'key_length' length.
-        #[serde(with = "serde_bytes")]
-        #[serde(rename = "k")]
-        pub keys: &'a [u8],
-    }
-
-    #[derive(Serialize, Deserialize)]
-    pub struct GetRecords<'a> {
-        /// Length of each key, chosen to ensure uniqueness.
-        #[serde(rename = "l")]
-        pub key_length: usize,
-
-        /// Keys to retrieve, of 'key_length' bytes in length.
-        #[serde(with = "serde_bytes")]
-        #[serde(rename = "k")]
-        pub keys: &'a [u8],
-    }
-
     #[derive(Serialize, Deserialize)]
     pub struct SyncStatus {
         /// Total number of records this node has in its data store.
@@ -195,58 +153,73 @@ pub mod msg {
     #[derive(Serialize, Deserialize)]
     pub struct SyncRequest<'a> {
-        /// Query mask, a random string of KEY_SIZE bytes.
+        /// Key range start (length: KEY_SIZE)
         #[serde(with = "serde_bytes")]
-        #[serde(rename = "q")]
-        pub query_mask: &'a [u8],
+        #[serde(rename = "s")]
+        pub range_start: &'a [u8],
 
-        /// Number of bits to match as a prefix in query_mask (0 for entire data set).
-        #[serde(rename = "b")]
-        pub query_mask_bits: u8,
+        /// Key range end (length: KEY_SIZE)
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "e")]
+        pub range_end: &'a [u8],
 
-        /// Number of records requesting node already holds under query mask prefix.
+        /// Number of records the requesting node already has under this key range.
         #[serde(rename = "c")]
         pub record_count: u64,
 
-        /// Sender's reference time.
+        /// Reference time for the query.
         #[serde(rename = "t")]
         pub reference_time: u64,
 
         /// Random salt
-        #[serde(rename = "s")]
-        pub salt: u64,
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "x")]
+        pub salt: &'a [u8],
     }
 
     #[derive(Serialize, Deserialize)]
     pub struct SyncResponse<'a> {
-        /// Query mask, a random string of KEY_SIZE bytes.
-        #[serde(with = "serde_bytes")]
-        #[serde(rename = "q")]
-        pub query_mask: &'a [u8],
+        /// Key range start (length: KEY_SIZE)
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "s")]
+        pub range_start: &'a [u8],
 
-        /// Number of bits to match as a prefix in query_mask (0 for entire data set).
-        #[serde(rename = "b")]
-        pub query_mask_bits: u8,
+        /// Key range end (length: KEY_SIZE)
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "e")]
+        pub range_end: &'a [u8],
 
-        /// Number of records sender has under this prefix.
+        /// Number of records the responder has under this key range.
         #[serde(rename = "c")]
         pub record_count: u64,
 
-        /// Sender's reference time.
+        /// Reference time for the query.
         #[serde(rename = "t")]
         pub reference_time: u64,
 
         /// Random salt
-        #[serde(rename = "s")]
-        pub salt: u64,
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "x")]
+        pub salt: &'a [u8],
 
-        /// SyncResponseType determining content of 'data'.
-        #[serde(rename = "r")]
-        pub response_type: u8,
-
-        /// Data whose meaning depends on the response type.
+        /// IBLT set summary, or empty if not included.
+        ///
+        /// If the IBLT is omitted, the sender determined that it was more
+        /// efficient to just send keys explicitly; in that case 'keys'
+        /// contains the full list.
         #[serde(with = "serde_bytes")]
-        #[serde(rename = "d")]
-        pub data: &'a [u8],
+        #[serde(rename = "i")]
+        pub iblt: &'a [u8],
+
+        /// Explicit list of keys (full key length).
+        ///
+        /// This may contain keys even when an IBLT is present: keys listed
+        /// here are ones whose 64-bit prefixes collide with keys already
+        /// added to the IBLT. These should be rare, so it is most efficient
+        /// to name them explicitly; otherwise keys with identical 64-bit
+        /// prefixes might never be synced.
+        #[serde(with = "serde_bytes")]
+        #[serde(rename = "k")]
+        pub keys: &'a [u8],
     }
 }
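The sync query model above changes from a query_mask plus prefix bit count to an explicit inclusive key range. For whole-byte prefixes the two are equivalent; prefix_to_range is a hypothetical helper showing the conversion, mirroring what the node.rs handlers do with their start and end buffers:

// Sketch only: converts a key prefix (old mask-style query) into the
// inclusive [range_start, range_end] pair the new SyncRequest carries.
// Hypothetical helper; key_size corresponds to KEY_SIZE in protocol.rs.
fn prefix_to_range(prefix: &[u8], key_size: usize) -> (Vec<u8>, Vec<u8>) {
    assert!(prefix.len() <= key_size);
    let mut range_start = vec![0x00_u8; key_size]; // lowest key under this prefix
    let mut range_end = vec![0xff_u8; key_size]; // highest key under this prefix
    range_start[..prefix.len()].copy_from_slice(prefix);
    range_end[..prefix.len()].copy_from_slice(prefix);
    (range_start, range_end)
}

An explicit range can also describe uneven keyspace splits that a power-of-two prefix mask cannot.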
diff --git a/syncwhole/src/utils.rs b/syncwhole/src/utils.rs
index 0a9b789f6..6b4eb4cac 100644
--- a/syncwhole/src/utils.rs
+++ b/syncwhole/src/utils.rs
@@ -34,33 +34,30 @@ pub fn to_hex_string(b: &[u8]) -> String {
 
 #[inline(always)]
 pub fn xorshift64(mut x: u64) -> u64 {
-    x = u64::from_le(x);
     x ^= x.wrapping_shl(13);
     x ^= x.wrapping_shr(7);
     x ^= x.wrapping_shl(17);
-    x.to_le()
+    x
 }
 
 #[inline(always)]
 pub fn splitmix64(mut x: u64) -> u64 {
-    x = u64::from_le(x);
     x ^= x.wrapping_shr(30);
     x = x.wrapping_mul(0xbf58476d1ce4e5b9);
     x ^= x.wrapping_shr(27);
     x = x.wrapping_mul(0x94d049bb133111eb);
     x ^= x.wrapping_shr(31);
-    x.to_le()
+    x
 }
 
 #[inline(always)]
 pub fn splitmix64_inverse(mut x: u64) -> u64 {
-    x = u64::from_le(x);
     x ^= x.wrapping_shr(31) ^ x.wrapping_shr(62);
     x = x.wrapping_mul(0x319642b2d24d8ec3);
     x ^= x.wrapping_shr(27) ^ x.wrapping_shr(54);
     x = x.wrapping_mul(0x96de1b173f119089);
     x ^= x.wrapping_shr(30) ^ x.wrapping_shr(60);
-    x.to_le()
+    x
 }
 
 static mut RANDOM_STATE_0: u64 = 0;
@@ -73,7 +70,7 @@ pub fn random() -> u64 {
         s0 = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64;
     }
     if s1 == 0 {
-        s1 = splitmix64(std::process::id() as u64);
+        s1 = splitmix64((std::process::id() as u64).wrapping_add((unsafe { &RANDOM_STATE_0 } as *const u64) as u64));
     }
     let s1_new = xorshift64(s1);
     s0 = splitmix64(s0.wrapping_add(s1));
@@ -93,7 +90,10 @@ pub struct AsyncTaskReaper {
 
 impl AsyncTaskReaper {
     pub fn new() -> Self {
-        Self { ctr: AtomicUsize::new(0), handles: Arc::new(std::sync::Mutex::new(HashMap::new())) }
+        Self {
+            ctr: AtomicUsize::new(0),
+            handles: Arc::new(std::sync::Mutex::new(HashMap::new())),
+        }
     }
 
     /// Spawn a new task.
diff --git a/syncwhole/src/varint.rs b/syncwhole/src/varint.rs
index 1cb21524c..06c040e7f 100644
--- a/syncwhole/src/varint.rs
+++ b/syncwhole/src/varint.rs
@@ -6,6 +6,7 @@
  * https://www.zerotier.com/
  */
 
+#[allow(unused)]
 pub const VARINT_MAX_SIZE_BYTES: usize = 10;
 
 pub fn encode(b: &mut [u8], mut v: u64) -> usize {
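With the from_le and to_le round-trips removed, the utils.rs mixers are plain u64 permutations, and splitmix64_inverse still undoes splitmix64 exactly; its multiplication constants are the modular inverses of the forward constants. A test-style sketch of that property, assumed to live in a #[cfg(test)] module inside utils.rs:

// Sketch only: property check that splitmix64_inverse reverses splitmix64
// bit for bit. Assumed to sit in a #[cfg(test)] module in utils.rs.
#[test]
fn splitmix64_round_trip() {
    let mut x = 0x0123_4567_89ab_cdef_u64;
    for _ in 0..1000 {
        assert_eq!(splitmix64_inverse(splitmix64(x)), x);
        x = x.wrapping_mul(0x9e37_79b9_7f4a_7c15).wrapping_add(1); // arbitrary walk over test inputs
    }
}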