Mirror of https://github.com/zerotier/ZeroTierOne.git

commit 1afbc73ff8 (parent cd6b9fa616)

    The scatter/gather algorithm works.

2 changed files with 71 additions and 29 deletions
@@ -1 +1 @@
-pub mod rcdcb;
+pub mod scattergather;

@@ -1,4 +1,4 @@
-use fastcdc::v2020::{FastCDC, MINIMUM_MIN};
+use fastcdc::v2020;
 use std::io::Write;
 use zerotier_crypto::hash::{SHA384, SHA384_HASH_SIZE};
 use zerotier_utils::error::{InvalidFormatError, InvalidParameterError};
@@ -17,19 +17,20 @@ impl ObjectAssembler {
         Self { data_chunks: Vec::new(), need: hash_list }
     }
 
-    fn gather_recursive<GetChunk: FnMut(&[u8]) -> Option<Vec<u8>>>(
+    fn gather_recursive<GetChunk: FnMut(&[u8; SHA384_HASH_SIZE]) -> Option<Vec<u8>>>(
         hl: &[u8],
         new_hl: &mut Vec<u8>,
         get_chunk: &mut GetChunk,
         have_all_data_chunk_hashes: &mut bool,
         depth: usize,
     ) -> Result<(), InvalidFormatError> {
-        if (hl.len() % SHA384_HASH_SIZE) != 0 {
+        if (hl.len() % SHA384_HASH_SIZE) != 0 || hl.is_empty() {
             return Err(InvalidFormatError);
         }
         for h in hl.chunks_exact(SHA384_HASH_SIZE) {
             if (h[SHA384_HASH_SIZE - 1] & 0x01) != 0 {
-                if let Some(chunk) = get_chunk(h) {
+                debug_assert_eq!(h.len(), SHA384_HASH_SIZE);
+                if let Some(chunk) = get_chunk(unsafe { &*h.as_ptr().cast() }) {
                     if depth < MAX_RECURSION_DEPTH {
                         Self::gather_recursive(chunk.as_slice(), new_hl, get_chunk, have_all_data_chunk_hashes, depth + 1)?;
                         continue;
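The unsafe pointer cast above converts the 48-byte slice yielded by chunks_exact() into the fixed-size array reference the new GetChunk signature expects. A safe equivalent, shown only as an editorial sketch (the as_hash helper is not part of this commit):

    use zerotier_crypto::hash::SHA384_HASH_SIZE;

    // chunks_exact(SHA384_HASH_SIZE) already guarantees h.len() == SHA384_HASH_SIZE,
    // which is what makes the unsafe cast in the diff sound; try_into() simply
    // re-checks that length at runtime.
    fn as_hash(h: &[u8]) -> &[u8; SHA384_HASH_SIZE] {
        h.try_into().expect("chunks_exact guarantees the slice length")
    }
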
@@ -46,9 +47,14 @@ impl ObjectAssembler {
 
     /// Try to assemble this object, using the supplied function to request chunks we don't have.
     ///
-    /// Once all chunks are retrieved this will return Ok(Some(object)). An error return can occur if a chunk
-    /// is invalid or the maximum recursion depth is reached.
-    pub fn gather<GetChunk: FnMut(&[u8]) -> Option<Vec<u8>>>(&mut self, mut get_chunk: GetChunk) -> Result<Option<Vec<u8>>, InvalidFormatError> {
+    /// Once all chunks are retrieved this will return Ok(Some(object)). A return of Ok(None) means there are
+    /// still missing chunks that couldn't be resolved with the supplied getter. In that case this should be
+    /// called again once more chunks are fetched. An error return indicates invalid chunk data or that the
+    /// maximum recursion depth has been exceeded.
+    pub fn gather<GetChunk: FnMut(&[u8; SHA384_HASH_SIZE]) -> Option<Vec<u8>>>(
+        &mut self,
+        mut get_chunk: GetChunk,
+    ) -> Result<Option<Vec<u8>>, InvalidFormatError> {
         let mut new_need = Vec::with_capacity(self.need.len());
         let mut have_all_data_chunk_hashes = true;
         Self::gather_recursive(self.need.as_slice(), &mut new_need, &mut get_chunk, &mut have_all_data_chunk_hashes, 0)?;
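A minimal sketch of driving this API (the reassemble wrapper and in-memory HashMap store are illustrative assumptions mirroring the test at the bottom of this diff, not part of the commit):

    use std::collections::HashMap;
    use zerotier_crypto::hash::SHA384_HASH_SIZE;

    // `store` maps chunk hash -> chunk bytes, e.g. as filled in by the
    // store_chunk callback passed to scatter().
    fn reassemble(root_hash_list: Vec<u8>, store: &HashMap<[u8; SHA384_HASH_SIZE], Vec<u8>>) -> Option<Vec<u8>> {
        let mut assembler = ObjectAssembler::init(root_hash_list);
        // Ok(Some(object)) once every chunk resolves; Ok(None) means some
        // chunks are still missing and gather() should be called again once
        // they arrive.
        match assembler.gather(|hash| store.get(hash).cloned()).unwrap() {
            Some(object) => Some(object),
            None => {
                // The outstanding hashes can be requested from peers using
                // the need() iterator added later in this commit.
                for missing in assembler.need() {
                    eprintln!("still need chunk {:x?}", &missing[..8]);
                }
                None
            }
        }
    }
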
@@ -57,19 +63,26 @@ impl ObjectAssembler {
         if have_all_data_chunk_hashes {
             self.data_chunks.resize(self.need.len() / SHA384_HASH_SIZE, Vec::new());
 
-            new_need.clear();
             let mut cn = 0;
+            let mut missing_chunks = false;
             for h in self.need.chunks_exact(SHA384_HASH_SIZE) {
-                if let Some(chunk) = get_chunk(h) {
-                    *self.data_chunks.get_mut(cn).unwrap() = chunk;
+                let dc = self.data_chunks.get_mut(cn).unwrap();
+                if dc.is_empty() {
+                    debug_assert_eq!(h.len(), SHA384_HASH_SIZE);
+                    if let Some(chunk) = get_chunk(unsafe { &*h.as_ptr().cast() }) {
+                        if !chunk.is_empty() {
+                            *dc = chunk;
                         } else {
-                    let _ = new_need.write_all(h);
+                            return Err(InvalidFormatError);
+                        }
+                    } else {
+                        missing_chunks = true;
+                    }
                 }
                 cn += 1;
             }
-            self.need = new_need;
 
-            if self.need.is_empty() {
+            if !missing_chunks {
                 let mut obj_size = 0;
                 for dc in self.data_chunks.iter() {
                     obj_size += dc.len();
@@ -84,6 +97,12 @@ impl ObjectAssembler {
 
         return Ok(None);
     }
 
+    /// Get an iterator of hashes currently known to be needed to reassemble this object.
+    #[inline]
+    pub fn need(&self) -> impl Iterator<Item = &[u8; SHA384_HASH_SIZE]> {
+        self.need.chunks_exact(SHA384_HASH_SIZE).map(|c| unsafe { &*c.as_ptr().cast() })
+    }
 }
 
 /// Decompose an object into a series of chunks identified by SHA384 hashes.
@@ -95,7 +114,8 @@ impl ObjectAssembler {
 /// set to 0 if the hash points to a chunk of data or 1 if it points to a chunk of hashes.
 ///
 /// The supplied function is called to output each chunk except for the root hash list, which is
-/// returned.
+/// returned. It's technically possible for the same chunk to be output more than once if there are
+/// long runs of identical data in the supplied object. In this case it need only be stored once.
 ///
 /// * `obj` - Blob to decompose
 /// * `max_chunk_size` - Maximum size of any chunk including root hash list (minimum allowed: 256)
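As a sketch of the tag-bit convention described in the doc comment above (the helper name is illustrative, not part of the commit):

    use zerotier_crypto::hash::SHA384_HASH_SIZE;

    // scatter() clears the low bit of a data chunk's hash (&= 0xfe) and sets
    // it on a chunk-of-hashes hash (|= 0x01), so one bit of the SHA384 value
    // serves as the chunk type tag; gather_recursive() tests exactly this bit
    // to decide whether to recurse into a chunk as a hash list.
    fn is_hash_chunk(hash: &[u8; SHA384_HASH_SIZE]) -> bool {
        (hash[SHA384_HASH_SIZE - 1] & 0x01) != 0
    }
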
@@ -110,7 +130,7 @@ pub fn scatter<F: FnMut([u8; SHA384_HASH_SIZE], &[u8])>(
     }
     let mut root_hash_list = Vec::with_capacity(max_chunk_size as usize);
 
-    for chunk in FastCDC::new(obj, (max_chunk_size / 4).max(MINIMUM_MIN), max_chunk_size / 2, max_chunk_size) {
+    for chunk in v2020::FastCDC::new(obj, (max_chunk_size / 4).max(v2020::MINIMUM_MIN), max_chunk_size / 2, max_chunk_size) {
         let chunk = &obj[chunk.offset..chunk.offset + chunk.length];
         let mut chunk_hash = SHA384::hash(chunk);
         chunk_hash[SHA384_HASH_SIZE - 1] &= 0xfe; // chunk of data
@@ -125,17 +145,20 @@ pub fn scatter<F: FnMut([u8; SHA384_HASH_SIZE], &[u8])>(
     loop {
         let mut r = root_hash_list.as_slice();
         while !r.is_empty() {
+            debug_assert_eq!(new_root_hash_list.len() % SHA384_HASH_SIZE, 0);
+            debug_assert_eq!(r.len() % SHA384_HASH_SIZE, 0);
+            if (new_root_hash_list.len() + r.len()) <= (max_chunk_size as usize) {
+                let _ = new_root_hash_list.write_all(r);
+                break;
+            } else {
                 let clen = r.len().min(max_hashes_per_chunk);
                 let chunk = &r[..clen];
-            if clen > SHA384_HASH_SIZE && (new_root_hash_list.len() + clen) > (max_chunk_size as usize) {
+                r = &r[clen..];
 
                 let mut chunk_hash = SHA384::hash(chunk);
                 chunk_hash[SHA384_HASH_SIZE - 1] |= 0x01; // chunk of hashes
                 let _ = new_root_hash_list.write_all(&chunk_hash);
                 store_chunk(chunk_hash, chunk);
-                r = &r[clen..];
-            } else {
-                let _ = new_root_hash_list.write_all(chunk);
-                break;
-            }
             }
         }
         std::mem::swap(&mut root_hash_list, &mut new_root_hash_list);
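The rewritten loop above flushes the remainder of the hash list directly once it fits, and otherwise hashes max_hashes_per_chunk-byte runs into chunk-of-hashes entries. A rough, illustrative model of how the list shrinks per pass (editorial sketch, not code from the commit; in the real code max_hashes_per_chunk is a byte count):

    const SHA384_HASH_SIZE: usize = 48;

    // Each pass replaces up to max_hashes_per_chunk bytes of hash list with a
    // single SHA384_HASH_SIZE-byte hash, so the list shrinks geometrically
    // until it fits within one chunk of at most max_chunk_size bytes.
    fn passes_until_fit(mut list_len: usize, max_hashes_per_chunk: usize, max_chunk_size: usize) -> u32 {
        assert!(max_hashes_per_chunk > SHA384_HASH_SIZE && max_chunk_size >= max_hashes_per_chunk);
        let mut passes = 0;
        while list_len > max_chunk_size {
            // ceiling division: number of hash chunks produced by this pass
            list_len = ((list_len + max_hashes_per_chunk - 1) / max_hashes_per_chunk) * SHA384_HASH_SIZE;
            passes += 1;
        }
        passes
    }
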
@@ -159,16 +182,15 @@ pub fn scatter<F: FnMut([u8; SHA384_HASH_SIZE], &[u8])>(
 mod tests {
     use super::*;
     use std::collections::HashMap;
-    use zerotier_utils::hex;
 
     #[test]
     fn rcdcb_random_blobs() {
         let mut random_data = Vec::new();
-        random_data.resize(1024 * 1024 * 32, 0);
+        random_data.resize(1024 * 1024 * 8, 0);
         zerotier_crypto::random::fill_bytes_secure(random_data.as_mut());
 
         let mut chunks = HashMap::new();
-        for _ in 0..1024 {
+        for _ in 0..4 {
             chunks.clear();
             let test_blob = ((zerotier_crypto::random::xorshift64_random() as usize) % (random_data.len() - 1)) + 1;
             let test_blob = &random_data.as_slice()[..test_blob];
@@ -176,8 +198,28 @@ mod tests {
             let root_hash_list = scatter(test_blob, 1024, |k, v| {
                 //println!("{}", hex::to_string(&k));
                 chunks.insert(k, v.to_vec());
-            });
-            println!("{} chunks", chunks.len());
+            })
+            .unwrap();
+
+            let mut assembler = ObjectAssembler::init(root_hash_list);
+            let mut gathered_blob;
+            loop {
+                gathered_blob = assembler
+                    .gather(|c| {
+                        if zerotier_crypto::random::xorshift64_random() < (u64::MAX / 8) {
+                            None
+                        } else {
+                            chunks.get(c).cloned()
+                        }
+                    })
+                    .unwrap();
+                if gathered_blob.is_some() {
+                    break;
+                }
+            }
+            let gathered_blob = gathered_blob.unwrap();
+
+            assert!(gathered_blob.eq(test_blob));
         }
     }
 }