diff --git a/iblt/Cargo.toml b/iblt/Cargo.toml
index 09c17d5ff..82cce58ff 100644
--- a/iblt/Cargo.toml
+++ b/iblt/Cargo.toml
@@ -12,5 +12,6 @@ codegen-units = 1
 panic = 'abort'
 
 [dependencies]
+crc32fast = "^1"
 
 [lib]
diff --git a/iblt/src/lib.rs b/iblt/src/lib.rs
index 4275a999e..dadcf495d 100644
--- a/iblt/src/lib.rs
+++ b/iblt/src/lib.rs
@@ -8,29 +8,39 @@
 use std::borrow::Cow;
 
-/// Total memory overhead of each bucket in bytes.
-const BUCKET_SIZE_BYTES: usize = 13; // u64 key + u32 check + i8 count
-
-/// Based on xorshift64 with endian conversion for BE systems.
+#[cfg(not(any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64", target_arch = "powerpc64")))]
 #[inline(always)]
-fn get_check_hash(mut x: u64) -> u32 {
-    x = u64::from_le(x);
-    x ^= x.wrapping_shl(13);
-    x ^= x.wrapping_shr(7);
-    x ^= x.wrapping_shl(17);
-    (x.wrapping_add(x.wrapping_shr(32)) as u32).to_le()
+fn xor_with<const L: usize>(x: &mut [u8; L], y: &[u8; L]) {
+    x.iter_mut().zip(y.iter()).for_each(|(a, b)| *a ^= *b);
 }
 
-/// Called to get the next iteration index for each KEY_MAPPING_ITERATIONS table lookup.
-/// A series of these implements the "series of different hashes" construct in IBLT.
+#[cfg(any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64", target_arch = "powerpc64"))]
 #[inline(always)]
-fn next_iteration_index(mut x: u64, hash_no: u64) -> u64 {
-    x = x.wrapping_add(hash_no);
-    x ^= x.wrapping_shr(30);
-    x = x.wrapping_mul(0xbf58476d1ce4e5b9);
-    x ^= x.wrapping_shr(27);
-    x = x.wrapping_mul(0x94d049bb133111eb);
-    x ^= x.wrapping_shr(31);
+fn xor_with<const L: usize>(x: &mut [u8; L], y: &[u8; L]) {
+    if L >= 16 {
+        for i in 0..(L / 16) {
+            unsafe { *x.as_mut_ptr().cast::<u128>().add(i) ^= *y.as_ptr().cast::<u128>().add(i) };
+        }
+        for i in (L - (L % 16))..L {
+            unsafe { *x.as_mut_ptr().cast::<u8>().add(i) ^= *y.as_ptr().cast::<u8>().add(i) };
+        }
+    } else {
+        for i in 0..(L / 8) {
+            unsafe { *x.as_mut_ptr().cast::<u64>().add(i) ^= *y.as_ptr().cast::<u64>().add(i) };
+        }
+        for i in (L - (L % 8))..L {
+            unsafe { *x.as_mut_ptr().cast::<u8>().add(i) ^= *y.as_ptr().cast::<u8>().add(i) };
+        }
+    }
+}
+
+#[inline(always)]
+fn murmurhash32_mix32(mut x: u32) -> u32 {
+    x ^= x.wrapping_shr(16);
+    x = x.wrapping_mul(0x85ebca6b);
+    x ^= x.wrapping_shr(13);
+    x = x.wrapping_mul(0xc2b2ae35);
+    x ^= x.wrapping_shr(16);
     x
 }
 
@@ -40,28 +50,26 @@ fn next_iteration_index(mut x: u64, hash_no: u64) -> u64 {
 ///
 /// https://dash.harvard.edu/bitstream/handle/1/14398536/GENTILI-SENIORTHESIS-2015.pdf
 ///
-/// Note that an 8-bit counter that wraps is used instead of a much wider counter that
-/// would probably never overflow. This saves space on the wire but means that there is
-/// a very small (roughly 1/2^64) chance that this will list a value that is invalid in that
-/// it was never added on either side. In our protocol that should not cause a problem as
-/// it would just result in one key in a GetRecords query not being fulfilled, and in any
-/// case it should be an extremely rare event.
-///
 /// BUCKETS is the maximum capacity in buckets, while HASHES is the number of
-/// "different" (differently seeded) hash functions to use.
+/// "different" (differently seeded) hash functions to use. ITEM_BYTES is the size of
+/// each set item in bytes.
 ///
-/// The best value for HASHES seems to be 3 for an optimal fill of 80%.
+/// NOTE: due to the small check hash and count used in this implementation, there is a
+/// very small (less than one in a billion) chance of a spurious bogus result during list().
+/// Most sync protocols should be tolerant of this, but be sure that it's okay if this
+/// happens, as it occasionally will at scale.
+///
+/// The best value for HASHES seems to be 3 for an optimal fill of 75%.
 #[repr(C)]
-pub struct IBLT<const BUCKETS: usize, const HASHES: usize> {
-    key: [u64; BUCKETS],
+pub struct IBLT<const BUCKETS: usize, const ITEM_BYTES: usize, const HASHES: usize> {
     check_hash: [u32; BUCKETS],
     count: [i8; BUCKETS],
+    key: [[u8; ITEM_BYTES]; BUCKETS],
 }
 
-impl<const BUCKETS: usize, const HASHES: usize> Clone for IBLT<BUCKETS, HASHES> {
+impl<const BUCKETS: usize, const ITEM_BYTES: usize, const HASHES: usize> Clone for IBLT<BUCKETS, ITEM_BYTES, HASHES> {
     #[inline(always)]
     fn clone(&self) -> Self {
-        // NOTE: clone() is manually implemented here so it's tolerant of unaligned access on architectures not supporting it.
         unsafe {
             let mut tmp: Self = std::mem::MaybeUninit::uninit().assume_init();
             std::ptr::copy_nonoverlapping((self as *const Self).cast::<u8>(), (&mut tmp as *mut Self).cast::<u8>(), Self::SIZE_BYTES);
@@ -70,13 +78,16 @@ impl<const BUCKETS: usize, const HASHES: usize> Clone for IBLT<BUCKETS, HASHES>
     }
 }
 
-impl<const BUCKETS: usize, const HASHES: usize> IBLT<BUCKETS, HASHES> {
+impl<const BUCKETS: usize, const ITEM_BYTES: usize, const HASHES: usize> IBLT<BUCKETS, ITEM_BYTES, HASHES> {
+    /// Number of bytes each bucket consumes (not contiguously, but that doesn't matter).
+    const BUCKET_SIZE_BYTES: usize = ITEM_BYTES + 4 + 1;
+
     /// Number of buckets in this IBLT.
     #[allow(unused)]
     pub const BUCKETS: usize = BUCKETS;
 
     /// Size of this IBLT in bytes.
-    pub const SIZE_BYTES: usize = BUCKETS * BUCKET_SIZE_BYTES;
+    pub const SIZE_BYTES: usize = BUCKETS * Self::BUCKET_SIZE_BYTES;
 
     /// Create a new zeroed IBLT.
     #[inline(always)]
@@ -87,6 +98,7 @@ impl<const BUCKETS: usize, const HASHES: usize> IBLT<BUCKETS, HASHES> {
     }
 
     /// Get this IBLT as a byte slice (free cast operation).
+    /// The returned slice is always SIZE_BYTES in length.
     #[inline(always)]
     pub fn as_bytes(&self) -> &[u8] {
         unsafe { &*std::ptr::slice_from_raw_parts((self as *const Self).cast::<u8>(), Self::SIZE_BYTES) }
     }
@@ -121,40 +133,42 @@ impl<const BUCKETS: usize, const HASHES: usize> IBLT<BUCKETS, HASHES> {
     /// Zero this IBLT.
     #[inline(always)]
     pub fn reset(&mut self) {
-        unsafe {
-            std::ptr::write_bytes((self as *mut Self).cast::<u8>(), 0, std::mem::size_of::<Self>());
-        }
+        unsafe { std::ptr::write_bytes((self as *mut Self).cast::<u8>(), 0, std::mem::size_of::<Self>()) };
     }
 
-    pub(crate) fn ins_rem(&mut self, key: u64, delta: i8) {
-        let check_hash = get_check_hash(key);
-        let mut iteration_index = u64::from_le(key);
-        for k in 0..(HASHES as u64) {
-            iteration_index = next_iteration_index(iteration_index, k);
+    pub(crate) fn ins_rem(&mut self, key: &[u8; ITEM_BYTES], delta: i8) {
+        let check_hash = crc32fast::hash(key);
+        let mut iteration_index = u32::from_le(check_hash).wrapping_add(1);
+        for _ in 0..(HASHES as u64) {
+            iteration_index = murmurhash32_mix32(iteration_index);
             let i = (iteration_index as usize) % BUCKETS;
-            self.key[i] ^= key;
             self.check_hash[i] ^= check_hash;
             self.count[i] = self.count[i].wrapping_add(delta);
+            xor_with(&mut self.key[i], key);
         }
     }
 
-    /// Insert a 64-bit key.
+    /// Insert a set item into this set.
+    /// This will panic if the slice is smaller than ITEM_BYTES.
     #[inline(always)]
-    pub fn insert(&mut self, key: u64) {
-        self.ins_rem(key, 1);
+    pub fn insert(&mut self, key: &[u8]) {
+        assert!(key.len() >= ITEM_BYTES);
+        self.ins_rem(unsafe { &*key.as_ptr().cast() }, 1);
     }
 
-    /// Remove a 64-bit key.
+    /// Remove a set item from this set.
+    /// This will panic if the slice is smaller than ITEM_BYTES.
     #[inline(always)]
-    pub fn remove(&mut self, key: u64) {
-        self.ins_rem(key, -1);
+    pub fn remove(&mut self, key: &[u8]) {
+        assert!(key.len() >= ITEM_BYTES);
+        self.ins_rem(unsafe { &*key.as_ptr().cast() }, -1);
    }
 
     /// Subtract another IBLT from this one to get a set difference.
     pub fn subtract(&mut self, other: &Self) {
-        self.key.iter_mut().zip(other.key.iter()).for_each(|(a, b)| *a ^= *b);
         self.check_hash.iter_mut().zip(other.check_hash.iter()).for_each(|(a, b)| *a ^= *b);
         self.count.iter_mut().zip(other.count.iter()).for_each(|(a, b)| *a = a.wrapping_sub(*b));
+        self.key.iter_mut().zip(other.key.iter()).for_each(|(a, b)| xor_with(a, b));
     }
 
     /// List as many entries in this IBLT as can be extracted.
@@ -165,12 +179,12 @@ impl<const BUCKETS: usize, const HASHES: usize> IBLT<BUCKETS, HASHES> {
     /// Due to the small check hash sizes used in this IBLT there is a very small chance this will list
     /// bogus items that were never added. This is not an issue with this protocol as it would just result
     /// in an unsatisfied record request.
-    pub fn list<F: FnMut(u64)>(mut self, mut f: F) -> bool {
+    pub fn list<F: FnMut([u8; ITEM_BYTES], bool)>(mut self, mut f: F) -> bool {
         let mut queue: Vec<u32> = Vec::with_capacity(BUCKETS);
 
         for i in 0..BUCKETS {
             let count = self.count[i];
-            if (count == 1 || count == -1) && get_check_hash(self.key[i]) == self.check_hash[i] {
+            if (count == 1 || count == -1) && crc32fast::hash(&self.key[i]) == self.check_hash[i] {
                 queue.push(i as u32);
             }
         }
@@ -183,49 +197,41 @@ impl<const BUCKETS: usize, const HASHES: usize> IBLT<BUCKETS, HASHES> {
                 break 'list_main;
             };
 
-            let key = self.key[i];
             let check_hash = self.check_hash[i];
             let count = self.count[i];
-            if (count == 1 || count == -1) && check_hash == get_check_hash(key) {
-                f(key);
+            let key = &self.key[i];
+            if (count == 1 || count == -1) && check_hash == crc32fast::hash(key) {
+                let key = key.clone();
 
-                let mut iteration_index = u64::from_le(key);
-                for k in 0..(HASHES as u64) {
-                    iteration_index = next_iteration_index(iteration_index, k);
-                    let i = (iteration_index as usize) % BUCKETS;
-                    let key2 = self.key[i] ^ key;
-                    let check_hash2 = self.check_hash[i] ^ check_hash;
-                    let count2 = self.count[i].wrapping_sub(count);
-                    self.key[i] = key2;
-                    self.check_hash[i] = check_hash2;
-                    self.count[i] = count2;
-                    if (count2 == 1 || count2 == -1) && check_hash2 == get_check_hash(key2) {
+                let mut iteration_index = u32::from_le(check_hash).wrapping_add(1);
+                for _ in 0..(HASHES as u64) {
+                    iteration_index = murmurhash32_mix32(iteration_index);
+                    let i2 = (iteration_index as usize) % BUCKETS;
+                    let check_hash2 = self.check_hash[i2] ^ check_hash;
+                    let count2 = self.count[i2].wrapping_sub(count);
+                    let key2 = &mut self.key[i2];
+                    xor_with(key2, &key);
+                    self.check_hash[i2] = check_hash2;
+                    self.count[i2] = count2;
+                    if (count2 == 1 || count2 == -1) && check_hash2 == crc32fast::hash(key2) {
                         if queue.len() > BUCKETS { // sanity check, should be impossible
                             break 'list_main;
                         }
-                        queue.push(i as u32);
+                        queue.push(i2 as u32);
                     }
                 }
+
+                f(key, count == 1);
             }
         }
 
-        self.count.iter().any(|x| *x != 0) || self.key.iter().any(|x| *x != 0)
+        self.count.iter().all(|x| *x == 0) || self.check_hash.iter().all(|x| *x == 0)
     }
 }
 
 #[cfg(test)]
 mod tests {
-    #[allow(unused)]
-    #[inline(always)]
-    pub fn xorshift64(mut x: u64) -> u64 {
-        x ^= x.wrapping_shl(13);
-        x ^= x.wrapping_shr(7);
-        x ^= x.wrapping_shl(17);
-        x
-    }
-
-    #[allow(unused)]
     #[inline(always)]
     pub fn splitmix64(mut x: u64) -> u64 {
         x ^= x.wrapping_shr(30);
@@ -237,35 +243,95 @@ mod tests {
     }
 
     use std::collections::HashSet;
-    #[allow(unused_imports)]
-    use std::time::SystemTime;
     use super::*;
 
     const HASHES: usize = 3;
 
+    fn check_xor_with2<const L: usize>() {
+        let with = [17_u8; L];
+        let mut expected = [69_u8; L];
+        let mut actual = [69_u8; L];
+        expected.iter_mut().zip(with.iter()).for_each(|(a, b)| *a ^= *b);
+        xor_with(&mut actual, &with);
+        assert!(actual.eq(&expected));
+    }
+
+    #[test]
+    fn check_xor_with() {
+        check_xor_with2::<128>();
+        check_xor_with2::<65>();
+        check_xor_with2::<64>();
+        check_xor_with2::<63>();
+        check_xor_with2::<33>();
+        check_xor_with2::<32>();
+        check_xor_with2::<31>();
+        check_xor_with2::<17>();
+        check_xor_with2::<16>();
+        check_xor_with2::<15>();
+        check_xor_with2::<9>();
+        check_xor_with2::<8>();
+        check_xor_with2::<7>();
+        check_xor_with2::<6>();
+        check_xor_with2::<5>();
+        check_xor_with2::<4>();
+        check_xor_with2::<3>();
+        check_xor_with2::<2>();
+        check_xor_with2::<1>();
+    }
+
+    #[test]
+    fn struct_packing() {
+        // Typical case
+        let mut tmp = IBLT::<64, 16, 3>::new();
+        tmp.check_hash.fill(0x01010101);
+        tmp.count.fill(1);
+        tmp.key.iter_mut().for_each(|x| x.fill(1));
+        assert!(tmp.as_bytes().iter().all(|x| *x == 1));
+
+        // Pathological alignment case #1
+        let mut tmp = IBLT::<17, 13, 3>::new();
+        tmp.check_hash.fill(0x01010101);
+        tmp.count.fill(1);
+        tmp.key.iter_mut().for_each(|x| x.fill(1));
+        assert!(tmp.as_bytes().iter().all(|x| *x == 1));
+
+        // Pathological alignment case #2
+        let mut tmp = IBLT::<17, 8, 3>::new();
+        tmp.check_hash.fill(0x01010101);
+        tmp.count.fill(1);
+        tmp.key.iter_mut().for_each(|x| x.fill(1));
+        assert!(tmp.as_bytes().iter().all(|x| *x == 1));
+
+        // Pathological alignment case #3
+        let mut tmp = IBLT::<16, 7, 3>::new();
+        tmp.check_hash.fill(0x01010101);
+        tmp.count.fill(1);
+        tmp.key.iter_mut().for_each(|x| x.fill(1));
+        assert!(tmp.as_bytes().iter().all(|x| *x == 1));
+    }
+
     #[test]
     fn fill_list_performance() {
         const CAPACITY: usize = 4096;
-        //let mut rn = xorshift64(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64);
-        let mut rn = 31337;
-        let mut expected: HashSet<u64> = HashSet::with_capacity(4096);
+        let mut rn: u128 = 0xd3b07384d113edec49eaa6238ad5ff00;
+        let mut expected: HashSet<u128> = HashSet::with_capacity(4096);
         let mut count = 64;
         while count <= CAPACITY {
-            let mut test = IBLT::<CAPACITY, HASHES>::new();
+            let mut test = IBLT::<CAPACITY, 16, HASHES>::new();
             expected.clear();
             for _ in 0..count {
-                let x = rn;
-                rn = splitmix64(rn);
-                expected.insert(x);
-                test.insert(x);
+                rn = rn.wrapping_add(splitmix64(rn as u64) as u128);
+                expected.insert(rn);
+                test.insert(&rn.to_le_bytes());
             }
             let mut list_count = 0;
-            test.list(|x| {
+            test.list(|x, d| {
                 list_count += 1;
-                assert!(expected.contains(&x));
+                assert!(expected.contains(&u128::from_le_bytes(x)));
+                assert!(d);
             });
             println!("inserted: {}\tlisted: {}\tcapacity: {}\tscore: {:.4}\tfill: {:.4}", count, list_count, CAPACITY, (list_count as f64) / (count as f64), (count as f64) / (CAPACITY as f64));
@@ -278,36 +344,41 @@ mod tests {
         const CAPACITY: usize = 16384;
         const REMOTE_SIZE: usize = 1024 * 1024 * 2;
         const STEP: usize = 1024;
-        //let mut rn = xorshift64(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64);
-        let mut rn = 31337;
+        let mut rn: u128 = 0xd3b07384d113edec49eaa6238ad5ff00;
         let mut missing_count = 1024;
-        let mut missing: HashSet<u64> = HashSet::with_capacity(CAPACITY * 2);
-        let mut all: HashSet<u64> = HashSet::with_capacity(REMOTE_SIZE);
+        let mut missing: HashSet<u128> = HashSet::with_capacity(CAPACITY * 2);
+        let mut all: HashSet<u128> = HashSet::with_capacity(REMOTE_SIZE);
         while missing_count <= CAPACITY {
             missing.clear();
             all.clear();
-            let mut local = IBLT::<CAPACITY, HASHES>::new();
-            let mut remote = IBLT::<CAPACITY, HASHES>::new();
+            let mut local = IBLT::<CAPACITY, 16, HASHES>::new();
+            let mut remote = IBLT::<CAPACITY, 16, HASHES>::new();
             let mut k = 0;
             while k < REMOTE_SIZE {
+                rn = rn.wrapping_add(splitmix64(rn as u64) as u128);
                 if all.insert(rn) {
                     if k >= missing_count {
-                        local.insert(rn);
+                        local.insert(&rn.to_le_bytes());
                     } else {
                         missing.insert(rn);
                     }
-                    remote.insert(rn);
+                    remote.insert(&rn.to_le_bytes());
                     k += 1;
                 }
-                rn = splitmix64(rn);
             }
 
-            local.subtract(&mut remote);
+            let mut remote2 = remote.clone();
+            remote2.subtract(&local);
+            remote2.list(|_, d| {
+                assert!(d);
+            });
+
+            local.subtract(&remote);
             let bytes = local.as_bytes().len();
             let mut cnt = 0;
-            let all_success = local.list(|k| {
-                assert!(missing.contains(&k));
+            let all_success = local.list(|x, d| {
+                assert!(missing.contains(&u128::from_le_bytes(x)));
+                assert!(!d);
                 cnt += 1;
             });
diff --git a/syncwhole/src/protocol.rs b/syncwhole/src/protocol.rs
index 2882f9a04..3d841ab40 100644
--- a/syncwhole/src/protocol.rs
+++ b/syncwhole/src/protocol.rs
@@ -30,7 +30,7 @@ pub enum MessageType {
     /// [...]
     HaveRecords = 3_u8,
 
-    /// [...]
+    /// [...]
     GetRecords = 4_u8,
 
     ///
@@ -133,8 +133,8 @@ pub mod msg {
     }
 
     #[derive(Serialize, Deserialize)]
-    pub struct Sync<'a> {
-        /// 64-bit prefix of reocrd keys for this request
+    pub struct SyncRequest<'a> {
+        /// 64-bit prefix of record keys for this request
         #[serde(rename = "p")]
         pub prefix: u64,
 
@@ -142,11 +142,24 @@ pub mod msg {
         #[serde(rename = "b")]
         pub prefix_bits: u8,
 
-        /// Reference time for query
-        #[serde(rename = "t")]
-        pub reference_time: u64,
+        /// Data-store-specific subset selector indicating what subset of items desired
+        pub subset: &'a [u8],
+    }
 
-        /// Set summary for keys under prefix
+    #[derive(Serialize, Deserialize)]
+    pub struct Sync<'a> {
+        /// 64-bit prefix of record keys for this request
+        #[serde(rename = "p")]
+        pub prefix: u64,
+
+        /// Number of bits in prefix that are meaningful
+        #[serde(rename = "b")]
+        pub prefix_bits: u8,
+
+        /// Data-store-specific subset selector indicating what subset of items were included
+        pub subset: &'a [u8],
+
+        /// Set summary for keys under prefix within subset
         #[serde(with = "serde_bytes")]
         #[serde(rename = "i")]
        pub iblt: &'a [u8],
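
For reviewers, a minimal sketch (not part of the patch) of how the reworked byte-oriented API fits together. The `iblt::IBLT` import path, the parameters `<2048, 16, 3>` (buckets, item size in bytes, hash count), and the example keys are illustrative assumptions inferred from the tests above, not values taken from the patch itself:

use iblt::IBLT;

fn reconcile_sketch() {
    // 2048 buckets of 16-byte items, 3 hash functions (assumed parameters).
    type SetSummary = IBLT<2048, 16, 3>;

    let mut local = SetSummary::new();
    let mut remote = SetSummary::new();

    // Each side inserts the 16-byte keys it currently holds.
    local.insert(&[1u8; 16]);
    local.insert(&[2u8; 16]);
    remote.insert(&[2u8; 16]);
    remote.insert(&[3u8; 16]);

    // Subtracting one summary from the other leaves only the symmetric difference.
    local.subtract(&remote);

    // list() consumes the IBLT. The bool handed to the callback is `count == 1`,
    // i.e. the item was present only on the side that was subtracted from (local here);
    // the return value reports whether the whole difference could be extracted.
    let complete = local.list(|key, only_local| {
        println!("differs: {:?} (present only locally: {})", key, only_local);
    });
    println!("fully decoded: {}", complete);
}

In practice one peer would serialize its summary with as_bytes(), send it inside the Sync message's iblt field, and the receiver would reconstruct it, subtract its own summary, and list() the keys to request.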