Scatter/gather, move SG into VL2 since that is where it will be used, add an array chunker to utils::memory

Adam Ierymenko 2023-03-14 10:27:16 -04:00
parent b3bd64504b
commit dd9f1cffe5
6 changed files with 55 additions and 44 deletions

View file

@@ -1 +0,0 @@
-pub mod scattergather;

View file

@@ -1,17 +0,0 @@
-use std::collections::HashMap;
-use std::sync::{Arc, Mutex, RwLock};
-
-use crate::protocol;
-use crate::protocol::PacketBuffer;
-use crate::vl1::*;
-
-use zerotier_utils::buffer::OutOfBoundsError;
-use zerotier_utils::sync::RMaybeWLockGuard;
-
-pub struct ReplicationService {}
-
-impl ReplicationService {
-    pub fn new() -> Self {
-        Self {}
-    }
-}

View file

@@ -4,7 +4,6 @@ pub const VERSION_MAJOR: u8 = 1;
 pub const VERSION_MINOR: u8 = 99;
 pub const VERSION_REVISION: u16 = 1;

-pub mod dr;
 #[allow(unused)]
 pub mod protocol;
 pub mod vl1;

View file

@@ -3,6 +3,7 @@
 mod iproute;
 mod multicastgroup;
 mod networkid;
+mod scattergather;
 mod switch;
 mod topology;

View file

@@ -1,17 +1,20 @@
+use fastcdc::v2020;
 use std::io::Write;
-use fastcdc::v2020;
 use zerotier_crypto::hash::{SHA384, SHA384_HASH_SIZE};
 use zerotier_utils::error::{InvalidFormatError, InvalidParameterError};
+use zerotier_utils::memory::byte_array_chunks_exact;

-const MAX_RECURSION_DEPTH: usize = 64;
+const MAX_RECURSION_DEPTH: u8 = 64; // sanity limit, object would have to be quite huge to hit this

-/// Recursively scatter/gather chunked object assembler.
-pub struct ObjectAssembler {
+/// Re-assembler for scattered objects.
+pub struct ScatteredObject {
     data_chunks: Vec<Vec<u8>>,
     need: Vec<u8>,
 }

-impl ObjectAssembler {
+impl ScatteredObject {
     /// Create a new assembler to gather an object given its root hash list.
     pub fn init(hash_list: Vec<u8>) -> Self {
         Self { data_chunks: Vec::new(), need: hash_list }
@@ -22,15 +25,14 @@ impl ObjectAssembler {
         new_hl: &mut Vec<u8>,
         get_chunk: &mut GetChunk,
         have_all_data_chunk_hashes: &mut bool,
-        depth: usize,
+        depth: u8,
     ) -> Result<(), InvalidFormatError> {
         if (hl.len() % SHA384_HASH_SIZE) != 0 || hl.is_empty() {
             return Err(InvalidFormatError);
         }
-        for h in hl.chunks_exact(SHA384_HASH_SIZE) {
+        for h in byte_array_chunks_exact::<SHA384_HASH_SIZE>(hl) {
             if (h[SHA384_HASH_SIZE - 1] & 0x01) != 0 {
-                debug_assert_eq!(h.len(), SHA384_HASH_SIZE);
-                if let Some(chunk) = get_chunk(unsafe { &*h.as_ptr().cast() }) {
+                if let Some(chunk) = get_chunk(h) {
                     if depth < MAX_RECURSION_DEPTH {
                         Self::gather_recursive(chunk.as_slice(), new_hl, get_chunk, have_all_data_chunk_hashes, depth + 1)?;
                         continue;
@@ -45,31 +47,36 @@ impl ObjectAssembler {
         return Ok(());
     }

-    /// Try to assemble this object, using the supplied function to request chunks we don't have.
+    /// Try to assemble this object using the supplied function to request chunks we don't have.
     ///
     /// Once all chunks are retrieved this will return Ok(Some(object)). A return of Ok(None) means there are
     /// still missing chunks that couldn't be resolved with the supplied getter. In that case this should be
-    /// called again once more chunks are fetched. An error return indicates invalid chunk data or that the
-    /// maximum recursion depth has been exceeded.
+    /// called again once more chunks are fetched. Use need() to get an iterator over chunks that are still
+    /// outstanding.
+    ///
+    /// Once a result is returned this assembler object should be discarded. An error return indicates an
+    /// invalid object due to either invalid chunk data or too many recursions.
     pub fn gather<GetChunk: FnMut(&[u8; SHA384_HASH_SIZE]) -> Option<Vec<u8>>>(
         &mut self,
         mut get_chunk: GetChunk,
     ) -> Result<Option<Vec<u8>>, InvalidFormatError> {
+        // First attempt to resolve all chunks of hashes until we have a complete in-order list of all data chunks.
         let mut new_need = Vec::with_capacity(self.need.len());
-        let mut have_all_data_chunk_hashes = true;
+        let mut have_all_data_chunk_hashes = true; // set to false if we still need to get chunks of hashes
         Self::gather_recursive(self.need.as_slice(), &mut new_need, &mut get_chunk, &mut have_all_data_chunk_hashes, 0)?;
         std::mem::swap(&mut self.need, &mut new_need);

+        // Once we have all data chunks, resolve those until the entire object is re-assembled.
         if have_all_data_chunk_hashes {
             self.data_chunks.resize(self.need.len() / SHA384_HASH_SIZE, Vec::new());
-            let mut cn = 0;
+            let mut chunk_no = 0;
             let mut missing_chunks = false;
-            for h in self.need.chunks_exact(SHA384_HASH_SIZE) {
-                let dc = self.data_chunks.get_mut(cn).unwrap();
+            for h in byte_array_chunks_exact::<SHA384_HASH_SIZE>(self.need.as_slice()) {
+                let dc = self.data_chunks.get_mut(chunk_no).unwrap();
                 if dc.is_empty() {
                     debug_assert_eq!(h.len(), SHA384_HASH_SIZE);
-                    if let Some(chunk) = get_chunk(unsafe { &*h.as_ptr().cast() }) {
+                    if let Some(chunk) = get_chunk(h) {
                         if !chunk.is_empty() {
                             *dc = chunk;
                         } else {
@@ -79,7 +86,7 @@ impl ObjectAssembler {
                         missing_chunks = true;
                     }
                 }
-                cn += 1;
+                chunk_no += 1;
            }

            if !missing_chunks {
@@ -99,9 +106,11 @@ impl ObjectAssembler {
     }

     /// Get an iterator of hashes currently known to be needed to reassemble this object.
-    #[inline]
+    ///
+    /// This list can get longer through the course of object retrieval since incoming chunks can
+    /// be chunks of hashes instead of chunks of data.
     pub fn need(&self) -> impl Iterator<Item = &[u8; SHA384_HASH_SIZE]> {
-        self.need.chunks_exact(SHA384_HASH_SIZE).map(|c| unsafe { &*c.as_ptr().cast() })
+        byte_array_chunks_exact::<SHA384_HASH_SIZE>(self.need.as_slice())
     }
 }
@@ -109,9 +118,9 @@ impl ObjectAssembler {
 ///
 /// This splits the supplied binary object into chunks using the FastCDC2020 content defined chunking
 /// algorithm. For each chunk a SHA384 hash is computed and added to a hash list. If the resulting
-/// hash list is larger than max_chunk_size it is further chunked in a simple deterministic way to
-/// yield hashes that point to further lists of hashes. The least significant bit in each hash is
-/// set to 0 if the hash points to a chunk of data or 1 if it points to a chunk of hashes.
+/// hash list is larger than max_chunk_size it is recursively chunked into chunks of hashes. Chunking
+/// of the hash list is done deterministically rather than using FastCDC since the record size in these
+/// chunks is always a multiple of the hash size.
 ///
 /// The supplied function is called to output each chunk except for the root hash list, which is
 /// returned. It's technically possible for the same chunk to be output more than once if there are
@@ -184,7 +193,7 @@ mod tests {
     use std::collections::HashMap;

     #[test]
-    fn rcdcb_random_blobs() {
+    fn scatter_gather_random_blobs() {
         let mut random_data = Vec::new();
         random_data.resize(1024 * 1024 * 8, 0);
         zerotier_crypto::random::fill_bytes_secure(random_data.as_mut());
@@ -201,7 +210,7 @@ mod tests {
             })
             .unwrap();

-        let mut assembler = ObjectAssembler::init(root_hash_list);
+        let mut assembler = ScatteredObject::init(root_hash_list);
         let mut gathered_blob;
         loop {
             gathered_blob = assembler
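
Side note: a minimal sketch of how the reassembly side shown above might be driven by a caller. Only ScatteredObject::init(), gather() and need() are taken from this diff; the chunk_store map, the try_reassemble() wrapper and the idea of queuing requests are hypothetical stand-ins for however chunks actually arrive over the network.

use std::collections::HashMap;
use zerotier_crypto::hash::SHA384_HASH_SIZE;

// Assumes ScatteredObject from the scattergather module above is in scope.
// chunk_store holds chunks already received from peers, keyed by SHA384 hash.
fn try_reassemble(
    assembler: &mut ScatteredObject,
    chunk_store: &HashMap<[u8; SHA384_HASH_SIZE], Vec<u8>>,
) -> Option<Vec<u8>> {
    match assembler.gather(|hash| chunk_store.get(hash).cloned()) {
        Ok(Some(object)) => Some(object), // every data chunk resolved; object is complete
        Ok(None) => {
            // Not done yet: need() now yields the hashes still outstanding, which the
            // caller would request from peers before calling gather() again.
            for _missing in assembler.need() { /* queue a request for this hash (hypothetical) */ }
            None
        }
        Err(_) => None, // invalid chunk data or too many recursions; discard the assembler
    }
}

Because gather() swaps in a fresh need list on every call, it can simply be re-invoked with the same lookup closure each time new chunks land, until it returns Ok(Some(_)) or an error.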

View file

@@ -6,6 +6,9 @@
  * https://www.zerotier.com/
  */

+// This is a collection of functions that use "unsafe" to do things with memory that should in fact
+// be safe. Some of these may eventually get stable standard library replacements.
+
 #[allow(unused_imports)]
 use std::mem::{needs_drop, size_of, MaybeUninit};
@@ -56,6 +59,23 @@ pub fn load_raw<T: Copy>(src: &[u8]) -> T {
     }
 }

+/// Our version of the not-yet-stable array_chunks method in slice, but only for byte arrays.
+#[inline(always)]
+pub fn byte_array_chunks_exact<const S: usize>(a: &[u8]) -> impl Iterator<Item = &[u8; S]> {
+    let mut i = 0;
+    let l = a.len();
+    std::iter::from_fn(move || {
+        let j = i + S;
+        if j <= l {
+            let next = unsafe { &*a.as_ptr().add(i).cast() };
+            i = j;
+            Some(next)
+        } else {
+            None
+        }
+    })
+}
+
 /// Obtain a view into an array cast as another array.
 /// This will panic if the template parameters would result in out of bounds access.
 #[inline(always)]
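
And a quick illustration of what the new byte_array_chunks_exact() helper buys at call sites like the ones in scattergather.rs: fixed-size &[u8; N] references without per-iteration length checks or unsafe pointer casts in the caller. The buffer, sizes, and function name here are arbitrary.

use zerotier_utils::memory::byte_array_chunks_exact;

fn chunker_demo() {
    // A buffer holding three concatenated 48-byte (SHA-384 sized) records.
    let buf = [0u8; 48 * 3];
    let mut count = 0;
    for record in byte_array_chunks_exact::<48>(&buf) {
        // record is a &[u8; 48]; its length is known at compile time.
        let _: &[u8; 48] = record;
        count += 1;
    }
    assert_eq!(count, 3); // a trailing partial chunk, if present, is simply not yielded
}

Like the unstable slice::array_chunks it mirrors, the iterator stops at the last complete chunk, which is why callers such as gather_recursive() still validate up front that the input length is a multiple of SHA384_HASH_SIZE.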