diff --git a/controller/src/controller.rs b/controller/src/controller.rs
index c6f032df5..bbf7ddae1 100644
--- a/controller/src/controller.rs
+++ b/controller/src/controller.rs
@@ -504,14 +504,6 @@ impl InnerProtocolLayer for Controller {
         true
     }
 
-    fn has_trust_relationship(&self, id: &Valid<Identity>) -> bool {
-        self.recently_authorized
-            .read()
-            .unwrap()
-            .get(&id.fingerprint)
-            .map_or(false, |by_network| by_network.values().any(|t| *t > ms_monotonic()))
-    }
-
     fn handle_packet(
         &self,
         host_system: &HostSystemImpl,
diff --git a/network-hypervisor/Cargo.toml b/network-hypervisor/Cargo.toml
index fa6951cfe..7e768e488 100644
--- a/network-hypervisor/Cargo.toml
+++ b/network-hypervisor/Cargo.toml
@@ -16,6 +16,8 @@ lz4_flex = { version = "^0", features = ["safe-encode", "safe-decode", "checked-
 serde = { version = "^1", features = ["derive"], default-features = false }
 phf = { version = "^0", features = ["macros", "std"], default-features = false }
 num-traits = "^0"
+rmp-serde = "^1"
+fastcdc = "^3"
 
 [dev-dependencies]
 rand = "*"
diff --git a/network-hypervisor/src/dr/mod.rs b/network-hypervisor/src/dr/mod.rs
new file mode 100644
index 000000000..3ee667d4e
--- /dev/null
+++ b/network-hypervisor/src/dr/mod.rs
@@ -0,0 +1 @@
+pub mod rcdcb;
diff --git a/network-hypervisor/src/dr/rcdcb.rs b/network-hypervisor/src/dr/rcdcb.rs
new file mode 100644
index 000000000..bdcd26096
--- /dev/null
+++ b/network-hypervisor/src/dr/rcdcb.rs
@@ -0,0 +1,183 @@
+use fastcdc::v2020::{FastCDC, MINIMUM_MIN};
+use std::io::Write;
+use zerotier_crypto::hash::{SHA384, SHA384_HASH_SIZE};
+use zerotier_utils::error::{InvalidFormatError, InvalidParameterError};
+
+const MAX_RECURSION_DEPTH: usize = 64;
+
+/// Recursively scatter/gather chunked object assembler.
+pub struct ObjectAssembler {
+    data_chunks: Vec<Vec<u8>>,
+    need: Vec<u8>,
+}
+
+impl ObjectAssembler {
+    /// Create a new assembler to gather an object given its root hash list.
+    pub fn init(hash_list: Vec<u8>) -> Self {
+        Self { data_chunks: Vec::new(), need: hash_list }
+    }
+
+    fn gather_recursive<GetChunk: FnMut(&[u8]) -> Option<Vec<u8>>>(
+        hl: &[u8],
+        new_hl: &mut Vec<u8>,
+        get_chunk: &mut GetChunk,
+        have_all_data_chunk_hashes: &mut bool,
+        depth: usize,
+    ) -> Result<(), InvalidFormatError> {
+        if (hl.len() % SHA384_HASH_SIZE) != 0 {
+            return Err(InvalidFormatError);
+        }
+        for h in hl.chunks_exact(SHA384_HASH_SIZE) {
+            if (h[SHA384_HASH_SIZE - 1] & 0x01) != 0 {
+                if let Some(chunk) = get_chunk(h) {
+                    if depth < MAX_RECURSION_DEPTH {
+                        Self::gather_recursive(chunk.as_slice(), new_hl, get_chunk, have_all_data_chunk_hashes, depth + 1)?;
+                        continue;
+                    } else {
+                        return Err(InvalidFormatError);
+                    }
+                }
+                *have_all_data_chunk_hashes = false;
+            }
+            let _ = new_hl.write_all(h);
+        }
+        return Ok(());
+    }
+
+    /// Try to assemble this object, using the supplied function to request chunks we don't have.
+    ///
+    /// Once all chunks are retrieved this will return Ok(Some(object)). An error return can occur if a chunk
+    /// is invalid or the maximum recursion depth is reached.
+    pub fn gather<GetChunk: FnMut(&[u8]) -> Option<Vec<u8>>>(&mut self, mut get_chunk: GetChunk) -> Result<Option<Vec<u8>>, InvalidFormatError> {
+        let mut new_need = Vec::with_capacity(self.need.len());
+        let mut have_all_data_chunk_hashes = true;
+        Self::gather_recursive(self.need.as_slice(), &mut new_need, &mut get_chunk, &mut have_all_data_chunk_hashes, 0)?;
+        std::mem::swap(&mut self.need, &mut new_need);
+
+        if have_all_data_chunk_hashes {
+            self.data_chunks.resize(self.need.len() / SHA384_HASH_SIZE, Vec::new());
+
+            new_need.clear();
+            let mut cn = 0;
+            for h in self.need.chunks_exact(SHA384_HASH_SIZE) {
+                if let Some(chunk) = get_chunk(h) {
+                    *self.data_chunks.get_mut(cn).unwrap() = chunk;
+                } else {
+                    let _ = new_need.write_all(h);
+                }
+                cn += 1;
+            }
+            self.need = new_need;
+
+            if self.need.is_empty() {
+                let mut obj_size = 0;
+                for dc in self.data_chunks.iter() {
+                    obj_size += dc.len();
+                }
+                let mut obj = Vec::with_capacity(obj_size);
+                for dc in self.data_chunks.iter() {
+                    let _ = obj.write_all(dc.as_slice());
+                }
+                return Ok(Some(obj));
+            }
+        }
+
+        return Ok(None);
+    }
+}
+
+/// Decompose an object into a series of chunks identified by SHA384 hashes.
+///
+/// This splits the supplied binary object into chunks using the FastCDC2020 content defined chunking
+/// algorithm. For each chunk a SHA384 hash is computed and added to a hash list. If the resulting
+/// hash list is larger than max_chunk_size it is further chunked in a simple deterministic way to
+/// yield hashes that point to further lists of hashes. The least significant bit in each hash is
+/// set to 0 if the hash points to a chunk of data or 1 if it points to a chunk of hashes.
+///
+/// The supplied function is called to output each chunk except for the root hash list, which is
+/// returned.
+///
+/// * `obj` - Blob to decompose
+/// * `max_chunk_size` - Maximum size of any chunk including root hash list (minimum allowed: 512)
+/// * `store_chunk` - Function that is called to store each chunk other than the root hash list
+pub fn scatter<F: FnMut([u8; SHA384_HASH_SIZE], &[u8])>(
+    obj: &[u8],
+    max_chunk_size: u32,
+    mut store_chunk: F,
+) -> Result<Vec<u8>, InvalidParameterError> {
+    if max_chunk_size < 512 {
+        return Err(InvalidParameterError("max chunk size must be >= 512"));
+    }
+    let mut root_hash_list = Vec::with_capacity(max_chunk_size as usize);
+
+    for chunk in FastCDC::new(obj, (max_chunk_size / 4).max(MINIMUM_MIN), max_chunk_size / 2, max_chunk_size) {
+        let chunk = &obj[chunk.offset..chunk.offset + chunk.length];
+        let mut chunk_hash = SHA384::hash(chunk);
+        chunk_hash[SHA384_HASH_SIZE - 1] &= 0xfe; // chunk of data
+        let _ = root_hash_list.write_all(&chunk_hash);
+        store_chunk(chunk_hash, chunk);
+    }
+
+    if root_hash_list.len() > (max_chunk_size as usize) {
+        let max_hashes_per_chunk = ((max_chunk_size / (SHA384_HASH_SIZE as u32)) * (SHA384_HASH_SIZE as u32)) as usize;
+        let mut new_root_hash_list = Vec::with_capacity(max_chunk_size as usize);
+        let mut recursion_depth = 0;
+        loop {
+            let mut r = root_hash_list.as_slice();
+            while !r.is_empty() {
+                let clen = r.len().min(max_hashes_per_chunk);
+                let chunk = &r[..clen];
+                if clen > SHA384_HASH_SIZE && (new_root_hash_list.len() + clen) > (max_chunk_size as usize) {
+                    let mut chunk_hash = SHA384::hash(chunk);
+                    chunk_hash[SHA384_HASH_SIZE - 1] |= 0x01; // chunk of hashes
+                    let _ = new_root_hash_list.write_all(&chunk_hash);
+                    store_chunk(chunk_hash, chunk);
+                    r = &r[clen..];
+                } else {
+                    let _ = new_root_hash_list.write_all(chunk);
+                    break;
+                }
+            }
+            std::mem::swap(&mut root_hash_list, &mut new_root_hash_list);
+
+            if root_hash_list.len() <= (max_chunk_size as usize) {
+                break;
+            } else {
+                new_root_hash_list.clear();
+                if recursion_depth >= MAX_RECURSION_DEPTH {
+                    return Err(InvalidParameterError("max recursion depth exceeded"));
+                }
+                recursion_depth += 1;
+            }
+        }
+    }
+
+    return Ok(root_hash_list);
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::collections::HashMap;
+    use zerotier_utils::hex;
+
+    #[test]
+    fn rcdcb_random_blobs() {
+        let mut random_data = Vec::new();
+        random_data.resize(1024 * 1024 * 32, 0);
+        zerotier_crypto::random::fill_bytes_secure(random_data.as_mut());
+
+        let mut chunks = HashMap::new();
+        for _ in 0..1024 {
+            chunks.clear();
+            let test_blob = ((zerotier_crypto::random::xorshift64_random() as usize) % (random_data.len() - 1)) + 1;
+            let test_blob = &random_data.as_slice()[..test_blob];
+
+            let root_hash_list = scatter(test_blob, 1024, |k, v| {
+                //println!("{}", hex::to_string(&k));
+                chunks.insert(k, v.to_vec());
+            });
+            println!("{} chunks", chunks.len());
+        }
+    }
+}
diff --git a/network-hypervisor/src/dr/replicationservice.rs b/network-hypervisor/src/dr/replicationservice.rs
new file mode 100644
index 000000000..5ea3dad73
--- /dev/null
+++ b/network-hypervisor/src/dr/replicationservice.rs
@@ -0,0 +1,17 @@
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex, RwLock};
+
+use crate::protocol;
+use crate::protocol::PacketBuffer;
+use crate::vl1::*;
+
+use zerotier_utils::buffer::OutOfBoundsError;
+use zerotier_utils::sync::RMaybeWLockGuard;
+
+pub struct ReplicationService {}
+
+impl ReplicationService {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
diff --git a/network-hypervisor/src/lib.rs b/network-hypervisor/src/lib.rs
index 8d6ab79b3..f03b00736 100644
--- a/network-hypervisor/src/lib.rs
+++ b/network-hypervisor/src/lib.rs
@@ -4,6 +4,7 @@ pub const VERSION_MAJOR: u8 = 1;
 pub const VERSION_MINOR: u8 = 99;
 pub const VERSION_REVISION: u16 = 1;
 
+pub mod dr;
 #[allow(unused)]
 pub mod protocol;
 pub mod vl1;
diff --git a/network-hypervisor/src/protocol.rs b/network-hypervisor/src/protocol.rs
index 92bc84ce6..ef68d34e5 100644
--- a/network-hypervisor/src/protocol.rs
+++ b/network-hypervisor/src/protocol.rs
@@ -75,6 +75,8 @@ pub type MessageId = u64;
 
 /// ZeroTier VL1 and VL2 wire protocol message types.
 pub mod message_type {
+    // VL1: Virtual Layer 1, the peer-to-peer network
+
     pub const VL1_NOP: u8 = 0x00;
     pub const VL1_HELLO: u8 = 0x01;
     pub const VL1_ERROR: u8 = 0x02;
@@ -85,12 +87,19 @@ pub mod message_type {
     pub const VL1_PUSH_DIRECT_PATHS: u8 = 0x10;
     pub const VL1_USER_MESSAGE: u8 = 0x14;
 
+    // VL2: Virtual Layer 2, the virtual Ethernet network
+
     pub const VL2_MULTICAST_LIKE: u8 = 0x09;
     pub const VL2_NETWORK_CREDENTIALS: u8 = 0x0a;
     pub const VL2_NETWORK_CONFIG_REQUEST: u8 = 0x0b;
     pub const VL2_NETWORK_CONFIG: u8 = 0x0c;
     pub const VL2_MULTICAST_GATHER: u8 = 0x0d;
 
+    // DR: Data replication protocol (scatter-gather based)
+
+    pub const DR_REQUEST: u8 = 0x1e;
+    pub const DR_DATA: u8 = 0x1f;
+
     pub fn name(verb: u8) -> &'static str {
         match verb {
             VL1_NOP => "VL1_NOP",
@@ -106,6 +115,8 @@ pub mod message_type {
             VL2_NETWORK_CONFIG_REQUEST => "VL2_NETWORK_CONFIG_REQUEST",
             VL2_NETWORK_CONFIG => "VL2_NETWORK_CONFIG",
             VL2_MULTICAST_GATHER => "VL2_MULTICAST_GATHER",
+            DR_REQUEST => "DR_REQUEST",
+            DR_DATA => "DR_DATA",
             _ => "???",
         }
     }
diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs
index 4db0ec531..0118af34a 100644
--- a/network-hypervisor/src/vl1/node.rs
+++ b/network-hypervisor/src/vl1/node.rs
@@ -137,17 +137,6 @@ pub trait InnerProtocolLayer: Sync + Send {
         true
     }
 
-    /// Check if this node has any trust relationship with the provided identity.
-    ///
-    /// This should return true if there is any special trust relationship. It controls things
-    /// like sharing of detailed P2P connectivity data, which should be limited to peers with
-    /// some privileged relationship like mutual membership in a network.
-    ///
-    /// The default implementation always returns true.
-    fn has_trust_relationship(&self, id: &Valid<Identity>) -> bool {
-        true
-    }
-
     /// Handle a packet, returning true if it was handled by the next layer.
     ///
     /// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error().
diff --git a/network-hypervisor/src/vl2/switch.rs b/network-hypervisor/src/vl2/switch.rs
index 8b66ee4c2..3e2a3f32f 100644
--- a/network-hypervisor/src/vl2/switch.rs
+++ b/network-hypervisor/src/vl2/switch.rs
@@ -11,14 +11,6 @@ pub struct Switch {}
 
 #[allow(unused_variables)]
 impl InnerProtocolLayer for Switch {
-    fn should_respond_to(&self, id: &zerotier_crypto::typestate::Valid<Identity>) -> bool {
-        true
-    }
-
-    fn has_trust_relationship(&self, id: &zerotier_crypto::typestate::Valid<Identity>) -> bool {
-        true
-    }
-
     fn handle_packet(
         &self,
         app: &Application,
diff --git a/network-hypervisor/src/vl2/topology.rs b/network-hypervisor/src/vl2/topology.rs
index 3defd1233..6d451ac9e 100644
--- a/network-hypervisor/src/vl2/topology.rs
+++ b/network-hypervisor/src/vl2/topology.rs
@@ -1,4 +1,5 @@
 use std::borrow::Cow;
+use std::io::{Read, Write};
 
 use zerotier_utils::blob::Blob;
 use zerotier_utils::flatsortedmap::FlatSortedMap;
@@ -20,6 +21,11 @@ pub struct Member<'a> {
     pub name: Cow<'a, str>,
 }
 
+#[allow(unused)]
+pub mod member_flag {
+    pub const BRIDGING_ALLOWED: u64 = 0x0001;
+}
+
 #[derive(Serialize, Deserialize, Eq, PartialEq, Clone)]
 pub struct Topology<'a> {
     pub timestamp: i64,
@@ -45,6 +51,16 @@ pub struct Topology<'a> {
     pub members: FlatSortedMap<'a, Blob, Member<'a>>,
 }
 
+impl<'a> Topology<'a> {
+    pub fn new_from_bytes(b: &[u8]) -> Result<Self, rmp_serde::decode::Error> {
+        rmp_serde::from_slice(b)
+    }
+
+    pub fn write_bytes_to<W: Write>(&self, w: &mut W) {
+        rmp_serde::encode::write_named(w, self).unwrap()
+    }
+}
+
 #[inline(always)]
 fn u64_zero(i: &u64) -> bool {
     *i == 0
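
Reviewer note, not part of the patch: a minimal sketch of how scatter() and ObjectAssembler::gather() from network-hypervisor/src/dr/rcdcb.rs are meant to fit together, assuming it sits alongside that module so both items are in scope. The in-memory HashMap chunk store and the helper name round_trip are illustrative assumptions; in the eventual replication service a missing chunk would presumably be fetched from a peer via DR_REQUEST/DR_DATA and gather() called again as data arrives, rather than served from a local map.

use std::collections::HashMap;
use zerotier_crypto::hash::SHA384_HASH_SIZE;

// Hypothetical helper: scatter an object into an in-memory store, then gather it back.
fn round_trip(obj: &[u8]) -> Option<Vec<u8>> {
    let mut chunk_store: HashMap<[u8; SHA384_HASH_SIZE], Vec<u8>> = HashMap::new();

    // Decompose: every chunk except the returned root hash list lands in the store.
    let root_hash_list = scatter(obj, 1024, |hash, chunk| {
        chunk_store.insert(hash, chunk.to_vec());
    })
    .ok()?;

    // Reassemble by looking chunks up by hash; with everything available locally a single
    // gather() pass walks the whole hash tree and returns Ok(Some(object)).
    let mut assembler = ObjectAssembler::init(root_hash_list);
    assembler.gather(|hash| chunk_store.get(hash).cloned()).ok().flatten()
}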