Move IBLT into core. It will probably get used there.

This commit is contained in:
Adam Ierymenko 2022-01-29 13:36:33 -08:00
parent b15ecfd163
commit 700855424c
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
4 changed files with 272 additions and 11 deletions

1
.gitignore vendored
View file

@ -4,6 +4,7 @@
/aes-gmac-siv/Cargo.lock /aes-gmac-siv/Cargo.lock
/zerotier-core-crypto/Cargo.lock /zerotier-core-crypto/Cargo.lock
/zerotier-network-hypervisor/Cargo.lock /zerotier-network-hypervisor/Cargo.lock
/zeropoint/Cargo.lock
.DS_* .DS_*
.Icon* .Icon*

View file

@ -0,0 +1,243 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use std::io::{Read, Write};
use std::mem::{transmute, zeroed};
use zerotier_core_crypto::varint;
// max value: 6, 5 was determined to be good via empirical testing
const KEY_MAPPING_ITERATIONS: usize = 5;
#[inline(always)]
fn xorshift64(mut x: u64) -> u64 {
x ^= x.wrapping_shl(13);
x ^= x.wrapping_shr(7);
x ^= x.wrapping_shl(17);
x
}
#[inline(always)]
fn calc_check_hash<const HS: usize>(key: &[u8]) -> u64 {
xorshift64(u64::from_le_bytes((&key[(HS - 8)..HS]).try_into().unwrap()))
}
#[derive(Clone, PartialEq, Eq)]
struct IBLTEntry<const HS: usize> {
key_sum: [u8; HS],
check_hash_sum: u64,
count: i64
}
impl<const HS: usize> IBLTEntry<HS> {
#[inline(always)]
fn is_singular(&self) -> bool {
if self.count == 1 || self.count == -1 {
calc_check_hash::<HS>(&self.key_sum) == self.check_hash_sum
} else {
false
}
}
}
/// An Invertible Bloom Lookup Table for set reconciliation.
///
/// This implementation assumes that hashes are random. Hashes must be
/// at least 8 bytes in size.
#[derive(Clone, PartialEq, Eq)]
pub struct IBLT<const HS: usize, const B: usize> {
map: [IBLTEntry<HS>; B]
}
impl<const HS: usize, const B: usize> IBLT<HS, B> {
pub const BUCKETS: usize = B;
pub const HASH_SIZE: usize = HS;
pub fn new() -> Self { unsafe { zeroed() } }
pub fn read<R: Read>(&mut self, r: &mut R) -> std::io::Result<()> {
let mut tmp = [0_u8; 8];
let mut prev_c = 0_i64;
for i in 0..B {
let b = &mut self.map[i];
r.read_exact(&mut b.key_sum)?;
r.read_exact(&mut tmp)?;
b.check_hash_sum = u64::from_le_bytes(tmp);
let c = varint::read(r)?.0 + prev_c;
if (c & 1) == 0 {
b.count = c.wrapping_shr(1) as i64;
} else {
b.count = -(c.wrapping_shr(1) as i64);
}
prev_c = b.count;
}
Ok(())
}
pub fn write<W: Write>(&self, w: &mut W) -> std::io::Result<()> {
let mut prev_c = 0_i64;
for i in 0..B {
let b = &self.map[i];
w.write_all(&b.key_sum)?;
w.write_all(&b.check_hash_sum.to_le_bytes())?;
let c = b.count - prev_c;
prev_c = b.count;
if c >= 0 {
varint::write(w, (c as u64).wrapping_shl(1))?;
} else {
varint::write(w, (-c as u64).wrapping_shl(1) | 1)?;
}
}
Ok(())
}
fn ins_rem(&mut self, key: &[u8], delta: i64) {
assert!(HS >= 8);
assert!(key.len() >= HS);
let iteration_indices: [u64; 8] = unsafe { transmute(zerotier_core_crypto::hash::SHA512::hash(key)) };
let check_hash = Self::calc_check_hash::<HS>(&key);
for i in 0..KEY_MAPPING_ITERATIONS {
let b = &mut self.map[(u64::from_le(iteration_indices[i]) as usize) % B];
for x in 0..HS {
b.key_sum[x] ^= key[x];
}
b.check_hash_sum ^= check_hash;
b.count += delta;
}
}
#[inline(always)]
pub fn insert(&mut self, key: &[u8]) { self.ins_rem(key, 1); }
#[inline(always)]
pub fn remove(&mut self, key: &[u8]) { self.ins_rem(key, -1); }
pub fn subtract(&mut self, other: &Self) {
for b in 0..B {
let s = &mut self.map[b];
let o = &other.map[b];
for x in 0..HS {
s.key_sum[x] ^= o.key_sum[x];
}
s.check_hash_sum ^= o.check_hash_sum;
s.count += o.count;
}
}
pub fn list<F: FnMut(&[u8; HS])>(mut self, mut f: F) -> bool {
assert!(HS >= 8);
let mut singular_buckets = [0_usize; B];
let mut singular_bucket_count = 0_usize;
for b in 0..B {
if self.map[b].is_singular() {
singular_buckets[singular_bucket_count] = b;
singular_bucket_count += 1;
}
}
while singular_bucket_count > 0 {
singular_bucket_count -= 1;
let b = &self.map[singular_buckets[singular_bucket_count]];
if b.is_singular() {
let key = b.key_sum.clone();
let iteration_indices: [u64; 8] = unsafe { transmute(zerotier_core_crypto::hash::SHA512::hash(&key)) };
let check_hash = Self::calc_check_hash::<HS>(&key);
f(&key);
for i in 0..KEY_MAPPING_ITERATIONS {
let b_idx = (u64::from_le(iteration_indices[i]) as usize) % B;
let b = &mut self.map[b_idx];
for x in 0..HS {
b.key_sum[x] ^= key[x];
}
b.check_hash_sum ^= check_hash;
b.count -= 1;
if b.is_singular() {
if singular_bucket_count >= B {
// This would indicate an invalid IBLT.
return false;
}
singular_buckets[singular_bucket_count] = b_idx;
singular_bucket_count += 1;
}
}
}
}
return true;
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use zerotier_core_crypto::hash::SHA384;
use crate::iblt::*;
#[allow(unused_variables)]
#[test]
fn insert_and_list() {
let mut e: HashSet<[u8; 48]> = HashSet::with_capacity(1024);
for _ in 0..2 {
for expected_cnt in 0..768 {
let random_u64 = zerotier_core_crypto::random::xorshift64_random();
let mut t: IBLT<48, 1152> = IBLT::new();
e.clear();
for i in 0..expected_cnt {
let k = SHA384::hash(&((i + random_u64) as u64).to_le_bytes());
t.insert(&k);
e.insert(k);
}
let mut cnt = 0;
t.list(|k| {
assert!(e.contains(k));
cnt += 1;
});
assert_eq!(cnt, expected_cnt);
}
}
}
#[allow(unused_variables)]
#[test]
fn set_reconciliation() {
for _ in 0..10 {
let random_u64 = zerotier_core_crypto::random::xorshift64_random();
let mut alice: IBLT<48, 2048> = IBLT::new();
let mut bob: IBLT<48, 2048> = IBLT::new();
let mut alice_total = 0_i32;
let mut bob_total = 0_i32;
for i in 0..1500 {
let k = SHA384::hash(&((i ^ random_u64) as u64).to_le_bytes());
if (k[0] & 1) == 1 {
alice.insert(&k);
alice_total += 1;
}
if (k[0] & 3) == 2 {
bob.insert(&k);
bob_total += 1;
}
}
alice.subtract(&bob);
let mut diff_total = 0_i32;
alice.list(|k| {
diff_total += 1;
});
// This is a probabilistic process so we tolerate a little bit of failure. The idea is that each
// pass reconciles more and more differences.
assert!(((alice_total + bob_total) - diff_total).abs() <= 128);
}
}
}

View file

@ -9,6 +9,7 @@
pub mod pool; pub mod pool;
pub mod gate; pub mod gate;
pub mod buffer; pub mod buffer;
pub mod iblt;
pub use zerotier_core_crypto::hex; pub use zerotier_core_crypto::hex;
pub use zerotier_core_crypto::varint; pub use zerotier_core_crypto::varint;

View file

@ -100,7 +100,7 @@ fn salsa_poly_create(secret: &SymmetricSecret, header: &PacketHeader, packet_siz
(salsa, Poly1305::new(&poly1305_key).unwrap()) (salsa, Poly1305::new(&poly1305_key).unwrap())
} }
/// Attempt AEAD packet encryption and MAC validation. /// Attempt AEAD packet encryption and MAC validation. Returns message ID on success.
fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], header: &PacketHeader, fragments: &[Option<PacketBuffer>], payload: &mut Buffer<PACKET_SIZE_MAX>) -> Option<u64> { fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], header: &PacketHeader, fragments: &[Option<PacketBuffer>], payload: &mut Buffer<PACKET_SIZE_MAX>) -> Option<u64> {
packet_frag0_payload_bytes.get(0).map_or(None, |verb| { packet_frag0_payload_bytes.get(0).map_or(None, |verb| {
match header.cipher() { match header.cipher() {
@ -177,6 +177,7 @@ fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8],
impl Peer { impl Peer {
/// Create a new peer. /// Create a new peer.
///
/// This only returns None if this_node_identity does not have its secrets or if some /// This only returns None if this_node_identity does not have its secrets or if some
/// fatal error occurs performing key agreement between the two identities. /// fatal error occurs performing key agreement between the two identities.
pub(crate) fn new(this_node_identity: &Identity, id: Identity) -> Option<Peer> { pub(crate) fn new(this_node_identity: &Identity, id: Identity) -> Option<Peer> {
@ -202,29 +203,45 @@ impl Peer {
}) })
} }
/// Get the next message ID. /// Get the next message ID for sending a message to this peer.
#[inline(always)] #[inline(always)]
pub(crate) fn next_message_id(&self) -> u64 { self.message_id_counter.fetch_add(1, Ordering::Relaxed) } pub(crate) fn next_message_id(&self) -> u64 { self.message_id_counter.fetch_add(1, Ordering::Relaxed) }
/// Receive, decrypt, authenticate, and process an incoming packet from this peer. /// Receive, decrypt, authenticate, and process an incoming packet from this peer.
///
/// If the packet comes in multiple fragments, the fragments slice should contain all /// If the packet comes in multiple fragments, the fragments slice should contain all
/// those fragments after the main packet header and first chunk. /// those fragments after the main packet header and first chunk.
pub(crate) fn receive<SI: SystemInterface, PH: VL1VirtualInterface>(&self, node: &Node, si: &SI, ph: &PH, time_ticks: i64, source_endpoint: &Endpoint, source_path: &Arc<Path>, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option<PacketBuffer>]) { pub(crate) fn receive<SI: SystemInterface, VI: VL1VirtualInterface>(
let _ = packet.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| { &self,
node: &Node,
si: &SI,
vi: &VI,
time_ticks: i64,
source_endpoint: &Endpoint,
source_path: &Arc<Path>,
header: &PacketHeader,
frag0: &Buffer<{ PACKET_SIZE_MAX }>,
fragments: &[Option<PacketBuffer>])
{
let _ = frag0.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| {
let mut payload: Buffer<PACKET_SIZE_MAX> = unsafe { Buffer::new_without_memzero() }; let mut payload: Buffer<PACKET_SIZE_MAX> = unsafe { Buffer::new_without_memzero() };
let (forward_secrecy, mut message_id) = if let Some(ephemeral_secret) = self.ephemeral_secret.load_full() { let (forward_secrecy, mut message_id) = if let Some(ephemeral_secret) = self.ephemeral_secret.load_full() {
if let Some(message_id) = try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, header, fragments, &mut payload) { if let Some(message_id) = try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, header, fragments, &mut payload) {
// Decryption successful with ephemeral secret
ephemeral_secret.decrypt_uses.fetch_add(1, Ordering::Relaxed); ephemeral_secret.decrypt_uses.fetch_add(1, Ordering::Relaxed);
(true, message_id) (true, message_id)
} else { } else {
// Decryption failed with ephemeral secret, which may indicate that it's obsolete.
(false, 0) (false, 0)
} }
} else { } else {
// There is no ephemeral secret negotiated (yet?).
(false, 0) (false, 0)
}; };
if !forward_secrecy { if !forward_secrecy {
if let Some(message_id2) = try_aead_decrypt(&self.static_secret, packet_frag0_payload_bytes, header, fragments, &mut payload) { if let Some(message_id2) = try_aead_decrypt(&self.static_secret, packet_frag0_payload_bytes, header, fragments, &mut payload) {
// Decryption successful with static secret.
message_id = message_id2; message_id = message_id2;
} else { } else {
// Packet failed to decrypt using either ephemeral or permament key, reject. // Packet failed to decrypt using either ephemeral or permament key, reject.
@ -275,12 +292,12 @@ impl Peer {
// because the most performance critical path is the handling of the ???_FRAME // because the most performance critical path is the handling of the ???_FRAME
// verbs, which are in VL2. // verbs, which are in VL2.
verb &= VERB_MASK; // mask off flags verb &= VERB_MASK; // mask off flags
if !ph.handle_packet(self, source_path, forward_secrecy, extended_authentication, verb, &payload) { if !vi.handle_packet(self, source_path, forward_secrecy, extended_authentication, verb, &payload) {
match verb { match verb {
//VERB_VL1_NOP => {} //VERB_VL1_NOP => {}
VERB_VL1_HELLO => self.receive_hello(si, node, time_ticks, source_path, &payload), VERB_VL1_HELLO => self.receive_hello(si, node, time_ticks, source_path, &payload),
VERB_VL1_ERROR => self.receive_error(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload), VERB_VL1_ERROR => self.receive_error(si, vi, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload),
VERB_VL1_OK => self.receive_ok(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload), VERB_VL1_OK => self.receive_ok(si, vi, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload),
VERB_VL1_WHOIS => self.receive_whois(si, node, time_ticks, source_path, &payload), VERB_VL1_WHOIS => self.receive_whois(si, node, time_ticks, source_path, &payload),
VERB_VL1_RENDEZVOUS => self.receive_rendezvous(si, node, time_ticks, source_path, &payload), VERB_VL1_RENDEZVOUS => self.receive_rendezvous(si, node, time_ticks, source_path, &payload),
VERB_VL1_ECHO => self.receive_echo(si, node, time_ticks, source_path, &payload), VERB_VL1_ECHO => self.receive_echo(si, node, time_ticks, source_path, &payload),
@ -289,6 +306,7 @@ impl Peer {
_ => {} _ => {}
} }
} else { } else {
// In debug build check to make sure the next layer (VL2) is complying with the API contract.
#[cfg(debug)] { #[cfg(debug)] {
if match verb { if match verb {
VERB_VL1_NOP | VERB_VL1_HELLO | VERB_VL1_ERROR | VERB_VL1_OK | VERB_VL1_WHOIS | VERB_VL1_RENDEZVOUS | VERB_VL1_ECHO | VERB_VL1_PUSH_DIRECT_PATHS | VERB_VL1_USER_MESSAGE => true, VERB_VL1_NOP | VERB_VL1_HELLO | VERB_VL1_ERROR | VERB_VL1_OK | VERB_VL1_WHOIS | VERB_VL1_RENDEZVOUS | VERB_VL1_ECHO | VERB_VL1_PUSH_DIRECT_PATHS | VERB_VL1_USER_MESSAGE => true,
@ -491,10 +509,8 @@ impl Peer {
let current_packet_id_counter = self.message_id_counter.load(Ordering::Relaxed); let current_packet_id_counter = self.message_id_counter.load(Ordering::Relaxed);
if current_packet_id_counter.wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX { if current_packet_id_counter.wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX {
match ok_header.in_re_verb { match ok_header.in_re_verb {
VERB_VL1_HELLO => { VERB_VL1_HELLO => {}
} VERB_VL1_WHOIS => {}
VERB_VL1_WHOIS => {
}
_ => { _ => {
ph.handle_ok(self, source_path, forward_secrecy, extended_authentication, ok_header.in_re_verb, in_re_message_id, payload, &mut cursor); ph.handle_ok(self, source_path, forward_secrecy, extended_authentication, ok_header.in_re_verb, in_re_message_id, payload, &mut cursor);
} }