mirror of
https://github.com/zerotier/ZeroTierOne.git
synced 2025-06-05 20:13:44 +02:00
It compiles.
This commit is contained in:
parent
734e86dfed
commit
0d67fcee92
5 changed files with 258 additions and 172 deletions
|
@ -1,3 +1,11 @@
|
||||||
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
*
|
||||||
|
* (c)2021 ZeroTier, Inc.
|
||||||
|
* https://www.zerotier.com/
|
||||||
|
*/
|
||||||
|
|
||||||
mod store;
|
mod store;
|
||||||
mod replicator;
|
mod replicator;
|
||||||
mod protocol;
|
mod protocol;
|
||||||
|
@ -10,3 +18,8 @@ pub(crate) fn ms_since_epoch() -> u64 {
|
||||||
pub(crate) fn ms_monotonic() -> u64 {
|
pub(crate) fn ms_monotonic() -> u64 {
|
||||||
std::time::Instant::now().elapsed().as_millis() as u64
|
std::time::Instant::now().elapsed().as_millis() as u64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub const IDENTITY_HASH_SIZE: usize = 48;
|
||||||
|
|
||||||
|
pub use store::{Store, StoreObjectResult};
|
||||||
|
pub use replicator::{Replicator, Config};
|
||||||
|
|
|
@ -1,15 +1,17 @@
|
||||||
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
*
|
||||||
|
* (c)2021 ZeroTier, Inc.
|
||||||
|
* https://www.zerotier.com/
|
||||||
|
*/
|
||||||
|
|
||||||
pub(crate) const PROTOCOL_VERSION: u8 = 1;
|
pub(crate) const PROTOCOL_VERSION: u8 = 1;
|
||||||
|
|
||||||
/// No operation and no payload, sent as a heartbeat.
|
|
||||||
/// This is the only message type NOT followed by a message size varint. It's just one byte.
|
|
||||||
pub(crate) const MESSAGE_TYPE_NOP: u8 = 0;
|
pub(crate) const MESSAGE_TYPE_NOP: u8 = 0;
|
||||||
|
pub(crate) const MESSAGE_TYPE_HAVE_NEW_OBJECT: u8 = 1;
|
||||||
/// An object either sent in response to a query or because it is new.
|
pub(crate) const MESSAGE_TYPE_OBJECT: u8 = 2;
|
||||||
/// Payload is simply the object. The hash is not included as we compute it locally for security.
|
pub(crate) const MESSAGE_TYPE_GET_OBJECTS: u8 = 3;
|
||||||
pub(crate) const MESSAGE_TYPE_OBJECT: u8 = 1;
|
|
||||||
|
|
||||||
/// Request one or more objects by identity hash with optional common prefix.
|
|
||||||
pub(crate) const MESSAGE_TYPE_GET_OBJECTS: u8 = 2;
|
|
||||||
|
|
||||||
/// HELLO message, which is all u8's and is packed and so can be parsed directly in place.
|
/// HELLO message, which is all u8's and is packed and so can be parsed directly in place.
|
||||||
/// This message is sent at the start of any connection by both sides.
|
/// This message is sent at the start of any connection by both sides.
|
||||||
|
@ -20,7 +22,7 @@ pub(crate) struct Hello {
|
||||||
pub flags: [u8; 4], // u32, little endian
|
pub flags: [u8; 4], // u32, little endian
|
||||||
pub clock: [u8; 8], // u64, little endian
|
pub clock: [u8; 8], // u64, little endian
|
||||||
pub data_set_size: [u8; 8], // u64, little endian
|
pub data_set_size: [u8; 8], // u64, little endian
|
||||||
pub domain_hash: [u8; 32],
|
pub domain_hash: [u8; 48],
|
||||||
pub instance_id: [u8; 16],
|
pub instance_id: [u8; 16],
|
||||||
pub loopback_check_code_salt: [u8; 8],
|
pub loopback_check_code_salt: [u8; 8],
|
||||||
pub loopback_check_code: [u8; 16],
|
pub loopback_check_code: [u8; 16],
|
||||||
|
|
|
@ -1,30 +1,55 @@
|
||||||
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
*
|
||||||
|
* (c)2021 ZeroTier, Inc.
|
||||||
|
* https://www.zerotier.com/
|
||||||
|
*/
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::future::Future;
|
use std::error::Error;
|
||||||
use std::hash::{Hash, Hasher};
|
use std::hash::{Hash, Hasher};
|
||||||
use std::io::Read;
|
use std::marker::PhantomData;
|
||||||
use std::mem::{size_of, transmute};
|
use std::mem::{size_of, transmute};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use getrandom::getrandom;
|
use getrandom::getrandom;
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha384};
|
||||||
use sha2::digest::{FixedOutput, Reset, Update};
|
|
||||||
use smol::{Executor, Task, Timer};
|
use smol::{Executor, Task, Timer};
|
||||||
use smol::future;
|
use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
||||||
use smol::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
|
|
||||||
use smol::lock::Mutex;
|
use smol::lock::Mutex;
|
||||||
use smol::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, TcpListener, TcpStream, SocketAddr};
|
use smol::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, TcpListener, TcpStream, SocketAddr};
|
||||||
use smol::stream::StreamExt;
|
use smol::stream::StreamExt;
|
||||||
|
|
||||||
use crate::{ms_monotonic, ms_since_epoch, protocol};
|
use crate::{IDENTITY_HASH_SIZE, ms_monotonic, ms_since_epoch, protocol};
|
||||||
use crate::store::Store;
|
use crate::store::{StoreObjectResult, Store};
|
||||||
use crate::varint;
|
use crate::varint;
|
||||||
|
|
||||||
const CONNECTION_TIMEOUT_SECONDS: u64 = 30;
|
const CONNECTION_TIMEOUT_SECONDS: u64 = 60;
|
||||||
|
const CONNECTION_SYNC_RESTART_TIMEOUT_SECONDS: u64 = 5;
|
||||||
|
|
||||||
|
static mut XORSHIFT64_STATE: u64 = 0;
|
||||||
|
|
||||||
|
/// Get a non-cryptographic random number.
|
||||||
|
fn xorshift64_random() -> u64 {
|
||||||
|
let mut x = unsafe { XORSHIFT64_STATE };
|
||||||
|
x ^= x.wrapping_shl(13);
|
||||||
|
x ^= x.wrapping_shr(7);
|
||||||
|
x ^= x.wrapping_shl(17);
|
||||||
|
unsafe { XORSHIFT64_STATE = x };
|
||||||
|
x
|
||||||
|
}
|
||||||
|
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
|
/// Number of P2P connections desired.
|
||||||
|
pub target_link_count: usize,
|
||||||
|
|
||||||
|
/// Maximum allowed size of an object.
|
||||||
|
pub max_object_size: usize,
|
||||||
|
|
||||||
/// TCP port to which this should bind.
|
/// TCP port to which this should bind.
|
||||||
pub tcp_port: u16,
|
pub tcp_port: u16,
|
||||||
|
|
||||||
|
@ -41,39 +66,49 @@ struct ConnectionKey {
|
||||||
impl Hash for ConnectionKey {
|
impl Hash for ConnectionKey {
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||||
state.write(&self.instance_id);
|
self.instance_id.hash(state);
|
||||||
self.ip.hash(state);
|
self.ip.hash(state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ReplicatorImpl<S: Store> {
|
struct Connection {
|
||||||
instance_id: [u8; 16],
|
remote_address: SocketAddr,
|
||||||
loopback_check_code_secret: [u8; 16],
|
last_receive: Arc<AtomicU64>,
|
||||||
domain_hash: [u8; 32],
|
task: Task<()>
|
||||||
store: Arc<S>,
|
|
||||||
config: Config,
|
|
||||||
connections: Mutex<HashMap<ConnectionKey, (SocketAddr, Task<()>)>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Replicator<S: Store> {
|
struct ReplicatorImpl<'ex> {
|
||||||
state: Arc<ReplicatorImpl<S>>,
|
executor: Arc<Executor<'ex>>,
|
||||||
|
instance_id: [u8; 16],
|
||||||
|
loopback_check_code_secret: [u8; 16],
|
||||||
|
domain_hash: [u8; 48],
|
||||||
|
store: Arc<dyn Store>,
|
||||||
|
config: Config,
|
||||||
|
connections: Mutex<HashMap<ConnectionKey, Connection>>,
|
||||||
|
announced_objects_requested: Mutex<HashMap<[u8; IDENTITY_HASH_SIZE], u64>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Replicator<'ex> {
|
||||||
v4_listener_task: Option<Task<()>>,
|
v4_listener_task: Option<Task<()>>,
|
||||||
v6_listener_task: Option<Task<()>>,
|
v6_listener_task: Option<Task<()>>,
|
||||||
service_task: Task<()>,
|
service_task: Task<()>,
|
||||||
|
_marker: PhantomData<std::cell::UnsafeCell<&'ex ()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: Store> Replicator<S> {
|
impl<'ex> Replicator<'ex> {
|
||||||
/// Create a new replicator to replicate the contents of the provided store.
|
/// Create a new replicator to replicate the contents of the provided store.
|
||||||
/// All async tasks, sockets, and connections will be dropped if the replicator is dropped. Use
|
/// All async tasks, sockets, and connections will be dropped if the replicator is dropped.
|
||||||
/// the shutdown() method for a graceful shutdown.
|
pub async fn start(executor: &Arc<Executor<'ex>>, store: Arc<dyn Store>, config: Config) -> Result<Replicator<'ex>, Box<dyn Error>> {
|
||||||
pub async fn start(executor: Arc<Executor>, store: Arc<S>, config: Config) -> Result<Self, Box<dyn Err>> {
|
let _ = unsafe { getrandom(&mut *(&mut XORSHIFT64_STATE as *mut u64).cast::<[u8; 8]>()) };
|
||||||
|
|
||||||
let listener_v4 = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.tcp_port)).await;
|
let listener_v4 = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.tcp_port)).await;
|
||||||
let listener_v6 = TcpListener::bind(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)).await;
|
let listener_v6 = TcpListener::bind(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)).await;
|
||||||
if listener_v4.is_err() && listener_v6.is_err() {
|
if listener_v4.is_err() && listener_v6.is_err() {
|
||||||
return Err(Box::new(listener_v4.unwrap_err()));
|
return Err(Box::new(listener_v4.unwrap_err()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let r = Arc::new(ReplicatorImpl::<S> {
|
let r = Arc::new(ReplicatorImpl::<'ex> {
|
||||||
|
executor: executor.clone(),
|
||||||
instance_id: {
|
instance_id: {
|
||||||
let mut tmp = [0_u8; 16];
|
let mut tmp = [0_u8; 16];
|
||||||
getrandom(&mut tmp).expect("getrandom failed");
|
getrandom(&mut tmp).expect("getrandom failed");
|
||||||
|
@ -85,98 +120,67 @@ impl<S: Store> Replicator<S> {
|
||||||
tmp
|
tmp
|
||||||
},
|
},
|
||||||
domain_hash: {
|
domain_hash: {
|
||||||
let mut h = Sha256::new();
|
let mut h = Sha384::new();
|
||||||
h.update(config.domain.as_bytes());
|
h.update(config.domain.as_bytes());
|
||||||
h.finalize_fixed().into()
|
h.finalize().as_ref().try_into().unwrap()
|
||||||
},
|
},
|
||||||
config,
|
config,
|
||||||
store,
|
store,
|
||||||
connections: Mutex::new(HashMap::new()),
|
connections: Mutex::new(HashMap::new()),
|
||||||
|
announced_objects_requested: Mutex::new(HashMap::new())
|
||||||
});
|
});
|
||||||
|
|
||||||
let (e0, e1) = (executor.clone(), executor.clone());
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
state: r,
|
v4_listener_task: listener_v4.map_or(None, |listener_v4| Some(executor.spawn(r.clone().listener_task_main(listener_v4)))),
|
||||||
v4_listener_task: listener_v4.map_or(None, |listener_v4| Some(executor.spawn(r.listener_task_main(listener_v4, e0)))),
|
v6_listener_task: listener_v6.map_or(None, |listener_v6| Some(executor.spawn(r.clone().listener_task_main(listener_v6)))),
|
||||||
v6_listener_task: listener_v6.map_or(None, |listener_v6| Some(executor.spawn(r.listener_task_main(listener_v6, e1)))),
|
service_task: executor.spawn(r.service_main()),
|
||||||
service_task: executor.spawn(r.service_main(executor.clone())),
|
_marker: PhantomData::default(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn shutdown(self) {
|
|
||||||
// Get a joined future including our service task and one or both listeners. There is always
|
|
||||||
// at least one listener. If there are no listeners this is a bug and will panic.
|
|
||||||
let main_tasks = self.v4_listener_task.map_or_else(|| {
|
|
||||||
future::zip(self.service_task.cancel(), self.v6_listener_task.unwrap().cancel())
|
|
||||||
}, |v4| {
|
|
||||||
self.v6_listener_task.map_or_else(|| {
|
|
||||||
future::zip(self.service_task.cancel(), v4.cancel())
|
|
||||||
}, |v6| {
|
|
||||||
future::zip(self.service_task.cancel(), future::zip(v4.cancel(), v6.cancel()))
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
// Just dropping all connections is fine.
|
|
||||||
self.state.connections.lock().await.clear();
|
|
||||||
|
|
||||||
// Then gracefully wait for the main tasks to finish.
|
|
||||||
let _ = main_tasks.await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: Store> ReplicatorImpl<S> {
|
unsafe impl<'ex> Send for Replicator<'ex> {}
|
||||||
async fn service_main(&self, executor: Arc<Executor>) {
|
|
||||||
let mut timer = smol::Timer::interval(Duration::from_secs(1));
|
unsafe impl<'ex> Sync for Replicator<'ex> {}
|
||||||
let mut to_close: Vec<ConnectionKey> = Vec::new();
|
|
||||||
|
impl<'ex> ReplicatorImpl<'ex> {
|
||||||
|
async fn service_main(self: Arc<ReplicatorImpl<'ex>>) {
|
||||||
|
let mut timer = smol::Timer::interval(Duration::from_secs(5));
|
||||||
loop {
|
loop {
|
||||||
timer.next().await;
|
timer.next().await;
|
||||||
|
|
||||||
let mut connections = self.connections.lock().await;
|
|
||||||
|
|
||||||
let now_mt = ms_monotonic();
|
let now_mt = ms_monotonic();
|
||||||
for cc in connections.iter_mut() {
|
self.announced_objects_requested.lock().await.retain(|_, ts| now_mt.saturating_sub(*ts) < (CONNECTION_TIMEOUT_SECONDS * 1000));
|
||||||
let c = &(*cc.1).0;
|
self.connections.lock().await.retain(|_, c| (now_mt.saturating_sub(c.last_receive.load(Ordering::Relaxed))) < (CONNECTION_TIMEOUT_SECONDS * 1000));
|
||||||
if c.closed.load(Ordering::Relaxed) || (now_mt - c.last_receive_time.load(Ordering::Relaxed)) > CONNECTION_TIMEOUT_SECONDS {
|
|
||||||
to_close.push(cc.0.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for tc in to_close.iter() {
|
|
||||||
let _ = connections.remove(tc);
|
|
||||||
}
|
|
||||||
to_close.clear();
|
|
||||||
|
|
||||||
drop(connections);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn listener_task_main(&self, listener: TcpListener, executor: Arc<Executor>) {
|
async fn listener_task_main(self: Arc<ReplicatorImpl<'ex>>, listener: TcpListener) {
|
||||||
loop {
|
loop {
|
||||||
let stream = listener.accept().await;
|
let stream = listener.accept().await;
|
||||||
if stream.is_ok() {
|
if stream.is_ok() {
|
||||||
let (mut stream, remote_address) = stream.unwrap();
|
let (stream, remote_address) = stream.unwrap();
|
||||||
self.handle_new_connection(stream, remote_address, false, executor.clone()).await;
|
self.handle_new_connection(stream, remote_address, false).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_new_connection(&self, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool, executor: Arc<Executor>) {
|
async fn handle_new_connection(self: &Arc<ReplicatorImpl<'ex>>, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool) {
|
||||||
stream.set_nodelay(true);
|
stream.set_nodelay(true);
|
||||||
|
|
||||||
let mut loopback_check_code_salt = [0_u8; 8];
|
let mut loopback_check_code_salt = [0_u8; 8];
|
||||||
getrandom(&mut tmp).expect("getrandom failed");
|
getrandom(&mut loopback_check_code_salt).expect("getrandom failed");
|
||||||
|
|
||||||
let mut h = Sha256::new();
|
let mut h = Sha384::new();
|
||||||
h.update(&loopback_check_code_salt);
|
h.update(&loopback_check_code_salt);
|
||||||
h.update(&self.loopback_check_code_secret);
|
h.update(&self.loopback_check_code_secret);
|
||||||
let loopback_check_code: [u8; 32] = h.finalize_fixed().into();
|
let loopback_check_code: [u8; 48] = h.finalize().as_ref().try_into().unwrap();
|
||||||
|
|
||||||
let hello = protocol::Hello {
|
let hello = protocol::Hello {
|
||||||
hello_size: size_of::<protocol::Hello>() as u8,
|
hello_size: size_of::<protocol::Hello>() as u8,
|
||||||
protocol_version: protocol::PROTOCOL_VERSION,
|
protocol_version: protocol::PROTOCOL_VERSION,
|
||||||
flags: [0_u8; 4],
|
flags: [0_u8; 4],
|
||||||
clock: ms_since_epoch().to_le_bytes(),
|
clock: ms_since_epoch().to_le_bytes(),
|
||||||
data_set_size: self.store.total_size().await.to_le_bytes(),
|
data_set_size: self.store.total_size().to_le_bytes(),
|
||||||
domain_hash: self.domain_hash.clone(),
|
domain_hash: self.domain_hash.clone(),
|
||||||
instance_id: self.instance_id.clone(),
|
instance_id: self.instance_id.clone(),
|
||||||
loopback_check_code_salt,
|
loopback_check_code_salt,
|
||||||
|
@ -193,10 +197,10 @@ impl<S: Store> ReplicatorImpl<S> {
|
||||||
if hello.hello_size == size_of::<protocol::Hello>() as u8 && hello.protocol_version == protocol::PROTOCOL_VERSION {
|
if hello.hello_size == size_of::<protocol::Hello>() as u8 && hello.protocol_version == protocol::PROTOCOL_VERSION {
|
||||||
// If this hash's first 16 bytes are equal to the one in the HELLO, this connection is
|
// If this hash's first 16 bytes are equal to the one in the HELLO, this connection is
|
||||||
// from this node and should be dropped.
|
// from this node and should be dropped.
|
||||||
let mut h = Sha256::new();
|
let mut h = Sha384::new();
|
||||||
h.update(&hello.loopback_check_code_salt);
|
h.update(&hello.loopback_check_code_salt);
|
||||||
h.update(&self.loopback_check_code_secret);
|
h.update(&self.loopback_check_code_secret);
|
||||||
let loopback_if_equal: [u8; 32] = h.finalize_fixed().into();
|
let loopback_if_equal: [u8; 48] = h.finalize().as_ref().try_into().unwrap();
|
||||||
|
|
||||||
if !loopback_if_equal[0..16].eq(&hello.loopback_check_code) {
|
if !loopback_if_equal[0..16].eq(&hello.loopback_check_code) {
|
||||||
let k = ConnectionKey {
|
let k = ConnectionKey {
|
||||||
|
@ -206,7 +210,12 @@ impl<S: Store> ReplicatorImpl<S> {
|
||||||
let mut connections = self.connections.lock().await;
|
let mut connections = self.connections.lock().await;
|
||||||
let _ = connections.entry(k).or_insert_with(move || {
|
let _ = connections.entry(k).or_insert_with(move || {
|
||||||
stream.set_nodelay(false);
|
stream.set_nodelay(false);
|
||||||
(remote_address.clone(), executor.spawn(self.connection_io_task_main(stream, remote_address, false, executor.clone())))
|
let last_receive = Arc::new(AtomicU64::new(ms_monotonic()));
|
||||||
|
Connection {
|
||||||
|
remote_address,
|
||||||
|
last_receive: last_receive.clone(),
|
||||||
|
task: self.executor.spawn(self.clone().connection_io_task_main(stream, hello.instance_id, last_receive))
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -214,60 +223,134 @@ impl<S: Store> ReplicatorImpl<S> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn connection_sync_init_task_main(&self, writer: Arc<Mutex<TcpStream>>) {
|
async fn connection_io_task_main(self: Arc<ReplicatorImpl<'ex>>, stream: TcpStream, remote_instance_id: [u8; 16], last_receive: Arc<AtomicU64>) {
|
||||||
let mut periodic_timer = Timer::interval(Duration::from_secs(1));
|
let mut reader = BufReader::with_capacity(65536, stream.clone());
|
||||||
loop {
|
|
||||||
let _ = periodic_timer.next().await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn connection_io_task_main(&self, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool, executor: Arc<Executor>) {
|
|
||||||
let mut reader = BufReader::with_capacity(S::MAX_OBJECT_SIZE * 2, stream.clone());
|
|
||||||
let writer = Arc::new(Mutex::new(stream));
|
let writer = Arc::new(Mutex::new(stream));
|
||||||
|
|
||||||
let _sync_search_init_task = executor.spawn(self.connection_sync_init_task_main(writer.clone()));
|
let writer2 = writer.clone();
|
||||||
|
let _sync_search_init_task = self.executor.spawn(async {
|
||||||
|
let writer = writer2;
|
||||||
|
let mut periodic_timer = Timer::interval(Duration::from_secs(1));
|
||||||
|
loop {
|
||||||
|
let _ = periodic_timer.next().await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let mut tmp = [0_u8; 4096];
|
let mut tmp_mem = Vec::new();
|
||||||
|
tmp_mem.resize(self.config.max_object_size, 0);
|
||||||
|
let tmp = tmp_mem.as_mut_slice();
|
||||||
'main_io_loop: loop {
|
'main_io_loop: loop {
|
||||||
if reader.read_exact(&mut tmp[0..1]).await.is_err() {
|
if reader.read_exact(&mut tmp[0..1]).await.is_err() {
|
||||||
break 'main_io_loop;
|
break 'main_io_loop;
|
||||||
}
|
}
|
||||||
let message_type = tmp[0];
|
let message_type = tmp[0];
|
||||||
|
|
||||||
if message_type == protocol::MESSAGE_TYPE_NOP {
|
last_receive.store(ms_monotonic(), Ordering::Relaxed);
|
||||||
continue 'main_io_loop;
|
|
||||||
}
|
|
||||||
|
|
||||||
let message_size = varint::async_read(&mut reader).await;
|
|
||||||
if message_size.is_err() {
|
|
||||||
break 'main_io_loop;
|
|
||||||
}
|
|
||||||
let mut message_size = message_size.unwrap();
|
|
||||||
|
|
||||||
if message_size > S::MAX_OBJECT_SIZE as u64 {
|
|
||||||
break 'main_io_loop;
|
|
||||||
}
|
|
||||||
|
|
||||||
match message_type {
|
match message_type {
|
||||||
|
protocol::MESSAGE_TYPE_NOP => {},
|
||||||
|
|
||||||
|
protocol::MESSAGE_TYPE_HAVE_NEW_OBJECT => {
|
||||||
|
if reader.read_exact(&mut tmp[0..IDENTITY_HASH_SIZE]).await.is_err() {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
let identity_hash: [u8; 48] = (&tmp[0..IDENTITY_HASH_SIZE]).try_into().unwrap();
|
||||||
|
let mut announced_objects_requested = self.announced_objects_requested.lock().await;
|
||||||
|
if !announced_objects_requested.contains_key(&identity_hash) && !self.store.have(&identity_hash) {
|
||||||
|
announced_objects_requested.insert(identity_hash.clone(), ms_monotonic());
|
||||||
|
drop(announced_objects_requested); // release mutex
|
||||||
|
|
||||||
|
tmp[0] = protocol::MESSAGE_TYPE_GET_OBJECTS;
|
||||||
|
tmp[1] = 0;
|
||||||
|
tmp[2] = varint::ONE;
|
||||||
|
tmp[3..(3 + IDENTITY_HASH_SIZE)].copy_from_slice(&identity_hash);
|
||||||
|
if !writer.lock().await.write_all(&tmp).await.is_err() {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protocol::MESSAGE_TYPE_OBJECT => {
|
protocol::MESSAGE_TYPE_OBJECT => {
|
||||||
|
let object_size = varint::async_read(&mut reader).await;
|
||||||
|
if object_size.is_err() {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
let object_size = object_size.unwrap();
|
||||||
|
if object_size > self.config.max_object_size as u64 {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
|
||||||
|
let object = &mut tmp[0..(object_size as usize)];
|
||||||
|
if reader.read_exact(object).await.is_err() {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
|
||||||
|
let identity_hash: [u8; 48] = Sha384::digest(object.as_ref()).as_ref().try_into().unwrap();
|
||||||
|
match self.store.put(&identity_hash, object) {
|
||||||
|
StoreObjectResult::Invalid => {
|
||||||
|
break 'main_io_loop;
|
||||||
|
},
|
||||||
|
StoreObjectResult::Ok | StoreObjectResult::Duplicate => {
|
||||||
|
if self.announced_objects_requested.lock().await.remove(&identity_hash).is_some() {
|
||||||
|
// TODO: propagate rumor if we requested this object in response to a HAVE message.
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
let _ = self.announced_objects_requested.lock().await.remove(&identity_hash);
|
||||||
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
protocol::MESSAGE_TYPE_GET_OBJECTS => {
|
protocol::MESSAGE_TYPE_GET_OBJECTS => {
|
||||||
|
// Read common prefix if the requester is requesting a set of hashes with the same beginning.
|
||||||
|
// A common prefix length of zero means they're requesting by full hash.
|
||||||
|
if reader.read_exact(&mut tmp[0..1]).await.is_err() {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
let common_prefix_length = tmp[0] as usize;
|
||||||
|
if common_prefix_length >= IDENTITY_HASH_SIZE {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
if reader.read_exact(&mut tmp[0..common_prefix_length]).await.is_err() {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the number of hashes being requested.
|
||||||
|
let hash_count = varint::async_read(&mut reader).await;
|
||||||
|
if hash_count.is_err() {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
let hash_count = hash_count.unwrap();
|
||||||
|
|
||||||
|
// Step through each suffix of the common prefix and send the object if found.
|
||||||
|
for _ in 0..hash_count {
|
||||||
|
if reader.read_exact(&mut tmp[common_prefix_length..IDENTITY_HASH_SIZE]).await.is_err() {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
let identity_hash: [u8; IDENTITY_HASH_SIZE] = (&tmp[0..IDENTITY_HASH_SIZE]).try_into().unwrap();
|
||||||
|
let object = self.store.get(&identity_hash);
|
||||||
|
if object.is_some() {
|
||||||
|
let object2 = object.unwrap();
|
||||||
|
let object = object2.as_slice();
|
||||||
|
let mut w = writer.lock().await;
|
||||||
|
if varint::async_write(&mut *w, object.len() as u64).await.is_err() {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
if w.write_all(object).await.is_err() {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
_ => {
|
_ => {
|
||||||
// Skip the bodies of unrecognized message types.
|
break 'main_io_loop;
|
||||||
while message_size >= tmp.len() as u64 {
|
|
||||||
if reader.read_exact(&tmp).await.is_err() {
|
|
||||||
break 'main_io_loop;
|
|
||||||
}
|
|
||||||
message_size -= tmp.len() as u64;
|
|
||||||
}
|
|
||||||
if message_size > 0 {
|
|
||||||
if reader.read_exact(&mut tmp[0..(message_size as usize)]).await.is_err() {
|
|
||||||
break 'main_io_loop;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
unsafe impl<'ex> Send for ReplicatorImpl<'ex> {}
|
||||||
|
|
||||||
|
unsafe impl<'ex> Sync for ReplicatorImpl<'ex> {}
|
||||||
|
|
|
@ -1,68 +1,50 @@
|
||||||
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
*
|
||||||
|
* (c)2021 ZeroTier, Inc.
|
||||||
|
* https://www.zerotier.com/
|
||||||
|
*/
|
||||||
|
|
||||||
use smol::net::SocketAddr;
|
use smol::net::SocketAddr;
|
||||||
|
|
||||||
|
use crate::IDENTITY_HASH_SIZE;
|
||||||
|
|
||||||
/// Result code from the put() method in Database.
|
/// Result code from the put() method in Database.
|
||||||
pub enum PutObjectResult {
|
pub enum StoreObjectResult {
|
||||||
/// Datum stored successfully.
|
/// Datum stored successfully.
|
||||||
Ok,
|
Ok,
|
||||||
/// Datum is one we already have.
|
/// Datum is one we already have.
|
||||||
Duplicate,
|
Duplicate,
|
||||||
/// Value is invalid. (this may result in dropping connections to peers, etc.)
|
/// Value is invalid. (this may result in dropping connections to peers, etc.)
|
||||||
Invalid,
|
Invalid,
|
||||||
|
/// Value is not invalid but it was not added to the data store for some neutral reason.
|
||||||
|
Ignored,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Trait that must be implemented for the data store that is to be replicated.
|
/// Trait that must be implemented for the data store that is to be replicated.
|
||||||
///
|
|
||||||
/// Each datum is identified by an identity hash, which is a cryptographic hash of HASH_SIZE
|
|
||||||
/// bytes of its value. The implementation assumes that's what it is, but the hash function
|
|
||||||
/// is not specified in this library.
|
|
||||||
pub trait Store: Sync + Send {
|
pub trait Store: Sync + Send {
|
||||||
/// The size in bytes of the identity hash.
|
|
||||||
const HASH_SIZE: usize;
|
|
||||||
|
|
||||||
/// The maximum size of the objects supported by this store (and thus replication domain).
|
|
||||||
const MAX_OBJECT_SIZE: usize;
|
|
||||||
|
|
||||||
/// Object type returned by get(), must implement AsRef<[u8]>.
|
|
||||||
type GetOutput: AsRef<[u8]>;
|
|
||||||
|
|
||||||
/// Compute a hash of a data object using the hash associated with this store.
|
|
||||||
/// This returns the identity hash which can then be used as a key with get(), put(), etc.
|
|
||||||
fn hash(&self, object: &[u8]) -> [u8; Self::HASH_SIZE];
|
|
||||||
|
|
||||||
/// Get the total size of this data set in objects.
|
/// Get the total size of this data set in objects.
|
||||||
async fn total_size(&self) -> u64;
|
fn total_size(&self) -> u64;
|
||||||
|
|
||||||
/// Get an object from the database, returning None if it is not found or there is an error.
|
/// Get an object from the database.
|
||||||
async fn get(&self, identity_hash: &[u8; Self::HASH_SIZE]) -> Option<Self::GetOutput>;
|
fn get(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> Option<Vec<u8>>;
|
||||||
|
|
||||||
/// Store an entry in the database.
|
/// Store an entry in the database.
|
||||||
async fn put(&self, identity_hash: &[u8; Self::HASH_SIZE], object: &[u8]) -> PutObjectResult;
|
fn put(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StoreObjectResult;
|
||||||
|
|
||||||
|
/// Check if we have an object by its identity hash.
|
||||||
|
fn have(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool;
|
||||||
|
|
||||||
/// Count the number of identity hash keys in this range (inclusive) of identity hashes.
|
/// Count the number of identity hash keys in this range (inclusive) of identity hashes.
|
||||||
/// This may return None if an error occurs, but should return 0 if the set is empty.
|
/// This may return None if an error occurs, but should return 0 if the set is empty.
|
||||||
async fn count(&self, start: &[u8; Self::HASH_SIZE], end: &[u8; Self::HASH_SIZE]) -> Option<u64>;
|
fn count(&self, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE]) -> Option<u64>;
|
||||||
|
|
||||||
/// Called when a connection to a remote node was successful.
|
/// Called when a connection to a remote node was successful.
|
||||||
/// This is always called on successful outbound connect.
|
/// This is always called on successful outbound connect.
|
||||||
async fn save_remote_endpoint(&self, to_address: &SocketAddr);
|
fn save_remote_endpoint(&self, to_address: &SocketAddr);
|
||||||
|
|
||||||
/// Get a remote endpoint to try.
|
/// Get a remote endpoint to try.
|
||||||
/// This can return endpoints in any order and is used to try to establish outbound links.
|
/// This can return endpoints in any order and is used to try to establish outbound links.
|
||||||
async fn get_remote_endpoint(&self) -> Option<SocketAddr>;
|
fn get_remote_endpoint(&self) -> Option<SocketAddr>;
|
||||||
|
|
||||||
/*
|
|
||||||
/// Execute a function for every hash/value in a range.
|
|
||||||
/// Iteration stops if the supplied function returns false.
|
|
||||||
async fn for_each_entry<F, FF>(&self, start: &[u8; Self::HASH_SIZE], end: &[u8; Self::HASH_SIZE], function: F)
|
|
||||||
where
|
|
||||||
F: Fn(&[u8; Self::HASH_SIZE], &[u8]) -> FF,
|
|
||||||
FF: Future<Output=bool>;
|
|
||||||
|
|
||||||
/// Execute a function for every hash in a range.
|
|
||||||
/// Iteration stops if the supplied function returns false.
|
|
||||||
async fn for_each_hash_key<F, FF>(&self, start: &[u8; Self::HASH_SIZE], end: &[u8; Self::HASH_SIZE], function: F)
|
|
||||||
where
|
|
||||||
F: Fn(&[u8; Self::HASH_SIZE], &[u8]) -> FF,
|
|
||||||
FF: Future<Output=bool>;
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,13 @@
|
||||||
|
|
||||||
use smol::io::{AsyncReadExt, AsyncWrite, AsyncRead, AsyncWriteExt};
|
use smol::io::{AsyncReadExt, AsyncWrite, AsyncRead, AsyncWriteExt};
|
||||||
|
|
||||||
pub async fn async_write<W: AsyncWrite>(w: &mut W, mut v: u64) -> std::io::Result<()> {
|
/// Byte that can be written for a zero varint.
|
||||||
|
pub const ZERO: u8 = 0x80;
|
||||||
|
|
||||||
|
/// Byte that can be written for a varint of 1.
|
||||||
|
pub const ONE: u8 = 0x81;
|
||||||
|
|
||||||
|
pub async fn async_write<W: AsyncWrite + Unpin>(w: &mut W, mut v: u64) -> std::io::Result<()> {
|
||||||
let mut b = [0_u8; 10];
|
let mut b = [0_u8; 10];
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
loop {
|
loop {
|
||||||
|
@ -25,7 +31,7 @@ pub async fn async_write<W: AsyncWrite>(w: &mut W, mut v: u64) -> std::io::Resul
|
||||||
w.write_all(&b[0..i]).await
|
w.write_all(&b[0..i]).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn async_read<R: AsyncRead>(r: &mut R) -> std::io::Result<u64> {
|
pub async fn async_read<R: AsyncRead + Unpin>(r: &mut R) -> std::io::Result<u64> {
|
||||||
let mut v = 0_u64;
|
let mut v = 0_u64;
|
||||||
let mut buf = [0_u8; 1];
|
let mut buf = [0_u8; 1];
|
||||||
let mut pos = 0;
|
let mut pos = 0;
|
||||||
|
|
Loading…
Add table
Reference in a new issue