From 700855424cb264cc12fc2cf2c57bc58dcd1f24a8 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Sat, 29 Jan 2022 13:36:33 -0800 Subject: [PATCH] Move IBLT into core. It will probably get used there. --- .gitignore | 1 + zerotier-network-hypervisor/src/util/iblt.rs | 243 +++++++++++++++++++ zerotier-network-hypervisor/src/util/mod.rs | 1 + zerotier-network-hypervisor/src/vl1/peer.rs | 38 ++- 4 files changed, 272 insertions(+), 11 deletions(-) create mode 100644 zerotier-network-hypervisor/src/util/iblt.rs diff --git a/.gitignore b/.gitignore index 8e7a0f7fc..f4dddf5c9 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ /aes-gmac-siv/Cargo.lock /zerotier-core-crypto/Cargo.lock /zerotier-network-hypervisor/Cargo.lock +/zeropoint/Cargo.lock .DS_* .Icon* diff --git a/zerotier-network-hypervisor/src/util/iblt.rs b/zerotier-network-hypervisor/src/util/iblt.rs new file mode 100644 index 000000000..011052558 --- /dev/null +++ b/zerotier-network-hypervisor/src/util/iblt.rs @@ -0,0 +1,243 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + * + * (c)2021 ZeroTier, Inc. + * https://www.zerotier.com/ + */ + +use std::io::{Read, Write}; +use std::mem::{transmute, zeroed}; + +use zerotier_core_crypto::varint; + +// max value: 6, 5 was determined to be good via empirical testing +const KEY_MAPPING_ITERATIONS: usize = 5; + +#[inline(always)] +fn xorshift64(mut x: u64) -> u64 { + x ^= x.wrapping_shl(13); + x ^= x.wrapping_shr(7); + x ^= x.wrapping_shl(17); + x +} + +#[inline(always)] +fn calc_check_hash(key: &[u8]) -> u64 { + xorshift64(u64::from_le_bytes((&key[(HS - 8)..HS]).try_into().unwrap())) +} + +#[derive(Clone, PartialEq, Eq)] +struct IBLTEntry { + key_sum: [u8; HS], + check_hash_sum: u64, + count: i64 +} + +impl IBLTEntry { + #[inline(always)] + fn is_singular(&self) -> bool { + if self.count == 1 || self.count == -1 { + calc_check_hash::(&self.key_sum) == self.check_hash_sum + } else { + false + } + } +} + +/// An Invertible Bloom Lookup Table for set reconciliation. +/// +/// This implementation assumes that hashes are random. Hashes must be +/// at least 8 bytes in size. +#[derive(Clone, PartialEq, Eq)] +pub struct IBLT { + map: [IBLTEntry; B] +} + +impl IBLT { + pub const BUCKETS: usize = B; + + pub const HASH_SIZE: usize = HS; + + pub fn new() -> Self { unsafe { zeroed() } } + + pub fn read(&mut self, r: &mut R) -> std::io::Result<()> { + let mut tmp = [0_u8; 8]; + let mut prev_c = 0_i64; + for i in 0..B { + let b = &mut self.map[i]; + r.read_exact(&mut b.key_sum)?; + r.read_exact(&mut tmp)?; + b.check_hash_sum = u64::from_le_bytes(tmp); + let c = varint::read(r)?.0 + prev_c; + if (c & 1) == 0 { + b.count = c.wrapping_shr(1) as i64; + } else { + b.count = -(c.wrapping_shr(1) as i64); + } + prev_c = b.count; + } + Ok(()) + } + + pub fn write(&self, w: &mut W) -> std::io::Result<()> { + let mut prev_c = 0_i64; + for i in 0..B { + let b = &self.map[i]; + w.write_all(&b.key_sum)?; + w.write_all(&b.check_hash_sum.to_le_bytes())?; + let c = b.count - prev_c; + prev_c = b.count; + if c >= 0 { + varint::write(w, (c as u64).wrapping_shl(1))?; + } else { + varint::write(w, (-c as u64).wrapping_shl(1) | 1)?; + } + } + Ok(()) + } + + fn ins_rem(&mut self, key: &[u8], delta: i64) { + assert!(HS >= 8); + assert!(key.len() >= HS); + let iteration_indices: [u64; 8] = unsafe { transmute(zerotier_core_crypto::hash::SHA512::hash(key)) }; + let check_hash = Self::calc_check_hash::(&key); + for i in 0..KEY_MAPPING_ITERATIONS { + let b = &mut self.map[(u64::from_le(iteration_indices[i]) as usize) % B]; + for x in 0..HS { + b.key_sum[x] ^= key[x]; + } + b.check_hash_sum ^= check_hash; + b.count += delta; + } + } + + #[inline(always)] + pub fn insert(&mut self, key: &[u8]) { self.ins_rem(key, 1); } + + #[inline(always)] + pub fn remove(&mut self, key: &[u8]) { self.ins_rem(key, -1); } + + pub fn subtract(&mut self, other: &Self) { + for b in 0..B { + let s = &mut self.map[b]; + let o = &other.map[b]; + for x in 0..HS { + s.key_sum[x] ^= o.key_sum[x]; + } + s.check_hash_sum ^= o.check_hash_sum; + s.count += o.count; + } + } + + pub fn list(mut self, mut f: F) -> bool { + assert!(HS >= 8); + let mut singular_buckets = [0_usize; B]; + let mut singular_bucket_count = 0_usize; + + for b in 0..B { + if self.map[b].is_singular() { + singular_buckets[singular_bucket_count] = b; + singular_bucket_count += 1; + } + } + + while singular_bucket_count > 0 { + singular_bucket_count -= 1; + let b = &self.map[singular_buckets[singular_bucket_count]]; + + if b.is_singular() { + let key = b.key_sum.clone(); + let iteration_indices: [u64; 8] = unsafe { transmute(zerotier_core_crypto::hash::SHA512::hash(&key)) }; + let check_hash = Self::calc_check_hash::(&key); + + f(&key); + + for i in 0..KEY_MAPPING_ITERATIONS { + let b_idx = (u64::from_le(iteration_indices[i]) as usize) % B; + let b = &mut self.map[b_idx]; + for x in 0..HS { + b.key_sum[x] ^= key[x]; + } + b.check_hash_sum ^= check_hash; + b.count -= 1; + + if b.is_singular() { + if singular_bucket_count >= B { + // This would indicate an invalid IBLT. + return false; + } + singular_buckets[singular_bucket_count] = b_idx; + singular_bucket_count += 1; + } + } + } + } + + return true; + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use zerotier_core_crypto::hash::SHA384; + + use crate::iblt::*; + + #[allow(unused_variables)] + #[test] + fn insert_and_list() { + let mut e: HashSet<[u8; 48]> = HashSet::with_capacity(1024); + for _ in 0..2 { + for expected_cnt in 0..768 { + let random_u64 = zerotier_core_crypto::random::xorshift64_random(); + let mut t: IBLT<48, 1152> = IBLT::new(); + e.clear(); + for i in 0..expected_cnt { + let k = SHA384::hash(&((i + random_u64) as u64).to_le_bytes()); + t.insert(&k); + e.insert(k); + } + let mut cnt = 0; + t.list(|k| { + assert!(e.contains(k)); + cnt += 1; + }); + assert_eq!(cnt, expected_cnt); + } + } + } + + #[allow(unused_variables)] + #[test] + fn set_reconciliation() { + for _ in 0..10 { + let random_u64 = zerotier_core_crypto::random::xorshift64_random(); + let mut alice: IBLT<48, 2048> = IBLT::new(); + let mut bob: IBLT<48, 2048> = IBLT::new(); + let mut alice_total = 0_i32; + let mut bob_total = 0_i32; + for i in 0..1500 { + let k = SHA384::hash(&((i ^ random_u64) as u64).to_le_bytes()); + if (k[0] & 1) == 1 { + alice.insert(&k); + alice_total += 1; + } + if (k[0] & 3) == 2 { + bob.insert(&k); + bob_total += 1; + } + } + alice.subtract(&bob); + let mut diff_total = 0_i32; + alice.list(|k| { + diff_total += 1; + }); + // This is a probabilistic process so we tolerate a little bit of failure. The idea is that each + // pass reconciles more and more differences. + assert!(((alice_total + bob_total) - diff_total).abs() <= 128); + } + } +} diff --git a/zerotier-network-hypervisor/src/util/mod.rs b/zerotier-network-hypervisor/src/util/mod.rs index 01a3c6a81..c0a7d4a7a 100644 --- a/zerotier-network-hypervisor/src/util/mod.rs +++ b/zerotier-network-hypervisor/src/util/mod.rs @@ -9,6 +9,7 @@ pub mod pool; pub mod gate; pub mod buffer; +pub mod iblt; pub use zerotier_core_crypto::hex; pub use zerotier_core_crypto::varint; diff --git a/zerotier-network-hypervisor/src/vl1/peer.rs b/zerotier-network-hypervisor/src/vl1/peer.rs index 3768666f5..9e98c35ed 100644 --- a/zerotier-network-hypervisor/src/vl1/peer.rs +++ b/zerotier-network-hypervisor/src/vl1/peer.rs @@ -100,7 +100,7 @@ fn salsa_poly_create(secret: &SymmetricSecret, header: &PacketHeader, packet_siz (salsa, Poly1305::new(&poly1305_key).unwrap()) } -/// Attempt AEAD packet encryption and MAC validation. +/// Attempt AEAD packet encryption and MAC validation. Returns message ID on success. fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], header: &PacketHeader, fragments: &[Option], payload: &mut Buffer) -> Option { packet_frag0_payload_bytes.get(0).map_or(None, |verb| { match header.cipher() { @@ -177,6 +177,7 @@ fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], impl Peer { /// Create a new peer. + /// /// This only returns None if this_node_identity does not have its secrets or if some /// fatal error occurs performing key agreement between the two identities. pub(crate) fn new(this_node_identity: &Identity, id: Identity) -> Option { @@ -202,29 +203,45 @@ impl Peer { }) } - /// Get the next message ID. + /// Get the next message ID for sending a message to this peer. #[inline(always)] pub(crate) fn next_message_id(&self) -> u64 { self.message_id_counter.fetch_add(1, Ordering::Relaxed) } /// Receive, decrypt, authenticate, and process an incoming packet from this peer. + /// /// If the packet comes in multiple fragments, the fragments slice should contain all /// those fragments after the main packet header and first chunk. - pub(crate) fn receive(&self, node: &Node, si: &SI, ph: &PH, time_ticks: i64, source_endpoint: &Endpoint, source_path: &Arc, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option]) { - let _ = packet.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| { + pub(crate) fn receive( + &self, + node: &Node, + si: &SI, + vi: &VI, + time_ticks: i64, + source_endpoint: &Endpoint, + source_path: &Arc, + header: &PacketHeader, + frag0: &Buffer<{ PACKET_SIZE_MAX }>, + fragments: &[Option]) + { + let _ = frag0.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| { let mut payload: Buffer = unsafe { Buffer::new_without_memzero() }; let (forward_secrecy, mut message_id) = if let Some(ephemeral_secret) = self.ephemeral_secret.load_full() { if let Some(message_id) = try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, header, fragments, &mut payload) { + // Decryption successful with ephemeral secret ephemeral_secret.decrypt_uses.fetch_add(1, Ordering::Relaxed); (true, message_id) } else { + // Decryption failed with ephemeral secret, which may indicate that it's obsolete. (false, 0) } } else { + // There is no ephemeral secret negotiated (yet?). (false, 0) }; if !forward_secrecy { if let Some(message_id2) = try_aead_decrypt(&self.static_secret, packet_frag0_payload_bytes, header, fragments, &mut payload) { + // Decryption successful with static secret. message_id = message_id2; } else { // Packet failed to decrypt using either ephemeral or permament key, reject. @@ -275,12 +292,12 @@ impl Peer { // because the most performance critical path is the handling of the ???_FRAME // verbs, which are in VL2. verb &= VERB_MASK; // mask off flags - if !ph.handle_packet(self, source_path, forward_secrecy, extended_authentication, verb, &payload) { + if !vi.handle_packet(self, source_path, forward_secrecy, extended_authentication, verb, &payload) { match verb { //VERB_VL1_NOP => {} VERB_VL1_HELLO => self.receive_hello(si, node, time_ticks, source_path, &payload), - VERB_VL1_ERROR => self.receive_error(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload), - VERB_VL1_OK => self.receive_ok(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload), + VERB_VL1_ERROR => self.receive_error(si, vi, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload), + VERB_VL1_OK => self.receive_ok(si, vi, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload), VERB_VL1_WHOIS => self.receive_whois(si, node, time_ticks, source_path, &payload), VERB_VL1_RENDEZVOUS => self.receive_rendezvous(si, node, time_ticks, source_path, &payload), VERB_VL1_ECHO => self.receive_echo(si, node, time_ticks, source_path, &payload), @@ -289,6 +306,7 @@ impl Peer { _ => {} } } else { + // In debug build check to make sure the next layer (VL2) is complying with the API contract. #[cfg(debug)] { if match verb { VERB_VL1_NOP | VERB_VL1_HELLO | VERB_VL1_ERROR | VERB_VL1_OK | VERB_VL1_WHOIS | VERB_VL1_RENDEZVOUS | VERB_VL1_ECHO | VERB_VL1_PUSH_DIRECT_PATHS | VERB_VL1_USER_MESSAGE => true, @@ -491,10 +509,8 @@ impl Peer { let current_packet_id_counter = self.message_id_counter.load(Ordering::Relaxed); if current_packet_id_counter.wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX { match ok_header.in_re_verb { - VERB_VL1_HELLO => { - } - VERB_VL1_WHOIS => { - } + VERB_VL1_HELLO => {} + VERB_VL1_WHOIS => {} _ => { ph.handle_ok(self, source_path, forward_secrecy, extended_authentication, ok_header.in_re_verb, in_re_message_id, payload, &mut cursor); }