Mirror of https://github.com/zerotier/ZeroTierOne.git (synced 2025-06-06 20:43:44 +02:00)

commit 6ae7ebc186 (parent 99ede32c96)

    scatter gather stuff

10 changed files with 231 additions and 27 deletions
@@ -504,14 +504,6 @@ impl InnerProtocolLayer for Controller {
         true
     }
 
-    fn has_trust_relationship(&self, id: &Valid<Identity>) -> bool {
-        self.recently_authorized
-            .read()
-            .unwrap()
-            .get(&id.fingerprint)
-            .map_or(false, |by_network| by_network.values().any(|t| *t > ms_monotonic()))
-    }
-
     fn handle_packet<HostSystemImpl: ApplicationLayer + ?Sized>(
         &self,
         host_system: &HostSystemImpl,
@@ -16,6 +16,8 @@ lz4_flex = { version = "^0", features = ["safe-encode", "safe-decode", "checked-
 serde = { version = "^1", features = ["derive"], default-features = false }
 phf = { version = "^0", features = ["macros", "std"], default-features = false }
 num-traits = "^0"
+rmp-serde = "^1"
+fastcdc = "^3"
 
 [dev-dependencies]
 rand = "*"
network-hypervisor/src/dr/mod.rs (new file, 1 line)
@@ -0,0 +1 @@
+pub mod rcdcb;
network-hypervisor/src/dr/rcdcb.rs (new file, 183 lines)
@@ -0,0 +1,183 @@
+use fastcdc::v2020::{FastCDC, MINIMUM_MIN};
+use std::io::Write;
+use zerotier_crypto::hash::{SHA384, SHA384_HASH_SIZE};
+use zerotier_utils::error::{InvalidFormatError, InvalidParameterError};
+
+const MAX_RECURSION_DEPTH: usize = 64;
+
+/// Recursively scatter/gather chunked object assembler.
+pub struct ObjectAssembler {
+    data_chunks: Vec<Vec<u8>>,
+    need: Vec<u8>,
+}
+
+impl ObjectAssembler {
+    /// Create a new assembler to gather an object given its root hash list.
+    pub fn init(hash_list: Vec<u8>) -> Self {
+        Self { data_chunks: Vec::new(), need: hash_list }
+    }
+
+    fn gather_recursive<GetChunk: FnMut(&[u8]) -> Option<Vec<u8>>>(
+        hl: &[u8],
+        new_hl: &mut Vec<u8>,
+        get_chunk: &mut GetChunk,
+        have_all_data_chunk_hashes: &mut bool,
+        depth: usize,
+    ) -> Result<(), InvalidFormatError> {
+        if (hl.len() % SHA384_HASH_SIZE) != 0 {
+            return Err(InvalidFormatError);
+        }
+        for h in hl.chunks_exact(SHA384_HASH_SIZE) {
+            if (h[SHA384_HASH_SIZE - 1] & 0x01) != 0 {
+                if let Some(chunk) = get_chunk(h) {
+                    if depth < MAX_RECURSION_DEPTH {
+                        Self::gather_recursive(chunk.as_slice(), new_hl, get_chunk, have_all_data_chunk_hashes, depth + 1)?;
+                        continue;
+                    } else {
+                        return Err(InvalidFormatError);
+                    }
+                }
+                *have_all_data_chunk_hashes = false;
+            }
+            let _ = new_hl.write_all(h);
+        }
+        return Ok(());
+    }
+
+    /// Try to assemble this object, using the supplied function to request chunks we don't have.
+    ///
+    /// Once all chunks are retrieved this will return Ok(Some(object)). An error return can occur if a chunk
+    /// is invalid or the maximum recursion depth is reached.
+    pub fn gather<GetChunk: FnMut(&[u8]) -> Option<Vec<u8>>>(&mut self, mut get_chunk: GetChunk) -> Result<Option<Vec<u8>>, InvalidFormatError> {
+        let mut new_need = Vec::with_capacity(self.need.len());
+        let mut have_all_data_chunk_hashes = true;
+        Self::gather_recursive(self.need.as_slice(), &mut new_need, &mut get_chunk, &mut have_all_data_chunk_hashes, 0)?;
+        std::mem::swap(&mut self.need, &mut new_need);
+
+        if have_all_data_chunk_hashes {
+            self.data_chunks.resize(self.need.len() / SHA384_HASH_SIZE, Vec::new());
+
+            new_need.clear();
+            let mut cn = 0;
+            for h in self.need.chunks_exact(SHA384_HASH_SIZE) {
+                if let Some(chunk) = get_chunk(h) {
+                    *self.data_chunks.get_mut(cn).unwrap() = chunk;
+                } else {
+                    let _ = new_need.write_all(h);
+                }
+                cn += 1;
+            }
+            self.need = new_need;
+
+            if self.need.is_empty() {
+                let mut obj_size = 0;
+                for dc in self.data_chunks.iter() {
+                    obj_size += dc.len();
+                }
+                let mut obj = Vec::with_capacity(obj_size);
+                for dc in self.data_chunks.iter() {
+                    let _ = obj.write_all(dc.as_slice());
+                }
+                return Ok(Some(obj));
+            }
+        }
+
+        return Ok(None);
+    }
+}
+
+/// Decompose an object into a series of chunks identified by SHA384 hashes.
+///
+/// This splits the supplied binary object into chunks using the FastCDC2020 content defined chunking
+/// algorithm. For each chunk a SHA384 hash is computed and added to a hash list. If the resulting
+/// hash list is larger than max_chunk_size it is further chunked in a simple deterministic way to
+/// yield hashes that point to further lists of hashes. The least significant bit in each hash is
+/// set to 0 if the hash points to a chunk of data or 1 if it points to a chunk of hashes.
+///
+/// The supplied function is called to output each chunk except for the root hash list, which is
+/// returned.
+///
+/// * `obj` - Blob to decompose
+/// * `max_chunk_size` - Maximum size of any chunk including root hash list (minimum allowed: 512)
+/// * `store_chunk` - Function that is called to store each chunk other than the root hash list
+pub fn scatter<F: FnMut([u8; SHA384_HASH_SIZE], &[u8])>(
+    obj: &[u8],
+    max_chunk_size: u32,
+    mut store_chunk: F,
+) -> Result<Vec<u8>, InvalidParameterError> {
+    if max_chunk_size < 512 {
+        return Err(InvalidParameterError("max chunk size must be >= 512"));
+    }
+    let mut root_hash_list = Vec::with_capacity(max_chunk_size as usize);
+
+    for chunk in FastCDC::new(obj, (max_chunk_size / 4).max(MINIMUM_MIN), max_chunk_size / 2, max_chunk_size) {
+        let chunk = &obj[chunk.offset..chunk.offset + chunk.length];
+        let mut chunk_hash = SHA384::hash(chunk);
+        chunk_hash[SHA384_HASH_SIZE - 1] &= 0xfe; // chunk of data
+        let _ = root_hash_list.write_all(&chunk_hash);
+        store_chunk(chunk_hash, chunk);
+    }
+
+    if root_hash_list.len() > (max_chunk_size as usize) {
+        let max_hashes_per_chunk = ((max_chunk_size / (SHA384_HASH_SIZE as u32)) * (SHA384_HASH_SIZE as u32)) as usize;
+        let mut new_root_hash_list = Vec::with_capacity(max_chunk_size as usize);
+        let mut recursion_depth = 0;
+        loop {
+            let mut r = root_hash_list.as_slice();
+            while !r.is_empty() {
+                let clen = r.len().min(max_hashes_per_chunk);
+                let chunk = &r[..clen];
+                if clen > SHA384_HASH_SIZE && (new_root_hash_list.len() + clen) > (max_chunk_size as usize) {
+                    let mut chunk_hash = SHA384::hash(chunk);
+                    chunk_hash[SHA384_HASH_SIZE - 1] |= 0x01; // chunk of hashes
+                    let _ = new_root_hash_list.write_all(&chunk_hash);
+                    store_chunk(chunk_hash, chunk);
+                    r = &r[clen..];
+                } else {
+                    let _ = new_root_hash_list.write_all(chunk);
+                    break;
+                }
+            }
+            std::mem::swap(&mut root_hash_list, &mut new_root_hash_list);
+
+            if root_hash_list.len() <= (max_chunk_size as usize) {
+                break;
+            } else {
+                new_root_hash_list.clear();
+                if recursion_depth >= MAX_RECURSION_DEPTH {
+                    return Err(InvalidParameterError("max recursion depth exceeded"));
+                }
+                recursion_depth += 1;
+            }
+        }
+    }
+
+    return Ok(root_hash_list);
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::collections::HashMap;
+    use zerotier_utils::hex;
+
+    #[test]
+    fn rcdcb_random_blobs() {
+        let mut random_data = Vec::new();
+        random_data.resize(1024 * 1024 * 32, 0);
+        zerotier_crypto::random::fill_bytes_secure(random_data.as_mut());
+
+        let mut chunks = HashMap::new();
+        for _ in 0..1024 {
+            chunks.clear();
+            let test_blob = ((zerotier_crypto::random::xorshift64_random() as usize) % (random_data.len() - 1)) + 1;
+            let test_blob = &random_data.as_slice()[..test_blob];
+
+            let root_hash_list = scatter(test_blob, 1024, |k, v| {
+                //println!("{}", hex::to_string(&k));
+                chunks.insert(k, v.to_vec());
+            });
+            println!("{} chunks", chunks.len());
+        }
+    }
+}
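Taken together, the new scatter() and ObjectAssembler cover both directions of the protocol, but the unit test above exercises scatter() only. Below is a minimal round-trip sketch; it is illustrative rather than part of the commit, assumes the new dr::rcdcb items are in scope, and uses an in-memory HashMap as a stand-in for whatever chunk store the replication service will eventually provide (the names round_trip and store are hypothetical).

    // Hypothetical round trip (not in this commit): scatter an object into a
    // hash-addressed store, then gather it back out.
    use std::collections::HashMap;
    use zerotier_crypto::hash::SHA384_HASH_SIZE;
    // assumes: use of the new dr::rcdcb scatter() and ObjectAssembler

    fn round_trip(object: &[u8]) -> Vec<u8> {
        let mut store: HashMap<[u8; SHA384_HASH_SIZE], Vec<u8>> = HashMap::new();

        // scatter() emits every chunk through the callback and returns the
        // root hash list, which is all a receiver needs to start gathering.
        let root_hash_list = scatter(object, 1024, |hash, chunk| {
            store.insert(hash, chunk.to_vec());
        })
        .unwrap();

        // gather() expands hash-list chunks (LSB of the last hash byte set)
        // down to data chunks (LSB clear), asking for each chunk by hash.
        // Over the network a None return would become a DR_REQUEST for that
        // hash; here the local store always answers, so one pass completes.
        let mut assembler = ObjectAssembler::init(root_hash_list);
        assembler
            .gather(|hash| store.get(hash).cloned())
            .unwrap()
            .expect("store holds every chunk, so assembly completes")
    }

Because the store is complete, a single gather() call suffices; in a real exchange gather() would be re-run as requested chunks arrive, shrinking the internal "need" list each time until it returns Ok(Some(object)).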
network-hypervisor/src/dr/replicationservice.rs (new file, 17 lines)
@@ -0,0 +1,17 @@
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex, RwLock};
+
+use crate::protocol;
+use crate::protocol::PacketBuffer;
+use crate::vl1::*;
+
+use zerotier_utils::buffer::OutOfBoundsError;
+use zerotier_utils::sync::RMaybeWLockGuard;
+
+pub struct ReplicationService {}
+
+impl ReplicationService {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
@@ -4,6 +4,7 @@ pub const VERSION_MAJOR: u8 = 1;
 pub const VERSION_MINOR: u8 = 99;
 pub const VERSION_REVISION: u16 = 1;
 
+pub mod dr;
 #[allow(unused)]
 pub mod protocol;
 pub mod vl1;
@@ -75,6 +75,8 @@ pub type MessageId = u64;
 
 /// ZeroTier VL1 and VL2 wire protocol message types.
 pub mod message_type {
+    // VL1: Virtual Layer 1, the peer to peer network
+
     pub const VL1_NOP: u8 = 0x00;
     pub const VL1_HELLO: u8 = 0x01;
     pub const VL1_ERROR: u8 = 0x02;

@@ -85,12 +87,19 @@ pub mod message_type {
     pub const VL1_PUSH_DIRECT_PATHS: u8 = 0x10;
     pub const VL1_USER_MESSAGE: u8 = 0x14;
 
+    // VL2: Virtual Layer 2, the virtual Ethernet network
+
     pub const VL2_MULTICAST_LIKE: u8 = 0x09;
     pub const VL2_NETWORK_CREDENTIALS: u8 = 0x0a;
     pub const VL2_NETWORK_CONFIG_REQUEST: u8 = 0x0b;
     pub const VL2_NETWORK_CONFIG: u8 = 0x0c;
     pub const VL2_MULTICAST_GATHER: u8 = 0x0d;
+
+    // DR: Data replication protocol (scatter-gather based)
+
+    pub const DR_REQUEST: u8 = 0x1e;
+    pub const DR_DATA: u8 = 0x1f;
 
     pub fn name(verb: u8) -> &'static str {
         match verb {
             VL1_NOP => "VL1_NOP",

@@ -106,6 +115,8 @@ pub mod message_type {
             VL2_NETWORK_CONFIG_REQUEST => "VL2_NETWORK_CONFIG_REQUEST",
             VL2_NETWORK_CONFIG => "VL2_NETWORK_CONFIG",
             VL2_MULTICAST_GATHER => "VL2_MULTICAST_GATHER",
+            DR_REQUEST => "DR_REQUEST",
+            DR_DATA => "DR_DATA",
             _ => "???",
         }
     }
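The two new DR verbs also join the name() lookup used when tracing packets. A minimal sketch (illustrative, not part of the commit; trace_verb is a hypothetical helper assumed to live inside this crate):

    // Resolving verb bytes to readable names, e.g. when logging inbound packets.
    use crate::protocol::message_type;

    fn trace_verb(verb: u8) {
        // name() returns "???" for unassigned verb values.
        println!("received verb 0x{:02x} ({})", verb, message_type::name(verb));
    }

    // trace_verb(message_type::DR_REQUEST) prints: received verb 0x1e (DR_REQUEST)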
@@ -137,17 +137,6 @@ pub trait InnerProtocolLayer: Sync + Send {
         true
     }
 
-    /// Check if this node has any trust relationship with the provided identity.
-    ///
-    /// This should return true if there is any special trust relationship. It controls things
-    /// like sharing of detailed P2P connectivity data, which should be limited to peers with
-    /// some privileged relationship like mutual membership in a network.
-    ///
-    /// The default implementation always returns true.
-    fn has_trust_relationship(&self, id: &Valid<Identity>) -> bool {
-        true
-    }
-
     /// Handle a packet, returning true if it was handled by the next layer.
     ///
     /// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error().
@@ -11,14 +11,6 @@ pub struct Switch {}
 
 #[allow(unused_variables)]
 impl InnerProtocolLayer for Switch {
-    fn should_respond_to(&self, id: &zerotier_crypto::typestate::Valid<crate::vl1::Identity>) -> bool {
-        true
-    }
-
-    fn has_trust_relationship(&self, id: &zerotier_crypto::typestate::Valid<crate::vl1::Identity>) -> bool {
-        true
-    }
-
     fn handle_packet<Application: ApplicationLayer + ?Sized>(
         &self,
         app: &Application,
@@ -1,4 +1,5 @@
 use std::borrow::Cow;
+use std::io::{Read, Write};
 
 use zerotier_utils::blob::Blob;
 use zerotier_utils::flatsortedmap::FlatSortedMap;

@@ -20,6 +21,11 @@ pub struct Member<'a> {
     pub name: Cow<'a, str>,
 }
 
+#[allow(unused)]
+pub mod member_flag {
+    pub const BRIDGING_ALLOWED: u64 = 0x0001;
+}
+
 #[derive(Serialize, Deserialize, Eq, PartialEq, Clone)]
 pub struct Topology<'a> {
     pub timestamp: i64,

@@ -45,6 +51,16 @@ pub struct Topology<'a> {
     pub members: FlatSortedMap<'a, Blob<IDENTITY_FINGERPRINT_SIZE>, Member<'a>>,
 }
 
+impl<'a> Topology<'a> {
+    pub fn new_from_bytes(b: &[u8]) -> Result<Self, rmp_serde::decode::Error> {
+        rmp_serde::from_slice(b)
+    }
+
+    pub fn write_bytes_to<W: Write>(&self, w: &mut W) {
+        rmp_serde::encode::write_named(w, self).unwrap()
+    }
+}
+
 #[inline(always)]
 fn u64_zero(i: &u64) -> bool {
     *i == 0
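The new Topology helpers wrap rmp-serde's self-describing MessagePack encoding: write_named() keys struct fields by name rather than by position, trading a little size for decoder tolerance of field reordering. Topology's full field set is not visible in this diff, so the sketch below (illustrative, not part of the commit) shows the same round trip on a hypothetical stand-in struct:

    use serde::{Deserialize, Serialize};

    // Stand-in struct (hypothetical; Topology has more fields than this diff
    // shows) demonstrating the round trip the new helpers perform.
    #[derive(Serialize, Deserialize, Eq, PartialEq, Debug)]
    struct MiniTopology {
        timestamp: i64,
        name: String,
    }

    fn main() {
        let original = MiniTopology { timestamp: 1234, name: "net".into() };

        // write_bytes_to(): write_named() serializes fields keyed by name.
        let mut bytes = Vec::new();
        rmp_serde::encode::write_named(&mut bytes, &original).unwrap();

        // new_from_bytes(): decode directly from the byte slice.
        let decoded: MiniTopology = rmp_serde::from_slice(&bytes).unwrap();
        assert_eq!(decoded, original);
    }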