mirror of
https://github.com/zerotier/ZeroTierOne.git
synced 2025-04-19 13:36:54 +02:00
cleanup
This commit is contained in:
parent
35fa7a513d
commit
4bbcca37b8
10 changed files with 0 additions and 1567 deletions
|
@ -1,37 +0,0 @@
|
|||
[package]
|
||||
name = "syncwhole"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
license = "MPL-2.0"
|
||||
authors = ["Adam Ierymenko <adam.ierymenko@zerotier.com>"]
|
||||
|
||||
[profile.release]
|
||||
opt-level = 3
|
||||
lto = true
|
||||
codegen-units = 1
|
||||
panic = 'abort'
|
||||
|
||||
[lib]
|
||||
name = "syncwhole"
|
||||
path = "src/lib.rs"
|
||||
doc = true
|
||||
|
||||
[[bin]]
|
||||
name = "syncwhole_local_testnet"
|
||||
path = "src/main.rs"
|
||||
doc = false
|
||||
required-features = ["include_sha2_lib"]
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "^1", features = ["net", "rt", "parking_lot", "time", "io-std", "io-util", "sync", "rt-multi-thread"], default-features = false }
|
||||
serde = { version = "^1", features = ["derive"], default-features = false }
|
||||
serde_bytes = "^0"
|
||||
rmp = "^0"
|
||||
rmp-serde = "^1"
|
||||
sha2 = { version = "^0", optional = true }
|
||||
async-trait = "^0"
|
||||
futures-core = "^0"
|
||||
iblt = { version = "^0", path = "../iblt" }
|
||||
|
||||
[features]
|
||||
include_sha2_lib = ["sha2"]
|
|
@ -1 +0,0 @@
|
|||
../rustfmt.toml
|
|
@ -1,105 +0,0 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*
|
||||
* (c)2022 ZeroTier, Inc.
|
||||
* https://www.zerotier.com/
|
||||
*/
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
/// Size of keys, which is the size of a 512-bit hash. This is a protocol constant.
|
||||
pub const KEY_SIZE: usize = 64;
|
||||
|
||||
/// Result returned by DataStore::store().
|
||||
pub enum StoreResult {
|
||||
/// Entry was accepted.
|
||||
Ok,
|
||||
|
||||
/// Entry was a duplicate of one we already have but was otherwise valid.
|
||||
Duplicate,
|
||||
|
||||
/// Entry was valid but was ignored for an unspecified reason.
|
||||
Ignored,
|
||||
|
||||
/// Entry was rejected as malformed or otherwise invalid (e.g. failed signature check).
|
||||
Rejected,
|
||||
}
|
||||
|
||||
/// API to be implemented by the data set we want to replicate.
|
||||
///
|
||||
/// Keys as used in this API are SHA512 hashes of values.
|
||||
///
|
||||
/// Range queries take an optional subset parameter. The format and interpretation of
|
||||
/// this is entirely up to the implementer of DataStore. It could contain a time, a SQL
|
||||
/// query, a set of certificates, anything. Its purpose is to select which items we want
|
||||
/// from remote nodes so that we can replicate only a subset of a larger set of data.
|
||||
/// Other nodes can also supply a subset to this one, so it's important that remote subset
|
||||
/// values supplied to the local data store be handled correctly.
|
||||
#[async_trait]
|
||||
pub trait DataStore: Sync + Send {
|
||||
/// Container for values returned by load().
|
||||
///
|
||||
/// Making this a trait defined type lets you use Arc<[u8]>, etc. as well as obvious
|
||||
/// ones like Box<[u8]> and Vec<u8>.
|
||||
type ValueRef: AsRef<[u8]> + Sync + Send + Clone;
|
||||
|
||||
/// Key hash size, always 64 for SHA512.
|
||||
const KEY_SIZE: usize = KEY_SIZE;
|
||||
|
||||
/// Maximum size of a value in bytes.
|
||||
const MAX_VALUE_SIZE: usize;
|
||||
|
||||
/// Get the subset that should be sent to remote nodes in queries.
|
||||
async fn local_subset(&self) -> Option<Self::ValueRef>;
|
||||
|
||||
/// Get an item by identity hash key if it exists.
|
||||
async fn load(&self, key: &[u8; KEY_SIZE]) -> Option<Self::ValueRef>;
|
||||
|
||||
/// Store an item in the data store and return its status.
|
||||
///
|
||||
/// Note that no time is supplied here. The data store must determine this in an implementation
|
||||
/// dependent manner if this is a temporally subjective data store. It could be determined by
|
||||
/// the wall clock, from the object itself, etc.
|
||||
///
|
||||
/// The key supplied here will always be the SHA512 hash of the value. There is no need to
|
||||
/// re-compute and check the key, but the value must be validated.
|
||||
///
|
||||
/// Validation of the value and returning the appropriate StoreResult is important to the
|
||||
/// operation of the synchronization algorithm:
|
||||
///
|
||||
/// StoreResult::Ok - Value was valid and was accepted and saved.
|
||||
///
|
||||
/// StoreResult::Duplicate - Value was valid but is a duplicate of one we already have.
|
||||
///
|
||||
/// StoreResult::Ignored - Value was valid but for some other reason was not saved.
|
||||
///
|
||||
/// StoreResult::Rejected - Value was not valid, causes link to peer to be dropped.
|
||||
///
|
||||
/// Rejected should only be returned if the value actually fails a validity check, signature
|
||||
/// verification, proof of work check, or some other required criteria. Ignored must be
|
||||
/// returned if the value is valid but is too old or was rejected for some other normal reason.
|
||||
async fn store(&self, key: &[u8; KEY_SIZE], value: &[u8]) -> StoreResult;
|
||||
|
||||
/// Iterate through keys in a range.
|
||||
///
|
||||
/// Keys MUST be output in ascending binary sort order.
|
||||
async fn keys<F: Send + FnMut(&[u8]) -> bool>(
|
||||
&self,
|
||||
subset: Option<&[u8]>,
|
||||
range_start: &[u8; KEY_SIZE],
|
||||
range_end: &[u8; KEY_SIZE],
|
||||
f: F,
|
||||
);
|
||||
|
||||
/// Iterate through values in a range.
|
||||
///
|
||||
/// Entries MUST be output in ascending binary sort order.
|
||||
async fn values<F: Send + FnMut(&[u8], &[u8]) -> bool>(
|
||||
&self,
|
||||
subset: Option<&[u8]>,
|
||||
range_start: &[u8; KEY_SIZE],
|
||||
range_end: &[u8; KEY_SIZE],
|
||||
f: F,
|
||||
);
|
||||
}
|
|
@ -1,156 +0,0 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*
|
||||
* (c)2022 ZeroTier, Inc.
|
||||
* https://www.zerotier.com/
|
||||
*/
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[cfg(feature = "include_sha2_lib")]
|
||||
use sha2::digest::{Digest, FixedOutput};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::node::RemoteNodeInfo;
|
||||
|
||||
/// Configuration setttings for a syncwhole node.
|
||||
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq)]
|
||||
pub struct Config {
|
||||
/// A list of peer addresses to which we always want to stay connected.
|
||||
/// The library will try to maintain connectivity to these regardless of connection limits.
|
||||
pub anchors: Vec<SocketAddr>,
|
||||
|
||||
/// A list of other peer addresses that we can try in order to achieve desired_connection_count.
|
||||
/// If this includes the anchors too there will be no effect since the anchors are tried first anyway.
|
||||
pub peers: Vec<SocketAddr>,
|
||||
|
||||
/// The maximum number of TCP connections we should allow.
|
||||
pub max_connection_count: usize,
|
||||
|
||||
/// The desired number of peering links.
|
||||
pub desired_connection_count: usize,
|
||||
|
||||
/// Synchronization interval in milliseconds.
|
||||
pub sync_interval: u64,
|
||||
|
||||
/// Connection inactivity timeout in milliseconds.
|
||||
pub connection_timeout: u64,
|
||||
|
||||
/// An arbitrary name for this data set to avoid connecting to irrelevant nodes.
|
||||
pub domain: String,
|
||||
|
||||
/// An optional name for this node to advertise to other nodes.
|
||||
pub name: String,
|
||||
|
||||
/// An optional contact string for this node to advertise to other nodes.
|
||||
/// Example: bighead@stanford.edu or https://www.piedpiper.com/
|
||||
pub contact: String,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
anchors: Vec::new(),
|
||||
peers: Vec::new(),
|
||||
max_connection_count: 128,
|
||||
desired_connection_count: 64,
|
||||
sync_interval: 500,
|
||||
connection_timeout: 500 * 10,
|
||||
domain: String::new(),
|
||||
name: String::new(),
|
||||
contact: String::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A trait that users of syncwhole implement to provide configuration information and listen for events.
|
||||
pub trait Host: Sync + Send {
|
||||
/// Get the current configuration for this node.
|
||||
fn node_config(&self) -> Arc<Config>;
|
||||
|
||||
/// Test whether an inbound connection should be allowed from an address.
|
||||
///
|
||||
/// This is called on first incoming connection before any init is received. The authenticate()
|
||||
/// method is called once init has been received and is another decision point. The default
|
||||
/// implementation of this always returns true.
|
||||
///
|
||||
/// This is not called for outbound connections.
|
||||
#[allow(unused_variables)]
|
||||
fn allow(&self, remote_address: &SocketAddr) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
/// Compute HMAC-SHA512(secret, challenge).
|
||||
///
|
||||
/// A return of None indicates that the connection should be dropped. If authentication is
|
||||
/// not enabled, the response should be computed using an all-zero secret key. This is
|
||||
/// what the default implementation does, so if you don't want authentication there is no
|
||||
/// need to override and implement this.
|
||||
///
|
||||
/// This actually gets called twice per link: once when Init is received to compute the
|
||||
/// response, and once when InitResponse is received to verify the response to our challenge.
|
||||
///
|
||||
/// The default implementation authenticates with an all-zero key. Leave it this way if
|
||||
/// you don't want authentication.
|
||||
#[allow(unused_variables)]
|
||||
fn authenticate(&self, info: &RemoteNodeInfo, challenge: &[u8]) -> Option<[u8; 64]> {
|
||||
Some(Self::hmac_sha512(&[0_u8; 64], challenge))
|
||||
}
|
||||
|
||||
/// Called when an attempt is made to connect to a remote address.
|
||||
fn on_connect_attempt(&self, address: &SocketAddr);
|
||||
|
||||
/// Called when a connection has been successfully established.
|
||||
///
|
||||
/// Hosts are encouraged to learn endpoints when a successful outbound connection is made. Check the
|
||||
/// inbound flag in the remote node info structure.
|
||||
fn on_connect(&self, info: &RemoteNodeInfo);
|
||||
|
||||
/// Called when an open connection is closed for any reason.
|
||||
fn on_connection_closed(&self, info: &RemoteNodeInfo, reason: String);
|
||||
|
||||
/// Fill a buffer with secure random bytes.
|
||||
///
|
||||
/// The implementer must call a secure random number generator or source to implement this.
|
||||
fn get_secure_random(&self, buf: &mut [u8]);
|
||||
|
||||
/// Compute a SHA512 digest of the input.
|
||||
///
|
||||
/// Input can consist of one or more slices that will be processed in order.
|
||||
///
|
||||
/// If the feature "include_sha2_lib" is enabled a default implementation in terms of the
|
||||
/// Rust sha2 crate is generated. Otherwise the implementer must supply their own
|
||||
/// SHA512 function.
|
||||
#[cfg(not(feature = "include_sha2_lib"))]
|
||||
fn sha512(msg: &[&[u8]]) -> [u8; 64];
|
||||
#[cfg(feature = "include_sha2_lib")]
|
||||
fn sha512(msg: &[&[u8]]) -> [u8; 64] {
|
||||
let mut h = sha2::Sha512::new();
|
||||
for b in msg.iter() {
|
||||
h.update(*b);
|
||||
}
|
||||
h.finalize_fixed().try_into().unwrap()
|
||||
}
|
||||
|
||||
/// Compute HMAC-SHA512 using key and input.
|
||||
///
|
||||
/// Supplied key will always be 64 bytes in length.
|
||||
///
|
||||
/// The default implementation is HMAC implemented in terms of sha512() above. Specialize
|
||||
/// to provide your own implementation.
|
||||
fn hmac_sha512(key: &[u8], msg: &[u8]) -> [u8; 64] {
|
||||
let mut opad = [0x5c_u8; 128];
|
||||
let mut ipad = [0x36_u8; 128];
|
||||
assert!(key.len() >= 64);
|
||||
for i in 0..64 {
|
||||
opad[i] ^= key[i];
|
||||
}
|
||||
for i in 0..64 {
|
||||
ipad[i] ^= key[i];
|
||||
}
|
||||
Self::sha512(&[&opad, &Self::sha512(&[&ipad, msg])])
|
||||
}
|
||||
}
|
|
@ -1,17 +0,0 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*
|
||||
* (c)2022 ZeroTier, Inc.
|
||||
* https://www.zerotier.com/
|
||||
*/
|
||||
|
||||
pub(crate) mod protocol;
|
||||
pub(crate) mod utils;
|
||||
pub(crate) mod varint;
|
||||
|
||||
pub mod datastore;
|
||||
pub mod host;
|
||||
pub mod node;
|
||||
|
||||
pub use async_trait;
|
|
@ -1,196 +0,0 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*
|
||||
* (c)2022 ZeroTier, Inc.
|
||||
* https://www.zerotier.com/
|
||||
*/
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::io::{stdout, Write};
|
||||
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
|
||||
use std::ops::Bound::Included;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
use sha2::digest::Digest;
|
||||
use sha2::Sha512;
|
||||
|
||||
use syncwhole::datastore::*;
|
||||
use syncwhole::host::*;
|
||||
use syncwhole::node::*;
|
||||
use syncwhole::utils::*;
|
||||
|
||||
const TEST_NODE_COUNT: usize = 8;
|
||||
const TEST_PORT_RANGE_START: u16 = 21384;
|
||||
const TEST_STARTING_RECORDS_PER_NODE: usize = 16;
|
||||
|
||||
static mut RANDOM_CTR: u128 = 0;
|
||||
|
||||
fn get_random_bytes(mut buf: &mut [u8]) {
|
||||
// This is only for testing and is not really secure.
|
||||
let mut ctr = unsafe { RANDOM_CTR };
|
||||
if ctr == 0 {
|
||||
ctr = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() * (1 + Instant::now().elapsed().as_nanos());
|
||||
}
|
||||
while !buf.is_empty() {
|
||||
let l = buf.len().min(64);
|
||||
ctr = ctr.wrapping_add(1);
|
||||
buf[..l].copy_from_slice(&Sha512::digest(&ctr.to_ne_bytes()).as_slice()[..l]);
|
||||
buf = &mut buf[l..];
|
||||
}
|
||||
unsafe { RANDOM_CTR = ctr };
|
||||
}
|
||||
|
||||
pub struct TestNodeHost {
|
||||
pub name: String,
|
||||
pub config: Config,
|
||||
pub records: tokio::sync::RwLock<BTreeMap<[u8; 64], [u8; 64]>>,
|
||||
}
|
||||
|
||||
impl TestNodeHost {
|
||||
pub fn new_random(test_no: usize) -> Self {
|
||||
let mut s = BTreeMap::new();
|
||||
for _ in 0..TEST_STARTING_RECORDS_PER_NODE {
|
||||
let mut v = [0_u8; 64];
|
||||
get_random_bytes(&mut v);
|
||||
let k = Self::sha512(&[&v]);
|
||||
s.insert(k, v);
|
||||
}
|
||||
Self {
|
||||
name: test_no.to_string(),
|
||||
config: Config::default(),
|
||||
records: tokio::sync::RwLock::new(s),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Host for TestNodeHost {
|
||||
fn node_config(&self) -> Config {
|
||||
self.config.clone()
|
||||
}
|
||||
|
||||
fn on_connect_attempt(&self, _address: &SocketAddr) {
|
||||
//println!("{:5}: connecting to {}", self.name, _address.to_string());
|
||||
}
|
||||
|
||||
fn on_connect(&self, info: &RemoteNodeInfo) {
|
||||
//println!("{:5}: connected to {} ({}, {})", self.name, info.remote_address.to_string(), info.node_name.as_ref().map_or("null", |s| s.as_str()), if info.inbound { "inbound" } else { "outbound" });
|
||||
}
|
||||
|
||||
fn on_connection_closed(&self, info: &RemoteNodeInfo, reason: String) {
|
||||
//println!("{:5}: closed connection to {}: {} ({}, {})", self.name, info.remote_address.to_string(), reason, if info.inbound { "inbound" } else { "outbound" }, if info.initialized { "initialized" } else { "not initialized" });
|
||||
}
|
||||
|
||||
fn get_secure_random(&self, buf: &mut [u8]) {
|
||||
// This is only for testing and is not really secure.
|
||||
get_random_bytes(buf);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl DataStore for TestNodeHost {
|
||||
type ValueRef = [u8; 64];
|
||||
|
||||
const MAX_VALUE_SIZE: usize = 1024;
|
||||
|
||||
fn clock(&self) -> i64 {
|
||||
ms_since_epoch()
|
||||
}
|
||||
|
||||
fn domain(&self) -> &str {
|
||||
"test"
|
||||
}
|
||||
|
||||
async fn load(&self, key: &[u8; 64]) -> Option<Self::ValueRef> {
|
||||
let records = self.records.read().await;
|
||||
let value = records.get(key);
|
||||
if value.is_some() {
|
||||
Some(value.unwrap().clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
async fn store(&self, key: &[u8; 64], value: &[u8]) -> StoreResult {
|
||||
let value: [u8; 64] = value.try_into();
|
||||
if value.is_ok() {
|
||||
if self.records.write().await.insert(key.clone(), value).is_none() {
|
||||
StoreResult::Ok
|
||||
} else {
|
||||
StoreResult::Duplicate
|
||||
}
|
||||
} else {
|
||||
StoreResult::Rejected
|
||||
}
|
||||
}
|
||||
|
||||
async fn keys_under<F: Send + FnMut(&[u8]) -> bool>(&self, reference_time: i64, prefix: u64, prefix_bits: u32, f: F) {
|
||||
let (start, end) = prefix_to_range(prefix, prefix_bits);
|
||||
let records = self.records.read().await;
|
||||
for (k, v) in records.range((Included(start), Included(end))) {
|
||||
if !f(k) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
.block_on(async {
|
||||
println!(
|
||||
"Running syncwhole local self-test network with {} nodes starting at 127.0.0.1:{}",
|
||||
TEST_NODE_COUNT, TEST_PORT_RANGE_START
|
||||
);
|
||||
println!();
|
||||
|
||||
println!("Starting nodes on 127.0.0.1...");
|
||||
let mut nodes: Vec<Node<TestNodeHost, TestNodeHost>> = Vec::with_capacity(TEST_NODE_COUNT);
|
||||
for port in TEST_PORT_RANGE_START..(TEST_PORT_RANGE_START + (TEST_NODE_COUNT as u16)) {
|
||||
let mut peers: Vec<SocketAddr> = Vec::with_capacity(TEST_NODE_COUNT);
|
||||
for port2 in TEST_PORT_RANGE_START..(TEST_PORT_RANGE_START + (TEST_NODE_COUNT as u16)) {
|
||||
if port != port2 {
|
||||
peers.push(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port2)));
|
||||
}
|
||||
}
|
||||
let mut th = TestNodeHost::new_random(port as usize);
|
||||
th.config.anchors = peers;
|
||||
th.config.name = port.to_string();
|
||||
let nh = Arc::new(th);
|
||||
//println!("Starting node on 127.0.0.1:{}...", port, nh.db.lock().unwrap().len());
|
||||
nodes.push(
|
||||
Node::new(nh.clone(), nh.clone(), SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)))
|
||||
.await
|
||||
.unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
print!("Waiting for all connections to be established...");
|
||||
let _ = stdout().flush();
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
let mut count = 0;
|
||||
for n in nodes.iter() {
|
||||
count += n.connection_count().await;
|
||||
}
|
||||
if count == (TEST_NODE_COUNT * (TEST_NODE_COUNT - 1)) {
|
||||
println!(" {} connections up.", count);
|
||||
break;
|
||||
} else {
|
||||
print!(".");
|
||||
let _ = stdout().flush();
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
});
|
||||
}
|
|
@ -1,701 +0,0 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*
|
||||
* (c)2022 ZeroTier, Inc.
|
||||
* https://www.zerotier.com/
|
||||
*/
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::io::IoSlice;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
|
||||
use std::ops::Add;
|
||||
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use iblt::IBLT;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
|
||||
use tokio::net::{TcpListener, TcpSocket, TcpStream};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{Duration, Instant};
|
||||
|
||||
use crate::datastore::*;
|
||||
use crate::host::Host;
|
||||
use crate::protocol::*;
|
||||
use crate::utils::*;
|
||||
use crate::varint;
|
||||
|
||||
// Interval for announcing queued HaveRecords items, in milliseconds.
|
||||
const ANNOUNCE_PERIOD: i64 = 100;
|
||||
|
||||
/// Information about a remote node to which we are connected.
|
||||
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct RemoteNodeInfo {
|
||||
/// Optional name advertised by remote node (arbitrary).
|
||||
pub name: String,
|
||||
|
||||
/// Optional contact information advertised by remote node (arbitrary).
|
||||
pub contact: String,
|
||||
|
||||
/// Actual remote endpoint address.
|
||||
pub remote_address: SocketAddr,
|
||||
|
||||
/// Explicitly advertised remote addresses supplied by remote node (not necessarily verified).
|
||||
pub explicit_addresses: Vec<SocketAddr>,
|
||||
|
||||
/// Time TCP connection was established (ms since epoch).
|
||||
pub connect_time: i64,
|
||||
|
||||
/// Time TCP connection was estaablished (ms, monotonic).
|
||||
pub connect_instant: i64,
|
||||
|
||||
/// True if this is an inbound TCP connection.
|
||||
pub inbound: bool,
|
||||
|
||||
/// True if this connection has exchanged init messages successfully.
|
||||
pub initialized: bool,
|
||||
}
|
||||
|
||||
/// An instance of the syncwhole data set synchronization engine.
|
||||
///
|
||||
/// This holds a number of async tasks that are terminated or aborted if this object
|
||||
/// is dropped. In other words this implements structured concurrency.
|
||||
pub struct Node<D: DataStore + 'static, H: Host + 'static> {
|
||||
internal: Arc<NodeInternal<D, H>>,
|
||||
housekeeping_task: JoinHandle<()>,
|
||||
announce_task: JoinHandle<()>,
|
||||
listener_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl<D: DataStore + 'static, H: Host + 'static> Node<D, H> {
|
||||
pub async fn new(db: Arc<D>, host: Arc<H>, bind_address: SocketAddr) -> std::io::Result<Self> {
|
||||
let listener = if bind_address.is_ipv4() {
|
||||
TcpSocket::new_v4()
|
||||
} else {
|
||||
TcpSocket::new_v6()
|
||||
}?;
|
||||
configure_tcp_socket(&listener)?;
|
||||
listener.bind(bind_address.clone())?;
|
||||
let listener = listener.listen(1024)?;
|
||||
|
||||
let internal = Arc::new(NodeInternal::<D, H> {
|
||||
anti_loopback_secret: {
|
||||
let mut tmp = [0_u8; 64];
|
||||
host.get_secure_random(&mut tmp);
|
||||
tmp
|
||||
},
|
||||
datastore: db.clone(),
|
||||
host: host.clone(),
|
||||
connections: Mutex::new(HashMap::new()),
|
||||
connecting_to: Mutex::new(HashSet::new()),
|
||||
announce_queue: Mutex::new(HashMap::new()),
|
||||
bind_address,
|
||||
starting_instant: Instant::now(),
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
internal: internal.clone(),
|
||||
housekeeping_task: tokio::spawn(internal.clone().housekeeping_task_main()),
|
||||
announce_task: tokio::spawn(internal.clone().announce_task_main()),
|
||||
listener_task: tokio::spawn(internal.listener_task_main(listener)),
|
||||
})
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn datastore(&self) -> &Arc<D> {
|
||||
&self.internal.datastore
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn host(&self) -> &Arc<H> {
|
||||
&self.internal.host
|
||||
}
|
||||
|
||||
/// Attempt to connect to an explicitly specified TCP endpoint.
|
||||
///
|
||||
/// Ok(true) is returned if a new connection was made. Ok(false) means there is already a connection
|
||||
/// to the endpoint. An error is returned if the connection fails.
|
||||
pub async fn connect(&self, address: &SocketAddr) -> std::io::Result<bool> {
|
||||
if self.internal.connecting_to.lock().await.insert(address.clone()) {
|
||||
self.internal
|
||||
.connect(
|
||||
address,
|
||||
Instant::now().add(Duration::from_millis(self.internal.host.node_config().connection_timeout)),
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a list of all open peer to peer connections.
|
||||
pub async fn list_connections(&self) -> Vec<RemoteNodeInfo> {
|
||||
let connections = self.internal.connections.lock().await;
|
||||
let mut cl: Vec<RemoteNodeInfo> = Vec::with_capacity(connections.len());
|
||||
for (_, c) in connections.iter() {
|
||||
cl.push(c.info.lock().unwrap().clone());
|
||||
}
|
||||
cl
|
||||
}
|
||||
|
||||
/// Get the number of open peer to peer connections.
|
||||
pub async fn connection_count(&self) -> usize {
|
||||
self.internal.connections.lock().await.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl<D: DataStore + 'static, H: Host + 'static> Drop for Node<D, H> {
|
||||
fn drop(&mut self) {
|
||||
self.housekeeping_task.abort();
|
||||
self.announce_task.abort();
|
||||
self.listener_task.abort();
|
||||
}
|
||||
}
|
||||
|
||||
/********************************************************************************************************************/
|
||||
|
||||
fn configure_tcp_socket(socket: &TcpSocket) -> std::io::Result<()> {
|
||||
let _ = socket.set_linger(None);
|
||||
if socket.set_reuseport(true).is_ok() {
|
||||
Ok(())
|
||||
} else {
|
||||
socket.set_reuseaddr(true)
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_msgpack<'a, T: Deserialize<'a>>(b: &'a [u8]) -> std::io::Result<T> {
|
||||
rmp_serde::from_slice(b).map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("invalid msgpack object: {}", e.to_string()),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
pub struct NodeInternal<D: DataStore + 'static, H: Host + 'static> {
|
||||
// Secret used to perform HMAC to detect and drop loopback connections to self.
|
||||
anti_loopback_secret: [u8; 64],
|
||||
|
||||
// Outside code implementations of DataStore and Host traits.
|
||||
datastore: Arc<D>,
|
||||
host: Arc<H>,
|
||||
|
||||
// Connections and their task join handles, by remote endpoint address.
|
||||
connections: Mutex<HashMap<SocketAddr, Arc<Connection>>>,
|
||||
|
||||
// Outgoing connections in progress.
|
||||
connecting_to: Mutex<HashSet<SocketAddr>>,
|
||||
|
||||
// Records received since last announce and the endpoints that we know already have them.
|
||||
announce_queue: Mutex<HashMap<[u8; KEY_SIZE], Vec<SocketAddr>>>,
|
||||
|
||||
// Local address to which this node is bound
|
||||
bind_address: SocketAddr,
|
||||
|
||||
// Instant this node started.
|
||||
starting_instant: Instant,
|
||||
}
|
||||
|
||||
impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
|
||||
fn ms_monotonic(&self) -> i64 {
|
||||
Instant::now().duration_since(self.starting_instant).as_millis() as i64
|
||||
}
|
||||
|
||||
async fn housekeeping_task_main(self: Arc<Self>) {
|
||||
let tasks = AsyncTaskReaper::new();
|
||||
let mut sleep_for = Duration::from_millis(500);
|
||||
loop {
|
||||
tokio::time::sleep(sleep_for).await;
|
||||
|
||||
let config = self.host.node_config();
|
||||
let mut connections = self.connections.lock().await;
|
||||
let mut connecting_to = self.connecting_to.lock().await;
|
||||
let now = self.ms_monotonic();
|
||||
|
||||
// Drop dead or timed out connections, and for live connections handle sending sync requests.
|
||||
connections.retain(|_, c| {
|
||||
if !c.closed.load(Ordering::Relaxed) {
|
||||
let cc = c.clone();
|
||||
if (now - c.last_receive_time.load(Ordering::Relaxed)) < (config.connection_timeout as i64) {
|
||||
// TODO: sync init if not waiting for a sync response
|
||||
true // keep connection
|
||||
} else {
|
||||
let _ = c.read_task.lock().unwrap().take().map(|j| j.abort());
|
||||
let host = self.host.clone();
|
||||
tasks.spawn(async move {
|
||||
host.on_connection_closed(&*cc.info.lock().unwrap(), "timeout".to_string());
|
||||
});
|
||||
false // discard connection
|
||||
}
|
||||
} else {
|
||||
let host = self.host.clone();
|
||||
let cc = c.clone();
|
||||
let j = c.read_task.lock().unwrap().take();
|
||||
tasks.spawn(async move {
|
||||
if j.is_some() {
|
||||
let e = j.unwrap().await;
|
||||
if e.is_ok() {
|
||||
let e = e.unwrap();
|
||||
host.on_connection_closed(
|
||||
&*cc.info.lock().unwrap(),
|
||||
e.map_or_else(|e| e.to_string(), |_| "unknown error".to_string()),
|
||||
);
|
||||
} else {
|
||||
host.on_connection_closed(&*cc.info.lock().unwrap(), "remote host closed connection".to_string());
|
||||
}
|
||||
} else {
|
||||
host.on_connection_closed(&*cc.info.lock().unwrap(), "remote host closed connection".to_string());
|
||||
}
|
||||
});
|
||||
false // discard connection
|
||||
}
|
||||
});
|
||||
|
||||
let connect_timeout_at = Instant::now().add(Duration::from_millis(config.connection_timeout));
|
||||
|
||||
// Always try to connect to anchor peers.
|
||||
for sa in config.anchors.iter() {
|
||||
if !connections.contains_key(sa) && connecting_to.insert(sa.clone()) {
|
||||
let self2 = self.clone();
|
||||
let sa = sa.clone();
|
||||
tasks.spawn(async move {
|
||||
let _ = self2.connect(&sa, connect_timeout_at).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Try to connect to more peers until desired connection count is reached.
|
||||
let desired_connection_count = config.desired_connection_count.min(config.max_connection_count);
|
||||
for sa in config.peers.iter() {
|
||||
if (connections.len() + connecting_to.len()) >= desired_connection_count {
|
||||
break;
|
||||
}
|
||||
if !connections.contains_key(sa) && connecting_to.insert(sa.clone()) {
|
||||
let self2 = self.clone();
|
||||
let sa = sa.clone();
|
||||
tasks.spawn(async move {
|
||||
let _ = self2.connect(&sa, connect_timeout_at).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
sleep_for = Duration::from_millis(config.sync_interval.min(config.connection_timeout));
|
||||
}
|
||||
}
|
||||
|
||||
async fn announce_task_main(self: Arc<Self>) {
|
||||
let sleep_for = Duration::from_millis(ANNOUNCE_PERIOD as u64);
|
||||
let mut to_announce: Vec<([u8; KEY_SIZE], Vec<SocketAddr>)> = Vec::new();
|
||||
let background_tasks = AsyncTaskReaper::new();
|
||||
let announce_timeout = Duration::from_millis(self.host.node_config().connection_timeout);
|
||||
loop {
|
||||
tokio::time::sleep(sleep_for).await;
|
||||
|
||||
for (key, already_has) in self.announce_queue.lock().await.drain() {
|
||||
to_announce.push((key, already_has));
|
||||
}
|
||||
|
||||
let now = self.ms_monotonic();
|
||||
let have_records_est_size = (to_announce.len() * KEY_SIZE) + 2;
|
||||
let mut have_records: Vec<u8> = Vec::with_capacity(have_records_est_size);
|
||||
for c in self.connections.lock().await.iter() {
|
||||
if c.1.announce_new_records.load(Ordering::Relaxed) {
|
||||
for (key, already_has) in to_announce.iter() {
|
||||
if !already_has.contains(c.0) {
|
||||
let _ = std::io::Write::write_all(&mut have_records, key);
|
||||
}
|
||||
}
|
||||
if !have_records.is_empty() {
|
||||
let c2 = c.1.clone();
|
||||
background_tasks.spawn(async move {
|
||||
// If the connection dies this will either fail or time out in 1s. Usually these execute instantly due to
|
||||
// write buffering but a short timeout prevents them from building up too much.
|
||||
let _ = tokio::time::timeout(
|
||||
announce_timeout,
|
||||
c2.send_msg(MessageType::HaveRecords, have_records.as_slice(), now),
|
||||
);
|
||||
});
|
||||
have_records = Vec::with_capacity(have_records_est_size);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
to_announce.clear();
|
||||
}
|
||||
}
|
||||
|
||||
async fn listener_task_main(self: Arc<Self>, listener: TcpListener) {
|
||||
loop {
|
||||
let socket = listener.accept().await;
|
||||
if socket.is_ok() {
|
||||
let (stream, address) = socket.unwrap();
|
||||
if self.host.allow(&address) {
|
||||
let config = self.host.node_config();
|
||||
if self.connections.lock().await.len() < config.max_connection_count || config.anchors.contains(&address) {
|
||||
Self::connection_start(&self, address, stream, true).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal connection method.
|
||||
///
|
||||
/// Note that this does not add the address to connecting_to. Instead it's done by the caller
|
||||
/// to avoid races and similar things. It is removed from connecting_to once the connection
|
||||
/// either succeeds or fails.
|
||||
async fn connect(self: &Arc<Self>, address: &SocketAddr, deadline: Instant) -> std::io::Result<bool> {
|
||||
let f = async {
|
||||
let stream = if address.is_ipv4() {
|
||||
TcpSocket::new_v4()
|
||||
} else {
|
||||
TcpSocket::new_v6()
|
||||
}?;
|
||||
configure_tcp_socket(&stream)?;
|
||||
stream.bind(self.bind_address.clone())?;
|
||||
|
||||
let stream = tokio::time::timeout_at(deadline, stream.connect(address.clone()));
|
||||
self.host.on_connect_attempt(address);
|
||||
let stream = stream.await;
|
||||
|
||||
if stream.is_ok() {
|
||||
Ok(self.connection_start(address.clone(), stream.unwrap()?, false).await)
|
||||
} else {
|
||||
Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))
|
||||
}
|
||||
};
|
||||
let r = f.await;
|
||||
let _ = self.connecting_to.lock().await.remove(address);
|
||||
r
|
||||
}
|
||||
|
||||
/// Initialize and start a connection whether incoming or outgoing.
|
||||
async fn connection_start(self: &Arc<Self>, address: SocketAddr, stream: TcpStream, inbound: bool) -> bool {
|
||||
let mut ok = false;
|
||||
let _ = self.connections.lock().await.entry(address.clone()).or_insert_with(|| {
|
||||
ok = true;
|
||||
let _ = stream.set_nodelay(false);
|
||||
let (reader, writer) = stream.into_split();
|
||||
let now = self.ms_monotonic();
|
||||
let connection = Arc::new(Connection {
|
||||
writer: Mutex::new(writer),
|
||||
last_send_time: AtomicI64::new(now),
|
||||
last_receive_time: AtomicI64::new(now),
|
||||
info: std::sync::Mutex::new(RemoteNodeInfo {
|
||||
name: String::new(),
|
||||
contact: String::new(),
|
||||
remote_address: address.clone(),
|
||||
explicit_addresses: Vec::new(),
|
||||
connect_time: ms_since_epoch(),
|
||||
connect_instant: now,
|
||||
inbound,
|
||||
initialized: false,
|
||||
}),
|
||||
read_task: std::sync::Mutex::new(None),
|
||||
announce_new_records: AtomicBool::new(false),
|
||||
closed: AtomicBool::new(false),
|
||||
});
|
||||
let self2 = self.clone();
|
||||
let c2 = connection.clone();
|
||||
connection.read_task.lock().unwrap().replace(tokio::spawn(async move {
|
||||
let result = self2.connection_io_task_main(&c2, address, reader).await;
|
||||
c2.closed.store(true, Ordering::Relaxed);
|
||||
result
|
||||
}));
|
||||
connection
|
||||
});
|
||||
ok
|
||||
}
|
||||
|
||||
async fn connection_io_task_main(
|
||||
self: Arc<Self>,
|
||||
connection: &Arc<Connection>,
|
||||
remote_address: SocketAddr,
|
||||
mut reader: OwnedReadHalf,
|
||||
) -> std::io::Result<()> {
|
||||
const BUF_CHUNK_SIZE: usize = 4096;
|
||||
const READ_BUF_INITIAL_SIZE: usize = 65536; // should be a multiple of BUF_CHUNK_SIZE
|
||||
|
||||
let mut write_buffer: Vec<u8> = Vec::with_capacity(BUF_CHUNK_SIZE);
|
||||
let mut read_buffer: Vec<u8> = Vec::new();
|
||||
read_buffer.resize(READ_BUF_INITIAL_SIZE, 0);
|
||||
|
||||
let config = self.host.node_config();
|
||||
let mut anti_loopback_challenge_sent = [0_u8; 64];
|
||||
let mut domain_challenge_sent = [0_u8; 64];
|
||||
let mut auth_challenge_sent = [0_u8; 64];
|
||||
self.host.get_secure_random(&mut anti_loopback_challenge_sent);
|
||||
self.host.get_secure_random(&mut domain_challenge_sent);
|
||||
self.host.get_secure_random(&mut auth_challenge_sent);
|
||||
connection
|
||||
.send_obj(
|
||||
&mut write_buffer,
|
||||
MessageType::Init,
|
||||
&msg::Init {
|
||||
anti_loopback_challenge: &anti_loopback_challenge_sent,
|
||||
domain_challenge: &domain_challenge_sent,
|
||||
auth_challenge: &auth_challenge_sent,
|
||||
node_name: config.name.as_str(),
|
||||
node_contact: config.contact.as_str(),
|
||||
locally_bound_port: self.bind_address.port(),
|
||||
explicit_ipv4: None,
|
||||
explicit_ipv6: None,
|
||||
},
|
||||
self.ms_monotonic(),
|
||||
)
|
||||
.await?;
|
||||
drop(config);
|
||||
|
||||
let max_message_size = ((D::MAX_VALUE_SIZE * 8) + (D::KEY_SIZE * 1024) + 65536) as u64; // sanity limit
|
||||
let mut initialized = false;
|
||||
let mut init_received = false;
|
||||
let mut buffer_fill = 0_usize;
|
||||
loop {
|
||||
let message_type: MessageType;
|
||||
let message_size: usize;
|
||||
let header_size: usize;
|
||||
let total_size: usize;
|
||||
loop {
|
||||
buffer_fill += reader.read(&mut read_buffer.as_mut_slice()[buffer_fill..]).await?;
|
||||
if buffer_fill >= 2 {
|
||||
// type and at least one byte of varint
|
||||
let ms = varint::decode(&read_buffer.as_slice()[1..]);
|
||||
if ms.1 > 0 {
|
||||
// varint is all there and parsed correctly
|
||||
if ms.0 > max_message_size {
|
||||
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "message too large"));
|
||||
}
|
||||
|
||||
message_type = MessageType::from(*read_buffer.get(0).unwrap());
|
||||
message_size = ms.0 as usize;
|
||||
header_size = 1 + ms.1;
|
||||
total_size = header_size + message_size;
|
||||
|
||||
if read_buffer.len() < total_size {
|
||||
read_buffer.resize(((total_size / BUF_CHUNK_SIZE) + 1) * BUF_CHUNK_SIZE, 0);
|
||||
}
|
||||
while buffer_fill < total_size {
|
||||
buffer_fill += reader.read(&mut read_buffer.as_mut_slice()[buffer_fill..]).await?;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
let message = &read_buffer.as_slice()[header_size..total_size];
|
||||
|
||||
let now = self.ms_monotonic();
|
||||
connection.last_receive_time.store(now, Ordering::Relaxed);
|
||||
|
||||
match message_type {
|
||||
MessageType::Nop => {}
|
||||
|
||||
MessageType::Init => {
|
||||
if init_received {
|
||||
return Err(std::io::Error::new(std::io::ErrorKind::Other, "duplicate init"));
|
||||
}
|
||||
init_received = true;
|
||||
|
||||
let msg: msg::Init = decode_msgpack(message)?;
|
||||
let (anti_loopback_response, domain_challenge_response, auth_challenge_response) = {
|
||||
let mut info = connection.info.lock().unwrap();
|
||||
info.name = msg.node_name.to_string();
|
||||
info.contact = msg.node_contact.to_string();
|
||||
let _ = msg.explicit_ipv4.map(|pv4| {
|
||||
info.explicit_addresses
|
||||
.push(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(pv4.ip), pv4.port)));
|
||||
});
|
||||
let _ = msg.explicit_ipv6.map(|pv6| {
|
||||
info.explicit_addresses
|
||||
.push(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(pv6.ip), pv6.port, 0, 0)));
|
||||
});
|
||||
let info = info.clone();
|
||||
|
||||
let auth_challenge_response = self.host.authenticate(&info, msg.auth_challenge);
|
||||
if auth_challenge_response.is_none() {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"authenticate() returned None, connection dropped",
|
||||
));
|
||||
}
|
||||
let auth_challenge_response = auth_challenge_response.unwrap();
|
||||
|
||||
(
|
||||
H::hmac_sha512(&self.anti_loopback_secret, msg.anti_loopback_challenge),
|
||||
H::hmac_sha512(&H::sha512(&[self.host.node_config().domain.as_bytes()]), msg.domain_challenge),
|
||||
auth_challenge_response,
|
||||
)
|
||||
};
|
||||
|
||||
connection
|
||||
.send_obj(
|
||||
&mut write_buffer,
|
||||
MessageType::InitResponse,
|
||||
&msg::InitResponse {
|
||||
anti_loopback_response: &anti_loopback_response,
|
||||
domain_response: &domain_challenge_response,
|
||||
auth_response: &auth_challenge_response,
|
||||
},
|
||||
now,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
MessageType::InitResponse => {
|
||||
let msg: msg::InitResponse = decode_msgpack(message)?;
|
||||
let mut info = connection.info.lock().unwrap();
|
||||
|
||||
if info.initialized {
|
||||
return Err(std::io::Error::new(std::io::ErrorKind::Other, "duplicate init response"));
|
||||
}
|
||||
|
||||
if msg
|
||||
.anti_loopback_response
|
||||
.eq(&H::hmac_sha512(&self.anti_loopback_secret, &anti_loopback_challenge_sent))
|
||||
{
|
||||
return Err(std::io::Error::new(std::io::ErrorKind::Other, "rejected connection to self"));
|
||||
}
|
||||
if !msg.domain_response.eq(&H::hmac_sha512(
|
||||
&H::sha512(&[self.host.node_config().domain.as_bytes()]),
|
||||
&domain_challenge_sent,
|
||||
)) {
|
||||
return Err(std::io::Error::new(std::io::ErrorKind::Other, "domain mismatch"));
|
||||
}
|
||||
if !self
|
||||
.host
|
||||
.authenticate(&info, &auth_challenge_sent)
|
||||
.map_or(false, |cr| msg.auth_response.eq(&cr))
|
||||
{
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"challenge/response authentication failed",
|
||||
));
|
||||
}
|
||||
|
||||
initialized = true;
|
||||
info.initialized = true;
|
||||
|
||||
let info = info.clone(); // also releases lock since info is replaced/destroyed
|
||||
self.host.on_connect(&info);
|
||||
}
|
||||
|
||||
// Handle messages other than INIT and INIT_RESPONSE after checking 'initialized' flag.
|
||||
_ => {
|
||||
if !initialized {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"init exchange must be completed before other messages are sent",
|
||||
));
|
||||
}
|
||||
|
||||
match message_type {
|
||||
MessageType::HaveRecords => {}
|
||||
|
||||
MessageType::GetRecords => {}
|
||||
|
||||
MessageType::Record => {
|
||||
let key = H::sha512(&[message]);
|
||||
match self.datastore.store(&key, message).await {
|
||||
StoreResult::Ok => {
|
||||
let mut q = self.announce_queue.lock().await;
|
||||
let ql = q.entry(key).or_insert_with(|| Vec::with_capacity(2));
|
||||
if !ql.contains(&remote_address) {
|
||||
ql.push(remote_address.clone());
|
||||
}
|
||||
}
|
||||
StoreResult::Rejected => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("record rejected by data store: {}", to_hex_string(&key)),
|
||||
));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
MessageType::SyncRequest => {
|
||||
let msg: msg::SyncRequest = decode_msgpack(message)?;
|
||||
}
|
||||
|
||||
MessageType::Sync => {
|
||||
let msg: msg::Sync = decode_msgpack(message)?;
|
||||
}
|
||||
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
read_buffer.copy_within(total_size..buffer_fill, 0);
|
||||
buffer_fill -= total_size;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<D: DataStore + 'static, H: Host + 'static> Drop for NodeInternal<D, H> {
|
||||
fn drop(&mut self) {
|
||||
let _ = tokio::runtime::Handle::try_current().map_or_else(
|
||||
|_| {
|
||||
for (_, c) in self.connections.blocking_lock().drain() {
|
||||
c.read_task.lock().unwrap().as_mut().map(|c| c.abort());
|
||||
}
|
||||
},
|
||||
|h| {
|
||||
let _ = h.block_on(async {
|
||||
for (_, c) in self.connections.lock().await.drain() {
|
||||
c.read_task.lock().unwrap().as_mut().map(|c| c.abort());
|
||||
}
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
struct Connection {
|
||||
writer: Mutex<OwnedWriteHalf>,
|
||||
last_send_time: AtomicI64,
|
||||
last_receive_time: AtomicI64,
|
||||
info: std::sync::Mutex<RemoteNodeInfo>,
|
||||
read_task: std::sync::Mutex<Option<JoinHandle<std::io::Result<()>>>>,
|
||||
announce_new_records: AtomicBool,
|
||||
closed: AtomicBool,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
async fn send_msg(&self, message_type: MessageType, data: &[u8], now: i64) -> std::io::Result<()> {
|
||||
let mut header: [u8; 16] = unsafe { MaybeUninit::uninit().assume_init() };
|
||||
header[0] = message_type as u8;
|
||||
let header_size = 1 + varint::encode(&mut header[1..], data.len() as u64);
|
||||
if self
|
||||
.writer
|
||||
.lock()
|
||||
.await
|
||||
.write_vectored(&[IoSlice::new(&header[0..header_size]), IoSlice::new(data)])
|
||||
.await?
|
||||
== (data.len() + header_size)
|
||||
{
|
||||
self.last_send_time.store(now, Ordering::Relaxed);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "write error"))
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_obj<O: Serialize>(&self, write_buf: &mut Vec<u8>, message_type: MessageType, obj: &O, now: i64) -> std::io::Result<()> {
|
||||
write_buf.clear();
|
||||
if rmp_serde::encode::write_named(write_buf, obj).is_ok() {
|
||||
self.send_msg(message_type, write_buf.as_slice(), now).await
|
||||
} else {
|
||||
Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
"serialize failure (internal error)",
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,180 +0,0 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*
|
||||
* (c)2022 ZeroTier, Inc.
|
||||
* https://www.zerotier.com/
|
||||
*/
|
||||
|
||||
#[derive(Clone, Copy, Eq, PartialEq)]
|
||||
#[repr(u8)]
|
||||
pub enum MessageType {
|
||||
/// No operation, payload ignored.
|
||||
Nop = 0_u8,
|
||||
|
||||
/// msg::Init (msgpack)
|
||||
Init = 1_u8,
|
||||
|
||||
/// msg::InitResponse (msgpack)
|
||||
InitResponse = 2_u8,
|
||||
|
||||
/// <full record key>[<full record key>...]
|
||||
HaveRecords = 3_u8,
|
||||
|
||||
/// <u8 length of each key prefix in bytes>[<key>...]
|
||||
GetRecords = 4_u8,
|
||||
|
||||
/// <record>
|
||||
Record = 5_u8,
|
||||
|
||||
/// msg::SyncRequest (msgpack)
|
||||
SyncRequest = 6_u8,
|
||||
|
||||
/// msg::Sync (msgpack)
|
||||
Sync = 7_u8,
|
||||
}
|
||||
|
||||
const MESSAGE_TYPE_MAX: u8 = 7;
|
||||
|
||||
impl From<u8> for MessageType {
|
||||
/// Get a type from a byte, returning the Nop type if the byte is out of range.
|
||||
#[inline(always)]
|
||||
fn from(b: u8) -> Self {
|
||||
if b <= MESSAGE_TYPE_MAX {
|
||||
unsafe { std::mem::transmute(b) }
|
||||
} else {
|
||||
Self::Nop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MessageType {
|
||||
#[allow(unused)]
|
||||
pub fn name(&self) -> &'static str {
|
||||
match *self {
|
||||
Self::Nop => "NOP",
|
||||
Self::Init => "INIT",
|
||||
Self::InitResponse => "INIT_RESPONSE",
|
||||
Self::HaveRecords => "HAVE_RECORDS",
|
||||
Self::GetRecords => "GET_RECORDS",
|
||||
Self::Record => "RECORD",
|
||||
Self::SyncRequest => "SYNC_REQUEST",
|
||||
Self::Sync => "SYNC",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Msgpack serializable message types.
|
||||
/// Some that are frequently transferred use shortened names to save bandwidth.
|
||||
pub mod msg {
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct IPv4 {
|
||||
pub ip: [u8; 4],
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct IPv6 {
|
||||
pub ip: [u8; 16],
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct Init<'a> {
|
||||
/// A random challenge to be hashed with a secret to detect and drop connections to self.
|
||||
#[serde(with = "serde_bytes")]
|
||||
pub anti_loopback_challenge: &'a [u8],
|
||||
|
||||
/// A random challenge for checking the data set domain.
|
||||
#[serde(with = "serde_bytes")]
|
||||
pub domain_challenge: &'a [u8],
|
||||
|
||||
/// A random challenge for login/authentication.
|
||||
#[serde(with = "serde_bytes")]
|
||||
pub auth_challenge: &'a [u8],
|
||||
|
||||
/// Optional name to advertise for this node.
|
||||
pub node_name: &'a str,
|
||||
|
||||
/// Optional contact information for this node, such as a URL or an e-mail address.
|
||||
pub node_contact: &'a str,
|
||||
|
||||
/// Port to which this node has locally bound.
|
||||
/// This is used to try to auto-detect whether a NAT is in the way.
|
||||
pub locally_bound_port: u16,
|
||||
|
||||
/// An IPv4 address where this node can be reached.
|
||||
/// If both explicit_ipv4 and explicit_ipv6 are omitted the physical source IP:port may be used.
|
||||
pub explicit_ipv4: Option<IPv4>,
|
||||
|
||||
/// An IPv6 address where this node can be reached.
|
||||
/// If both explicit_ipv4 and explicit_ipv6 are omitted the physical source IP:port may be used.
|
||||
pub explicit_ipv6: Option<IPv6>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct InitResponse<'a> {
|
||||
/// HMAC-SHA512(local secret, anti_loopback_challenge) to detect and drop loops.
|
||||
#[serde(with = "serde_bytes")]
|
||||
pub anti_loopback_response: &'a [u8],
|
||||
|
||||
/// HMAC-SHA512(SHA512(domain), domain_challenge) to check that the data set domain matches.
|
||||
#[serde(with = "serde_bytes")]
|
||||
pub domain_response: &'a [u8],
|
||||
|
||||
/// HMAC-SHA512(secret, challenge) for authentication. (If auth is not enabled, an all-zero secret is used.)
|
||||
#[serde(with = "serde_bytes")]
|
||||
pub auth_response: &'a [u8],
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct SyncRequest<'a> {
|
||||
/// Starting range to query, padded with zeroes if shorter than KEY_SIZE.
|
||||
#[serde(with = "serde_bytes")]
|
||||
#[serde(rename = "s")]
|
||||
pub range_start: &'a [u8],
|
||||
|
||||
/// Ending range to query, padded with 0xff if shorter than KEY_SIZE.
|
||||
#[serde(with = "serde_bytes")]
|
||||
#[serde(rename = "e")]
|
||||
pub range_end: &'a [u8],
|
||||
|
||||
/// Data-store-specific subset selector indicating what subset of items desired
|
||||
#[serde(with = "serde_bytes")]
|
||||
#[serde(rename = "q")]
|
||||
pub subset: Option<&'a [u8]>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct Sync<'a> {
|
||||
/// Starting range summarized, padded with zeroes if shorter than KEY_SIZE.
|
||||
#[serde(with = "serde_bytes")]
|
||||
#[serde(rename = "s")]
|
||||
pub range_start: &'a [u8],
|
||||
|
||||
/// Ending range summarized, padded with 0xff if shorter than KEY_SIZE.
|
||||
#[serde(with = "serde_bytes")]
|
||||
#[serde(rename = "e")]
|
||||
pub range_end: &'a [u8],
|
||||
|
||||
/// Data-store-specific subset selector indicating what subset of items were included
|
||||
#[serde(with = "serde_bytes")]
|
||||
#[serde(rename = "q")]
|
||||
pub subset: Option<&'a [u8]>,
|
||||
|
||||
/// Number of buckets in IBLT
|
||||
#[serde(rename = "b")]
|
||||
pub iblt_buckets: usize,
|
||||
|
||||
/// Number of bytes in each IBLT item (key prefix)
|
||||
#[serde(rename = "l")]
|
||||
pub iblt_item_bytes: usize,
|
||||
|
||||
/// Set summary for keys under prefix within subset
|
||||
#[serde(with = "serde_bytes")]
|
||||
#[serde(rename = "i")]
|
||||
pub iblt: &'a [u8],
|
||||
}
|
||||
}
|
|
@ -1,129 +0,0 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*
|
||||
* (c)2022 ZeroTier, Inc.
|
||||
* https://www.zerotier.com/
|
||||
*/
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
/// Get the real time clock in milliseconds since Unix epoch.
|
||||
pub fn ms_since_epoch() -> i64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis() as i64
|
||||
}
|
||||
|
||||
/// Encode a byte slice to a hexadecimal string.
|
||||
pub fn to_hex_string(b: &[u8]) -> String {
|
||||
const HEX_CHARS: [u8; 16] = [
|
||||
b'0', b'1', b'2', b'3', b'4', b'5', b'6', b'7', b'8', b'9', b'a', b'b', b'c', b'd', b'e', b'f',
|
||||
];
|
||||
let mut s = String::new();
|
||||
s.reserve(b.len() * 2);
|
||||
for c in b {
|
||||
let x = *c as usize;
|
||||
s.push(HEX_CHARS[x >> 4] as char);
|
||||
s.push(HEX_CHARS[x & 0xf] as char);
|
||||
}
|
||||
s
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn xorshift64(mut x: u64) -> u64 {
|
||||
x ^= x.wrapping_shl(13);
|
||||
x ^= x.wrapping_shr(7);
|
||||
x ^= x.wrapping_shl(17);
|
||||
x
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn splitmix64(mut x: u64) -> u64 {
|
||||
x ^= x.wrapping_shr(30);
|
||||
x = x.wrapping_mul(0xbf58476d1ce4e5b9);
|
||||
x ^= x.wrapping_shr(27);
|
||||
x = x.wrapping_mul(0x94d049bb133111eb);
|
||||
x ^= x.wrapping_shr(31);
|
||||
x
|
||||
}
|
||||
|
||||
/*
|
||||
#[inline(always)]
|
||||
pub fn splitmix64_inverse(mut x: u64) -> u64 {
|
||||
x ^= x.wrapping_shr(31) ^ x.wrapping_shr(62);
|
||||
x = x.wrapping_mul(0x319642b2d24d8ec3);
|
||||
x ^= x.wrapping_shr(27) ^ x.wrapping_shr(54);
|
||||
x = x.wrapping_mul(0x96de1b173f119089);
|
||||
x ^= x.wrapping_shr(30) ^ x.wrapping_shr(60);
|
||||
x
|
||||
}
|
||||
*/
|
||||
|
||||
static mut RANDOM_STATE_0: u64 = 0;
|
||||
static mut RANDOM_STATE_1: u64 = 0;
|
||||
|
||||
/// Get a non-cryptographic pseudorandom number.
|
||||
pub fn random() -> u64 {
|
||||
let (mut s0, mut s1) = unsafe { (RANDOM_STATE_0, RANDOM_STATE_1) };
|
||||
if s0 == 0 {
|
||||
s0 = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64;
|
||||
}
|
||||
if s1 == 0 {
|
||||
s1 = splitmix64(std::process::id() as u64);
|
||||
}
|
||||
let s1_new = xorshift64(s1);
|
||||
s0 = splitmix64(s0.wrapping_add(s1));
|
||||
s1 = s1_new;
|
||||
unsafe {
|
||||
RANDOM_STATE_0 = s0;
|
||||
RANDOM_STATE_1 = s1;
|
||||
};
|
||||
s0
|
||||
}
|
||||
|
||||
/// Wrapper for tokio::spawn() that aborts tasks not yet completed when it is dropped.
|
||||
pub struct AsyncTaskReaper {
|
||||
ctr: AtomicUsize,
|
||||
handles: Arc<std::sync::Mutex<HashMap<usize, JoinHandle<()>>>>,
|
||||
}
|
||||
|
||||
impl AsyncTaskReaper {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
ctr: AtomicUsize::new(0),
|
||||
handles: Arc::new(std::sync::Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a new task.
|
||||
///
|
||||
/// Note that currently any task output is ignored. This is for fire and forget
|
||||
/// background tasks that you want to be collected on loss of scope.
|
||||
pub fn spawn<F: Future + Send + 'static>(&self, future: F) {
|
||||
let id = self.ctr.fetch_add(1, Ordering::Relaxed);
|
||||
let handles = self.handles.clone();
|
||||
self.handles.lock().unwrap().insert(
|
||||
id,
|
||||
tokio::spawn(async move {
|
||||
let _ = future.await;
|
||||
let _ = handles.lock().unwrap().remove(&id);
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for AsyncTaskReaper {
|
||||
fn drop(&mut self) {
|
||||
for (_, h) in self.handles.lock().unwrap().iter() {
|
||||
h.abort();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,45 +0,0 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*
|
||||
* (c)2022 ZeroTier, Inc.
|
||||
* https://www.zerotier.com/
|
||||
*/
|
||||
|
||||
#[allow(unused)]
|
||||
pub const VARINT_MAX_SIZE_BYTES: usize = 10;
|
||||
|
||||
pub fn encode(b: &mut [u8], mut v: u64) -> usize {
|
||||
let mut i = 0;
|
||||
loop {
|
||||
if v > 0x7f {
|
||||
b[i] = (v as u8) & 0x7f;
|
||||
i += 1;
|
||||
v = v.wrapping_shr(7);
|
||||
} else {
|
||||
b[i] = (v as u8) | 0x80;
|
||||
i += 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
i
|
||||
}
|
||||
|
||||
pub fn decode(b: &[u8]) -> (u64, usize) {
|
||||
let mut v = 0_u64;
|
||||
let mut pos = 0;
|
||||
let mut l = 0;
|
||||
let bl = b.len();
|
||||
while l < bl {
|
||||
let x = b[l];
|
||||
l += 1;
|
||||
if x <= 0x7f {
|
||||
v |= (x as u64).wrapping_shl(pos);
|
||||
pos += 7;
|
||||
} else {
|
||||
v |= ((x & 0x7f) as u64).wrapping_shl(pos);
|
||||
return (v, l);
|
||||
}
|
||||
}
|
||||
return (0, 0);
|
||||
}
|
Loading…
Add table
Reference in a new issue