Tons more work: (1) remove async again from the core, (2) controller stuff, (3) retrofit other stuff to handle non-async core.

Adam Ierymenko 2022-09-21 16:43:47 -04:00
parent 7ec46540fa
commit 768ec6e710
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
25 changed files with 842 additions and 719 deletions
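The substance of the change: core traits such as HostSystem, InnerProtocol, NodeStorage and PathFilter drop #[async_trait] and become plain synchronous traits, while code that still needs async (the controller's database lookups, the service layer's sockets) now spawns that work onto a runtime itself instead of the core awaiting it. As a rough illustration of that shape only (the trait and type names below are hypothetical stand-ins, not the actual ZeroTier types):

use tokio::runtime::Handle;

// Hypothetical synchronous callback trait standing in for InnerProtocol/HostSystem.
trait PacketCallback: Sync + Send + 'static {
    // Synchronous: the core's packet path never awaits this.
    fn handle(&self, payload: &[u8]) -> bool;
}

// An implementation that still needs async work keeps its own runtime handle.
struct AsyncBackedCallback {
    runtime: Handle, // handle to a tokio runtime owned by the service layer
}

impl PacketCallback for AsyncBackedCallback {
    fn handle(&self, payload: &[u8]) -> bool {
        // Parse and validate synchronously, then hand the slow part to tokio
        // so this call returns immediately.
        let data = payload.to_vec();
        self.runtime.spawn(async move {
            // ... async database or network work on `data` would go here ...
            let _ = data;
        });
        true
    }
}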

View file

@ -12,6 +12,8 @@ zerotier-crypto = { path = "../crypto" }
zerotier-utils = { path = "../utils" }
zerotier-network-hypervisor = { path = "../network-hypervisor" }
zerotier-vl1-service = { path = "../vl1-service" }
async-trait = "^0"
tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false }
parking_lot = { version = "^0", features = [], default-features = false }
serde = { version = "^1", features = ["derive"], default-features = false }
serde_json = { version = "^1", features = ["std"], default-features = false }
async-trait = "^0"

View file

@ -1,102 +1,135 @@
use std::sync::Arc;
use crate::database::Database;
use async_trait::async_trait;
use std::sync::Arc;
use tokio::time::{Duration, Instant};
use zerotier_network_hypervisor::protocol::{verbs, PacketBuffer};
use zerotier_network_hypervisor::util::dictionary::Dictionary;
use zerotier_network_hypervisor::util::marshalable::MarshalUnmarshalError;
use zerotier_network_hypervisor::vl1::{HostSystem, Identity, InnerProtocol, Path, Peer};
use zerotier_network_hypervisor::vl1::{HostSystem, Identity, InnerProtocol, PacketHandlerResult, Path, Peer};
use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_utils::reaper::Reaper;
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
pub struct Controller<DatabaseImpl: Database> {
pub database: Arc<DatabaseImpl>,
database: Arc<DatabaseImpl>,
reaper: Reaper,
}
impl<DatabaseImpl: Database> Controller<DatabaseImpl> {
pub async fn new(database: Arc<DatabaseImpl>) -> Arc<Self> {
Arc::new(Self { database })
Arc::new(Self { database, reaper: Reaper::new() })
}
async fn handle_network_config_request<HostSystemImpl: HostSystem>(
&self,
source: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
payload: &PacketBuffer,
) -> Result<(), MarshalUnmarshalError> {
let mut cursor = 0;
let network_id = NetworkId::from_u64(payload.read_u64(&mut cursor)?);
if network_id.is_none() {
return Err(MarshalUnmarshalError::InvalidData);
}
let network_id = network_id.unwrap();
let meta_data = if cursor < payload.len() {
let meta_data_len = payload.read_u16(&mut cursor)?;
let d = Dictionary::from_bytes(payload.read_bytes(meta_data_len as usize, &mut cursor)?);
if d.is_none() {
return Err(MarshalUnmarshalError::InvalidData);
}
d.unwrap()
} else {
Dictionary::new()
};
let (have_revision, have_timestamp) = if cursor < payload.len() {
let r = payload.read_u64(&mut cursor)?;
let t = payload.read_u64(&mut cursor)?;
(Some(r), Some(t))
} else {
(None, None)
};
if let Ok(Some(network)) = self.database.get_network(network_id).await {}
return Ok(());
database: Arc<DatabaseImpl>,
source: Arc<Peer<HostSystemImpl>>,
source_path: Arc<Path<HostSystemImpl>>,
network_id: NetworkId,
meta_data: Dictionary,
have_revision: Option<u64>,
have_timestamp: Option<u64>,
) {
if let Ok(Some(network)) = database.get_network(network_id).await {}
}
}
#[async_trait]
impl<DatabaseImpl: Database> InnerProtocol for Controller<DatabaseImpl> {
async fn handle_packet<HostSystemImpl: HostSystem>(
fn handle_packet<HostSystemImpl: HostSystem>(
&self,
source: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
verb: u8,
payload: &PacketBuffer,
) -> bool {
) -> PacketHandlerResult {
match verb {
verbs::VL2_VERB_NETWORK_CONFIG_REQUEST => {
let _ = self.handle_network_config_request(source, source_path, payload).await;
// TODO: display/log errors
true
let mut cursor = 0;
let network_id = payload.read_u64(&mut cursor);
if network_id.is_err() {
return PacketHandlerResult::Error;
}
let network_id = NetworkId::from_u64(network_id.unwrap());
if network_id.is_none() {
return PacketHandlerResult::Error;
}
let network_id = network_id.unwrap();
let meta_data = if cursor < payload.len() {
let meta_data_len = payload.read_u16(&mut cursor);
if meta_data_len.is_err() {
return PacketHandlerResult::Error;
}
if let Ok(d) = payload.read_bytes(meta_data_len.unwrap() as usize, &mut cursor) {
let d = Dictionary::from_bytes(d);
if d.is_none() {
return PacketHandlerResult::Error;
}
d.unwrap()
} else {
return PacketHandlerResult::Error;
}
} else {
Dictionary::new()
};
let (have_revision, have_timestamp) = if cursor < payload.len() {
let r = payload.read_u64(&mut cursor);
let t = payload.read_u64(&mut cursor);
if r.is_err() || t.is_err() {
return PacketHandlerResult::Error;
}
(Some(r.unwrap()), Some(t.unwrap()))
} else {
(None, None)
};
if let Some(deadline) = Instant::now().checked_add(REQUEST_TIMEOUT) {
self.reaper.add(
tokio::spawn(Self::handle_network_config_request(
self.database.clone(),
source.clone(),
source_path.clone(),
network_id,
meta_data,
have_revision,
have_timestamp,
)),
deadline,
);
} else {
eprintln!("WARNING: instant + REQUEST_TIMEOUT overflowed! should be impossible.");
}
PacketHandlerResult::Ok
}
_ => false,
_ => PacketHandlerResult::NotHandled,
}
}
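
Note the pattern above: handle_packet() is now synchronous, so it parses the request inline and hands the async database work to tokio, registering the spawned task with the Reaper plus a deadline so a stalled request cannot linger past REQUEST_TIMEOUT. A stripped-down sketch of that spawn-with-deadline idea, using plain tokio::time::timeout in place of the Reaper (whose full API is not shown in this diff); process_request() and dispatch() are hypothetical names:

use std::time::Duration;
use tokio::time::timeout;

const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

// Hypothetical async worker standing in for handle_network_config_request().
async fn process_request(request_id: u64) {
    let _ = request_id; // ... database lookup and response generation ...
}

// Called from a synchronous packet handler: spawn the async work and bound it
// with a deadline instead of awaiting it inline.
fn dispatch(request_id: u64) {
    tokio::spawn(async move {
        if timeout(REQUEST_TIMEOUT, process_request(request_id)).await.is_err() {
            eprintln!("request {:#x} timed out after {:?}", request_id, REQUEST_TIMEOUT);
        }
    });
}
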
async fn handle_error<HostSystemImpl: HostSystem>(
fn handle_error<HostSystemImpl: HostSystem>(
&self,
source: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
in_re_verb: u8,
in_re_message_id: u64,
error_code: u8,
payload: &PacketBuffer,
cursor: &mut usize,
) -> bool {
false
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
async fn handle_ok<HostSystemImpl: HostSystem>(
fn handle_ok<HostSystemImpl: HostSystem>(
&self,
source: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
in_re_verb: u8,
in_re_message_id: u64,
payload: &PacketBuffer,
cursor: &mut usize,
) -> bool {
false
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
fn should_communicate_with(&self, _: &Identity) -> bool {

View file

@ -7,17 +7,18 @@ version = "0.1.0"
[dependencies]
zerotier-utils = { path = "../utils" }
ed25519-dalek = {version = "1.0.1", features = ["std", "u64_backend"], default-features = false}
ed25519-dalek = { version = "1.0.1", features = ["std", "u64_backend"], default-features = false }
foreign-types = "0.3.1"
lazy_static = "^1"
openssl = {version = "^0", features = [], default-features = false}
parking_lot = {version = "^0", features = [], default-features = false}
poly1305 = {version = "0.7.2", features = [], default-features = false}
pqc_kyber = {path = "../third_party/kyber", features = ["kyber1024", "reference"], default-features = false}
openssl = { version = "^0", features = [], default-features = false }
parking_lot = { version = "^0", features = [], default-features = false }
poly1305 = { version = "0.7.2", features = [], default-features = false }
pqc_kyber = { path = "../third_party/kyber", features = ["kyber1024", "reference"], default-features = false }
#pqc_kyber = { version = "^0", features = ["kyber1024", "reference"], default-features = false }
rand_core = "0.5.1"
rand_core_062 = {package = "rand_core", version = "0.6.2"}
rand_core_062 = { package = "rand_core", version = "0.6.2" }
subtle = "2.4.1"
x25519-dalek = {version = "1.2.0", features = ["std", "u64_backend"], default-features = false}
x25519-dalek = { version = "1.2.0", features = ["std", "u64_backend"], default-features = false }
[target."cfg(not(any(target_os = \"macos\", target_os = \"ios\")))".dependencies]
openssl = "^0"

View file

@ -12,7 +12,6 @@ debug_events = []
[dependencies]
zerotier-crypto = { path = "../crypto" }
zerotier-utils = { path = "../utils" }
async-trait = "^0"
base64 = "^0"
lz4_flex = { version = "^0", features = ["safe-encode", "safe-decode", "checked-decode"] }
parking_lot = { version = "^0", features = [], default-features = false }
@ -22,7 +21,7 @@ serde = { version = "^1", features = ["derive"], default-features = false }
rand = "*"
serde_json = "*"
serde_cbor = "*"
criterion = "0.3"
criterion = "^0"
[target."cfg(not(windows))".dependencies]
libc = "^0"

View file

@ -9,7 +9,7 @@ use zerotier_utils::buffer::{Buffer, PooledBufferFactory};
use zerotier_utils::pool::{Pool, Pooled};
/*
* Protocol versions
* Legacy V1 protocol versions:
*
* 1 - 0.2.0 ... 0.2.5
* 2 - 0.3.0 ... 0.4.5
@ -46,11 +46,14 @@ pub const PROTOCOL_VERSION: u8 = 20;
/// We could probably push it back to 8 or 9 with some added support for sending Salsa/Poly packets.
pub const PROTOCOL_VERSION_MIN: u8 = 11;
/// Size of a pooled packet buffer.
pub const PACKET_BUFFER_SIZE: usize = 16384;
/// Buffer sized for ZeroTier packets.
pub type PacketBuffer = Buffer<{ v1::SIZE_MAX }>;
pub type PacketBuffer = Buffer<PACKET_BUFFER_SIZE>;
/// Factory type to supply to a new PacketBufferPool, used in PooledPacketBuffer and PacketBufferPool types.
pub type PacketBufferFactory = PooledBufferFactory<{ crate::protocol::v1::SIZE_MAX }>;
pub type PacketBufferFactory = PooledBufferFactory<PACKET_BUFFER_SIZE>;
/// Packet buffer checked out of pool, automatically returns on drop.
pub type PooledPacketBuffer = Pooled<PacketBuffer, PacketBufferFactory>;
@ -58,9 +61,6 @@ pub type PooledPacketBuffer = Pooled<PacketBuffer, PacketBufferFactory>;
/// Source for instances of PacketBuffer
pub type PacketBufferPool = Pool<PacketBuffer, PacketBufferFactory>;
/// 64-bit packet (outer) ID.
pub type PacketId = u64;
/// 64-bit message ID (obtained after AEAD decryption).
pub type MessageId = u64;
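
For context on the buffer types above: the packet path reuses fixed 16 KiB buffers from a pool instead of allocating per packet, and a checked-out PooledPacketBuffer returns itself to its pool when dropped. A tiny self-contained sketch of that return-on-drop mechanism (this is not the zerotier_utils::pool code, whose internals are not part of this diff):

use std::sync::{Arc, Mutex};

struct BufferPool {
    free: Mutex<Vec<Vec<u8>>>,
}

struct PooledBuffer {
    buf: Option<Vec<u8>>,
    pool: Arc<BufferPool>,
}

impl BufferPool {
    fn new() -> Arc<Self> {
        Arc::new(Self { free: Mutex::new(Vec::new()) })
    }

    fn get(self: &Arc<Self>) -> PooledBuffer {
        // Reuse a previously returned buffer if one is available.
        let buf = self.free.lock().unwrap().pop().unwrap_or_else(|| vec![0u8; 16384]);
        PooledBuffer { buf: Some(buf), pool: self.clone() }
    }
}

impl Drop for PooledBuffer {
    fn drop(&mut self) {
        // A checked-out buffer puts itself back automatically, which is what
        // "automatically returns on drop" means for PooledPacketBuffer.
        if let Some(b) = self.buf.take() {
            self.pool.free.lock().unwrap().push(b);
        }
    }
}
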
@ -116,9 +116,6 @@ pub const ADDRESS_SIZE_STRING: usize = 10;
/// Prefix indicating reserved addresses (that can't actually be addresses).
pub const ADDRESS_RESERVED_PREFIX: u8 = 0xff;
/// Size of an identity fingerprint (SHA384)
pub const IDENTITY_FINGERPRINT_SIZE: usize = 48;
pub(crate) mod v1 {
use super::*;
@ -276,7 +273,7 @@ pub(crate) mod v1 {
impl PacketHeader {
#[inline(always)]
pub fn packet_id(&self) -> PacketId {
pub fn packet_id(&self) -> u64 {
u64::from_ne_bytes(self.id)
}
@ -344,7 +341,7 @@ pub(crate) mod v1 {
impl FragmentHeader {
#[inline(always)]
pub fn packet_id(&self) -> PacketId {
pub fn packet_id(&self) -> u64 {
u64::from_ne_bytes(self.id)
}
@ -424,38 +421,38 @@ pub(crate) mod v1 {
}
}
/// Maximum difference between current message ID and OK/ERROR in-re message ID.
pub const PACKET_RESPONSE_COUNTER_DELTA_MAX: u64 = 4096;
/// Maximum delta between the message ID of a sent packet and its response.
pub(crate) const PACKET_RESPONSE_COUNTER_DELTA_MAX: u64 = 256;
/// Frequency for WHOIS retries
pub const WHOIS_RETRY_INTERVAL: i64 = 1000;
/// Frequency for WHOIS retries in milliseconds.
pub(crate) const WHOIS_RETRY_INTERVAL: i64 = 1500;
/// Maximum number of WHOIS retries
pub const WHOIS_RETRY_MAX: u16 = 3;
pub(crate) const WHOIS_RETRY_COUNT_MAX: u16 = 3;
/// Maximum number of packets to queue up behind a WHOIS.
pub const WHOIS_MAX_WAITING_PACKETS: usize = 64;
pub(crate) const WHOIS_MAX_WAITING_PACKETS: usize = 32;
/// Keepalive interval for paths in milliseconds.
pub const PATH_KEEPALIVE_INTERVAL: i64 = 20000;
pub(crate) const PATH_KEEPALIVE_INTERVAL: i64 = 20000;
/// Path object expiration time in milliseconds since last receive.
pub const PATH_EXPIRATION_TIME: i64 = (PATH_KEEPALIVE_INTERVAL * 2) + 10000;
pub(crate) const PATH_EXPIRATION_TIME: i64 = (PATH_KEEPALIVE_INTERVAL * 2) + 10000;
/// How often to send HELLOs to roots, which is more often than normal peers.
pub const ROOT_HELLO_INTERVAL: i64 = PATH_KEEPALIVE_INTERVAL * 2;
pub(crate) const ROOT_HELLO_INTERVAL: i64 = PATH_KEEPALIVE_INTERVAL * 2;
/// How often to send HELLOs to roots when we are offline.
pub const ROOT_HELLO_SPAM_INTERVAL: i64 = 5000;
pub(crate) const ROOT_HELLO_SPAM_INTERVAL: i64 = 5000;
/// How often to send HELLOs to regular peers.
pub const PEER_HELLO_INTERVAL_MAX: i64 = 300000;
pub(crate) const PEER_HELLO_INTERVAL_MAX: i64 = 300000;
/// Timeout for path association with peers and for peers themselves.
pub const PEER_EXPIRATION_TIME: i64 = (PEER_HELLO_INTERVAL_MAX * 2) + 10000;
pub(crate) const PEER_EXPIRATION_TIME: i64 = (PEER_HELLO_INTERVAL_MAX * 2) + 10000;
/// Proof of work difficulty (threshold) for identity generation.
pub const IDENTITY_POW_THRESHOLD: u8 = 17;
pub(crate) const IDENTITY_POW_THRESHOLD: u8 = 17;
#[cfg(test)]
mod tests {

View file

@ -7,8 +7,8 @@ use std::str::FromStr;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::error::InvalidFormatError;
use crate::protocol::IDENTITY_FINGERPRINT_SIZE;
use crate::util::marshalable::*;
use crate::vl1::identity::IDENTITY_FINGERPRINT_SIZE;
use crate::vl1::inetaddress::InetAddress;
use crate::vl1::{Address, MAC};

View file

@ -19,11 +19,14 @@ use zerotier_utils::hex;
use zerotier_utils::memory::{as_byte_array, as_flat_object};
use crate::error::{InvalidFormatError, InvalidParameterError};
use crate::protocol::{ADDRESS_SIZE, ADDRESS_SIZE_STRING, IDENTITY_FINGERPRINT_SIZE, IDENTITY_POW_THRESHOLD};
use crate::protocol::{ADDRESS_SIZE, ADDRESS_SIZE_STRING, IDENTITY_POW_THRESHOLD};
use crate::vl1::Address;
/// Current maximum size for an identity signature.
pub const MAX_SIGNATURE_SIZE: usize = P384_ECDSA_SIGNATURE_SIZE + 1;
pub const IDENTITY_MAX_SIGNATURE_SIZE: usize = P384_ECDSA_SIGNATURE_SIZE + 1;
/// Size of an identity fingerprint (SHA384)
pub const IDENTITY_FINGERPRINT_SIZE: usize = 48;
/// Secret keys associated with NIST P-384 public keys.
#[derive(Clone)]
@ -358,7 +361,7 @@ impl Identity {
/// set the old 96-byte signature plus hash format used in ZeroTier v1 is used.
///
/// A return of None happens if we don't have our secret key(s) or some other error occurs.
pub fn sign(&self, msg: &[u8], legacy_ed25519_only: bool) -> Option<ArrayVec<u8, MAX_SIGNATURE_SIZE>> {
pub fn sign(&self, msg: &[u8], legacy_ed25519_only: bool) -> Option<ArrayVec<u8, IDENTITY_MAX_SIGNATURE_SIZE>> {
if let Some(secret) = self.secret.as_ref() {
if legacy_ed25519_only {
Some(secret.ed25519.sign_zt(msg).into())

View file

@ -9,7 +9,6 @@ mod path;
mod peer;
mod rootset;
mod symmetricsecret;
mod whoisqueue;
pub(crate) mod node;
@ -22,7 +21,7 @@ pub use event::Event;
pub use identity::Identity;
pub use inetaddress::InetAddress;
pub use mac::MAC;
pub use node::{DummyInnerProtocol, DummyPathFilter, HostSystem, InnerProtocol, Node, NodeStorage, PathFilter};
pub use node::{DummyInnerProtocol, DummyPathFilter, HostSystem, InnerProtocol, Node, NodeStorage, PacketHandlerResult, PathFilter};
pub use path::Path;
pub use peer::Peer;
pub use rootset::{Root, RootSet};

View file

@ -7,7 +7,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use crate::error::InvalidParameterError;
use crate::protocol::*;
@ -21,33 +21,26 @@ use crate::vl1::identity::Identity;
use crate::vl1::path::{Path, PathServiceResult};
use crate::vl1::peer::Peer;
use crate::vl1::rootset::RootSet;
use crate::vl1::whoisqueue::{QueuedPacket, WhoisQueue};
use zerotier_crypto::random;
use zerotier_crypto::verified::Verified;
use zerotier_utils::hex;
use zerotier_utils::ringbuffer::RingBuffer;
/// Trait implemented by external code to handle events and provide an interface to the system or application.
///
/// These methods are basically callbacks that the core calls to request or transmit things. They are called
during calls to things like wire_receive() and do_background_tasks().
#[async_trait]
pub trait HostSystem: Sync + Send + 'static {
/// Type for local system sockets.
type LocalSocket: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString;
type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + 'static;
/// Type for local system interfaces.
type LocalInterface: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString;
type LocalInterface: Sync + Send + Hash + PartialEq + Eq + Clone + ToString;
/// An event occurred.
///
/// This isn't async to avoid all kinds of issues in code that deals with locks. If you need
/// it to be async use a channel or something.
/// A VL1 level event occurred.
fn event(&self, event: Event);
/// A USER_MESSAGE packet was received.
async fn user_message(&self, source: &Identity, message_type: u64, message: &[u8]);
/// Check a local socket for validity.
///
/// This could return false if the socket's interface no longer exists, its port has been
@ -56,8 +49,7 @@ pub trait HostSystem: Sync + Send + 'static {
/// Called to send a packet over the physical network (virtual -> physical).
///
/// This may return false if the send definitely failed. Otherwise it should return true
/// which indicates possible success but with no guarantee (UDP semantics).
/// This sends with UDP-like semantics: the implementation should make a best effort and return.
///
/// If a local socket is specified the implementation should send from that socket or not
/// at all (returning false). If a local interface is specified the implementation should
@ -66,15 +58,16 @@ pub trait HostSystem: Sync + Send + 'static {
///
/// For endpoint types that support a packet TTL, the implementation may set the TTL
/// if the 'ttl' parameter is not zero. If the parameter is zero or TTL setting is not
/// supported, the default TTL should be used.
async fn wire_send(
/// supported, the default TTL should be used. This parameter is ignored for types that
/// don't support it.
fn wire_send(
&self,
endpoint: &Endpoint,
local_socket: Option<&Self::LocalSocket>,
local_interface: Option<&Self::LocalInterface>,
data: &[u8],
packet_ttl: u8,
) -> bool;
);
/// Called to get the current time in milliseconds from the system monotonically increasing clock.
/// This needs to be accurate to about 250 milliseconds resolution or better.
@ -86,20 +79,18 @@ pub trait HostSystem: Sync + Send + 'static {
}
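
Since wire_send() is now a synchronous, fire-and-forget call with no return value, an implementation can simply push the datagram at a socket and move on. A minimal sketch of that contract using std::net::UdpSocket (an illustration only; the helper name and the TTL handling are assumptions, not the vl1-service implementation):

use std::net::{SocketAddr, UdpSocket};

// Best effort, no result, and the caller is never blocked on the outcome.
fn wire_send_sketch(socket: &UdpSocket, dest: SocketAddr, data: &[u8], packet_ttl: u8) {
    if packet_ttl > 0 {
        // TTL is best effort per the trait contract; ignore failures.
        let _ = socket.set_ttl(packet_ttl as u32);
    }
    // UDP semantics: a failed send is treated the same as packet loss.
    let _ = socket.send_to(data, dest);
}
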
/// Trait to be implemented by outside code to provide object storage to VL1
#[async_trait]
pub trait NodeStorage: Sync + Send + 'static {
/// Load this node's identity from the data store.
async fn load_node_identity(&self) -> Option<Identity>;
fn load_node_identity(&self) -> Option<Identity>;
/// Save this node's identity to the data store.
async fn save_node_identity(&self, id: &Identity);
fn save_node_identity(&self, id: &Identity);
}
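
NodeStorage likewise loses its async methods, so implementations can use ordinary blocking file I/O. A hedged sketch of what a file-backed store might look like, operating on the serialized string form of an identity (the file name and the string round-trip are assumptions for illustration; a real NodeStorage impl would parse and format a vl1 Identity around these calls):

use std::fs;
use std::path::PathBuf;

struct FileIdentityStore {
    path: PathBuf, // e.g. <state directory>/identity.secret (assumed layout)
}

impl FileIdentityStore {
    fn load(&self) -> Option<String> {
        // Plain blocking I/O is fine now that the trait is synchronous.
        fs::read_to_string(&self.path).ok()
    }

    fn save(&self, serialized: &str) {
        let _ = fs::write(&self.path, serialized);
    }
}
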
/// Trait to be implemented to provide path hints and a filter to approve physical paths
#[async_trait]
pub trait PathFilter: Sync + Send + 'static {
/// Called to check and see if a physical address should be used for ZeroTier traffic to a node.
async fn check_path<HostSystemImpl: HostSystem>(
fn check_path<HostSystemImpl: HostSystem>(
&self,
id: &Identity,
endpoint: &Endpoint,
@ -108,7 +99,7 @@ pub trait PathFilter: Sync + Send + 'static {
) -> bool;
/// Called to look up any statically defined or memorized paths to known nodes.
async fn get_path_hints<HostSystemImpl: HostSystem>(
fn get_path_hints<HostSystemImpl: HostSystem>(
&self,
id: &Identity,
) -> Option<
@ -120,45 +111,56 @@ pub trait PathFilter: Sync + Send + 'static {
>;
}
/// Result of a packet handler.
pub enum PacketHandlerResult {
/// Packet was handled successfully.
Ok,
/// Packet was handled and an error occurred (malformed, authentication failure, etc.)
Error,
/// Packet was not handled by this handler.
NotHandled,
}
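
This three-way result replaces the old bool return so a dispatcher can tell "handled", "handled but invalid", and "not mine" apart. A small hedged sketch of how a caller might chain layers on it (the chain() helper is hypothetical, not part of this commit):

// Consult a second handler only if the first reports NotHandled.
fn chain(first: PacketHandlerResult, second: impl FnOnce() -> PacketHandlerResult) -> PacketHandlerResult {
    match first {
        PacketHandlerResult::NotHandled => second(),
        handled_or_error => handled_or_error,
    }
}
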
/// Interface between VL1 and higher/inner protocol layers.
///
/// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but
/// it could also be implemented for testing or "off label" use of VL1 to carry different protocols.
#[async_trait]
pub trait InnerProtocol: Sync + Send + 'static {
/// Handle a packet, returning true if it was handled by the next layer.
///
/// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error().
async fn handle_packet<HostSystemImpl: HostSystem>(
fn handle_packet<HostSystemImpl: HostSystem>(
&self,
source: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
verb: u8,
payload: &PacketBuffer,
) -> bool;
) -> PacketHandlerResult;
/// Handle errors, returning true if the error was recognized.
async fn handle_error<HostSystemImpl: HostSystem>(
fn handle_error<HostSystemImpl: HostSystem>(
&self,
source: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
in_re_verb: u8,
in_re_message_id: u64,
error_code: u8,
payload: &PacketBuffer,
cursor: &mut usize,
) -> bool;
) -> PacketHandlerResult;
/// Handle an OK, returning true if the OK was recognized.
async fn handle_ok<HostSystemImpl: HostSystem>(
fn handle_ok<HostSystemImpl: HostSystem>(
&self,
source: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
in_re_verb: u8,
in_re_message_id: u64,
payload: &PacketBuffer,
cursor: &mut usize,
) -> bool;
) -> PacketHandlerResult;
/// Check if this peer should communicate with another at all.
fn should_communicate_with(&self, id: &Identity) -> bool;
@ -167,17 +169,6 @@ pub trait InnerProtocol: Sync + Send + 'static {
/// How often to check the root cluster definitions against the root list and update.
const ROOT_SYNC_INTERVAL_MS: i64 = 1000;
#[derive(Default)]
struct BackgroundTaskIntervals {
root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>,
root_hello: IntervalGate<{ ROOT_HELLO_INTERVAL }>,
root_spam_hello: IntervalGate<{ ROOT_HELLO_SPAM_INTERVAL }>,
peer_service: IntervalGate<{ crate::vl1::peer::SERVICE_INTERVAL_MS }>,
path_service: IntervalGate<{ crate::vl1::path::SERVICE_INTERVAL_MS }>,
whois_service: IntervalGate<{ crate::vl1::whoisqueue::SERVICE_INTERVAL_MS }>,
}
/// Mutable fields related to roots and root sets.
struct RootInfo<HostSystemImpl: HostSystem> {
/// Root sets to which we are a member.
sets: HashMap<String, Verified<RootSet>>,
@ -196,56 +187,20 @@ struct RootInfo<HostSystemImpl: HostSystem> {
online: bool,
}
/// Key used to look up paths in a hash map
/// This supports copied keys for storing and refs for fast lookup without having to copy anything.
enum PathKey<'a, HostSystemImpl: HostSystem> {
Copied(Endpoint, HostSystemImpl::LocalSocket),
Ref(&'a Endpoint, &'a HostSystemImpl::LocalSocket),
#[derive(Default)]
struct BackgroundTaskIntervals {
root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>,
root_hello: IntervalGate<{ ROOT_HELLO_INTERVAL }>,
root_spam_hello: IntervalGate<{ ROOT_HELLO_SPAM_INTERVAL }>,
peer_service: IntervalGate<{ crate::vl1::peer::SERVICE_INTERVAL_MS }>,
path_service: IntervalGate<{ crate::vl1::path::SERVICE_INTERVAL_MS }>,
whois_queue_retry: IntervalGate<{ WHOIS_RETRY_INTERVAL }>,
}
impl<'a, HostSystemImpl: HostSystem> Hash for PathKey<'a, HostSystemImpl> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
match self {
Self::Copied(ep, ls) => {
ep.hash(state);
ls.hash(state);
}
Self::Ref(ep, ls) => {
(*ep).hash(state);
(*ls).hash(state);
}
}
}
}
impl<'a, HostSystemImpl: HostSystem> PartialEq for PathKey<'_, HostSystemImpl> {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::Copied(ep1, ls1), Self::Copied(ep2, ls2)) => ep1.eq(ep2) && ls1.eq(ls2),
(Self::Copied(ep1, ls1), Self::Ref(ep2, ls2)) => ep1.eq(*ep2) && ls1.eq(*ls2),
(Self::Ref(ep1, ls1), Self::Copied(ep2, ls2)) => (*ep1).eq(ep2) && (*ls1).eq(ls2),
(Self::Ref(ep1, ls1), Self::Ref(ep2, ls2)) => (*ep1).eq(*ep2) && (*ls1).eq(*ls2),
}
}
}
impl<'a, HostSystemImpl: HostSystem> Eq for PathKey<'_, HostSystemImpl> {}
impl<'a, HostSystemImpl: HostSystem> PathKey<'a, HostSystemImpl> {
#[inline(always)]
fn local_socket(&self) -> &HostSystemImpl::LocalSocket {
match self {
Self::Copied(_, ls) => ls,
Self::Ref(_, ls) => *ls,
}
}
fn to_copied(&self) -> PathKey<'static, HostSystemImpl> {
match self {
Self::Copied(ep, ls) => PathKey::<'static, HostSystemImpl>::Copied(ep.clone(), ls.clone()),
Self::Ref(ep, ls) => PathKey::<'static, HostSystemImpl>::Copied((*ep).clone(), (*ls).clone()),
}
}
#[derive(Default)]
struct WhoisQueueItem {
waiting_packets: RingBuffer<PooledPacketBuffer, WHOIS_MAX_WAITING_PACKETS>,
retry_count: u16,
}
/// A ZeroTier VL1 node that can communicate securely with the ZeroTier peer-to-peer network.
@ -260,40 +215,40 @@ pub struct Node<HostSystemImpl: HostSystem> {
pub identity: Identity,
/// Interval latches for periodic background tasks.
intervals: parking_lot::Mutex<BackgroundTaskIntervals>,
intervals: Mutex<BackgroundTaskIntervals>,
/// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use.
paths: parking_lot::RwLock<HashMap<PathKey<'static, HostSystemImpl>, Arc<Path<HostSystemImpl>>>>,
paths: RwLock<HashMap<PathKey<'static, 'static, HostSystemImpl>, Arc<Path<HostSystemImpl>>>>,
/// Peers with which we are currently communicating.
peers: parking_lot::RwLock<HashMap<Address, Arc<Peer<HostSystemImpl>>>>,
peers: RwLock<HashMap<Address, Arc<Peer<HostSystemImpl>>>>,
/// This node's trusted roots, sorted in ascending order of quality/preference, and cluster definitions.
roots: parking_lot::RwLock<RootInfo<HostSystemImpl>>,
roots: RwLock<RootInfo<HostSystemImpl>>,
/// Current best root.
best_root: parking_lot::RwLock<Option<Arc<Peer<HostSystemImpl>>>>,
best_root: RwLock<Option<Arc<Peer<HostSystemImpl>>>>,
/// Identity lookup queue, also holds packets waiting on a lookup.
whois: WhoisQueue,
/// Queue of identities being looked up.
whois_queue: Mutex<HashMap<Address, WhoisQueueItem>>,
}
impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
pub async fn new<NodeStorageImpl: NodeStorage>(
pub fn new<NodeStorageImpl: NodeStorage>(
host_system: &HostSystemImpl,
storage: &NodeStorageImpl,
auto_generate_identity: bool,
auto_upgrade_identity: bool,
) -> Result<Self, InvalidParameterError> {
let mut id = {
let id = storage.load_node_identity().await;
let id = storage.load_node_identity();
if id.is_none() {
if !auto_generate_identity {
return Err(InvalidParameterError("no identity found and auto-generate not enabled"));
} else {
let id = Identity::generate();
host_system.event(Event::IdentityAutoGenerated(id.clone()));
storage.save_node_identity(&id).await;
storage.save_node_identity(&id);
id
}
} else {
@ -304,7 +259,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
if auto_upgrade_identity {
let old = id.clone();
if id.upgrade()? {
storage.save_node_identity(&id).await;
storage.save_node_identity(&id);
host_system.event(Event::IdentityAutoUpgraded(old, id.clone()));
}
}
@ -314,18 +269,18 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
Ok(Self {
instance_id: random::get_bytes_secure(),
identity: id,
intervals: parking_lot::Mutex::new(BackgroundTaskIntervals::default()),
paths: parking_lot::RwLock::new(HashMap::new()),
peers: parking_lot::RwLock::new(HashMap::new()),
roots: parking_lot::RwLock::new(RootInfo {
intervals: Mutex::new(BackgroundTaskIntervals::default()),
paths: RwLock::new(HashMap::new()),
peers: RwLock::new(HashMap::new()),
roots: RwLock::new(RootInfo {
sets: HashMap::new(),
roots: HashMap::new(),
this_root_sets: None,
sets_modified: false,
online: false,
}),
best_root: parking_lot::RwLock::new(None),
whois: WhoisQueue::new(),
best_root: RwLock::new(None),
whois_queue: Mutex::new(HashMap::new()),
})
}
@ -396,17 +351,20 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
}
}
pub async fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration {
let tt = host_system.time_ticks();
let (root_sync, root_hello, mut root_spam_hello, peer_service, path_service, whois_service) = {
pub fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration {
const INTERVAL_MS: i64 = 1000;
const INTERVAL: Duration = Duration::from_millis(INTERVAL_MS as u64);
let time_ticks = host_system.time_ticks();
let (root_sync, root_hello, mut root_spam_hello, peer_service, path_service, whois_queue_retry) = {
let mut intervals = self.intervals.lock();
(
intervals.root_sync.gate(tt),
intervals.root_hello.gate(tt),
intervals.root_spam_hello.gate(tt),
intervals.peer_service.gate(tt),
intervals.path_service.gate(tt),
intervals.whois_service.gate(tt),
intervals.root_sync.gate(time_ticks),
intervals.root_hello.gate(time_ticks),
intervals.root_spam_hello.gate(time_ticks),
intervals.peer_service.gate(time_ticks),
intervals.path_service.gate(time_ticks),
intervals.whois_queue_retry.gate(time_ticks),
)
};
@ -443,11 +401,11 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
} else {
""
},
if whois_service {
" whois_service"
if whois_queue_retry {
" whois_queue_retry"
} else {
""
},
}
);
if root_sync {
@ -509,9 +467,9 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
if let Some(peer) = peers.get(&m.identity.address) {
new_roots.insert(peer.clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect());
} else {
if let Some(peer) = Peer::<HostSystemImpl>::new(&self.identity, m.identity.clone(), tt) {
if let Some(peer) = Peer::<HostSystemImpl>::new(&self.identity, m.identity.clone(), time_ticks) {
new_roots.insert(
parking_lot::RwLockUpgradableReadGuard::upgrade(peers)
RwLockUpgradableReadGuard::upgrade(peers)
.entry(m.identity.address)
.or_insert_with(|| Arc::new(peer))
.clone(),
@ -553,7 +511,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
}
}
self.update_best_root(host_system, tt);
self.update_best_root(host_system, time_ticks);
}
// Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint
@ -577,7 +535,9 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
root.identity.address.to_string(),
ROOT_HELLO_INTERVAL
);
root.send_hello(host_system, self, Some(ep)).await;
let root = root.clone();
let ep = ep.clone();
root.send_hello(host_system, self, Some(&ep));
}
}
}
@ -589,7 +549,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
{
let roots = self.roots.read();
for (a, peer) in self.peers.read().iter() {
if !peer.service(host_system, self, tt) && !roots.roots.contains_key(peer) {
if !peer.service(host_system, self, time_ticks) && !roots.roots.contains_key(peer) {
dead_peers.push(*a);
}
}
@ -600,13 +560,13 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
}
if path_service {
// Service all paths, removing expired or invalid ones. This is done in two passes to
// avoid introducing latency into a flow.
let mut dead_paths = Vec::new();
let mut need_keepalive = Vec::new();
// First check all paths in read mode to avoid blocking the entire node.
for (k, path) in self.paths.read().iter() {
if host_system.local_socket_is_valid(k.local_socket()) {
match path.service(tt) {
match path.service(time_ticks) {
PathServiceResult::Ok => {}
PathServiceResult::Dead => dead_paths.push(k.to_copied()),
PathServiceResult::NeedsKeepalive => need_keepalive.push(path.clone()),
@ -615,26 +575,40 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
dead_paths.push(k.to_copied());
}
}
// Lock in write mode and remove dead paths, doing so piecemeal to again avoid blocking.
for dp in dead_paths.iter() {
self.paths.write().remove(dp);
}
let ka = [tt as u8]; // send different bytes every time for keepalive in case some things filter zero packets
// Finally run keepalive sends as a batch.
let keepalive_buf = [time_ticks as u8]; // just an arbitrary byte, no significance
for p in need_keepalive.iter() {
host_system
.wire_send(&p.endpoint, Some(&p.local_socket), Some(&p.local_interface), &ka[..1], 0)
.await;
host_system.wire_send(&p.endpoint, Some(&p.local_socket), Some(&p.local_interface), &keepalive_buf, 0);
}
}
if whois_service {
self.whois.service(host_system, self, tt);
if whois_queue_retry {
let need_whois = {
let mut need_whois = Vec::new();
let mut whois_queue = self.whois_queue.lock();
whois_queue.retain(|_, qi| qi.retry_count <= WHOIS_RETRY_COUNT_MAX);
for (address, qi) in whois_queue.iter_mut() {
qi.retry_count += 1;
need_whois.push(*address);
}
need_whois
};
if !need_whois.is_empty() {
self.send_whois(host_system, need_whois.as_slice());
}
}
debug_event!(host_system, "[vl1] do_background_tasks DONE ----");
Duration::from_millis(1000)
INTERVAL
}
pub async fn handle_incoming_physical_packet<InnerProtocolImpl: InnerProtocol>(
pub fn handle_incoming_physical_packet<InnerProtocolImpl: InnerProtocol>(
&self,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
@ -656,6 +630,16 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
source_local_interface.to_string()
);
// A 0xff value at byte [8] means this is a ZSSP packet. This is accomplished via the
// backward compatibility hack of always having 0xff at byte [4] of 6-byte session IDs
// and by having 0xffffffffffff be the "nil" session ID for session init packets. ZSSP
// is the new V2 Noise-based forward-secure transport protocol. What follows below this
// is legacy handling of the old v1 protocol.
if data.u8_at(8).map_or(false, |x| x == 0xff) {
todo!();
}
// Legacy ZeroTier V1 packet handling
if let Ok(fragment_header) = data.struct_mut_at::<v1::FragmentHeader>(0) {
if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) {
let time_ticks = host_system.time_ticks();
@ -668,7 +652,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
let fragment_header_id = u64::from_be_bytes(fragment_header.id);
debug_event!(
host_system,
"[vl1] #{:0>16x} fragment {} of {} received",
"[vl1] [v1] #{:0>16x} fragment {} of {} received",
u64::from_be_bytes(fragment_header.id),
fragment_header.fragment_no(),
fragment_header.total_fragments()
@ -683,7 +667,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
) {
if let Some(frag0) = assembled_packet.frags[0].as_ref() {
#[cfg(debug_assertions)]
debug_event!(host_system, "[vl1] #{:0>16x} packet fully assembled!", fragment_header_id);
debug_event!(host_system, "[vl1] [v1] #{:0>16x} packet fully assembled!", fragment_header_id);
if let Ok(packet_header) = frag0.struct_at::<v1::PacketHeader>(0) {
if let Some(source) = Address::from_bytes(&packet_header.src) {
@ -697,11 +681,16 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
packet_header,
frag0,
&assembled_packet.frags[1..(assembled_packet.have as usize)],
)
.await;
);
} else {
self.whois
.query(self, host_system, source, Some(QueuedPacket::Fragmented(assembled_packet)));
/*
self.whois_lookup_queue.query(
self,
host_system,
source,
Some(QueuedPacket::Fragmented(assembled_packet)),
);
*/
}
}
}
@ -710,14 +699,17 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
} else {
#[cfg(debug_assertions)]
if let Ok(packet_header) = data.struct_at::<v1::PacketHeader>(0) {
debug_event!(host_system, "[vl1] #{:0>16x} is unfragmented", u64::from_be_bytes(packet_header.id));
debug_event!(
host_system,
"[vl1] [v1] #{:0>16x} is unfragmented",
u64::from_be_bytes(packet_header.id)
);
if let Some(source) = Address::from_bytes(&packet_header.src) {
if let Some(peer) = self.peer(source) {
peer.receive(self, host_system, inner, time_ticks, &path, packet_header, data.as_ref(), &[])
.await;
peer.receive(self, host_system, inner, time_ticks, &path, packet_header, data.as_ref(), &[]);
} else {
self.whois.query(self, host_system, source, Some(QueuedPacket::Unfragmented(data)));
self.whois(host_system, source, Some(data));
}
}
}
@ -732,14 +724,14 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
debug_packet_id = u64::from_be_bytes(fragment_header.id);
debug_event!(
host_system,
"[vl1] #{:0>16x} forwarding packet fragment to {}",
"[vl1] [v1] #{:0>16x} forwarding packet fragment to {}",
debug_packet_id,
dest.to_string()
);
}
if fragment_header.increment_hops() > v1::FORWARD_MAX_HOPS {
#[cfg(debug_assertions)]
debug_event!(host_system, "[vl1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id);
debug_event!(host_system, "[vl1] [v1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id);
return;
}
} else {
@ -749,7 +741,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
debug_packet_id = u64::from_be_bytes(packet_header.id);
debug_event!(
host_system,
"[vl1] #{:0>16x} forwarding packet to {}",
"[vl1] [v1] #{:0>16x} forwarding packet to {}",
debug_packet_id,
dest.to_string()
);
@ -758,7 +750,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
#[cfg(debug_assertions)]
debug_event!(
host_system,
"[vl1] #{:0>16x} discarded: max hops exceeded!",
"[vl1] [v1] #{:0>16x} discarded: max hops exceeded!",
u64::from_be_bytes(packet_header.id)
);
return;
@ -770,15 +762,35 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
if let Some(peer) = self.peer(dest) {
// TODO: SHOULD we forward? Need a way to check.
peer.forward(host_system, time_ticks, data.as_ref()).await;
peer.forward(host_system, time_ticks, data.as_ref());
#[cfg(debug_assertions)]
debug_event!(host_system, "[vl1] #{:0>16x} forwarded successfully", debug_packet_id);
debug_event!(host_system, "[vl1] [v1] #{:0>16x} forwarded successfully", debug_packet_id);
}
}
}
}
}
fn whois(&self, host_system: &HostSystemImpl, address: Address, waiting_packet: Option<PooledPacketBuffer>) {
{
let mut whois_queue = self.whois_queue.lock();
let qi = whois_queue.entry(address).or_default();
if let Some(p) = waiting_packet {
qi.waiting_packets.add(p);
}
if qi.retry_count > 0 {
return;
} else {
qi.retry_count += 1;
}
}
self.send_whois(host_system, &[address]);
}
fn send_whois(&self, host_system: &HostSystemImpl, addresses: &[Address]) {
if let Some(root) = self.best_root() {}
}
/// Get the current "best" root from among this node's trusted roots.
pub fn best_root(&self) -> Option<Arc<Peer<HostSystemImpl>>> {
self.best_root.read().clone()
@ -865,47 +877,103 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
}
}
/// Key used to look up paths in a hash map
/// This supports copied keys for storing and refs for fast lookup without having to copy anything.
enum PathKey<'a, 'b, HostSystemImpl: HostSystem> {
Copied(Endpoint, HostSystemImpl::LocalSocket),
Ref(&'a Endpoint, &'b HostSystemImpl::LocalSocket),
}
impl<'a, 'b, HostSystemImpl: HostSystem> Hash for PathKey<'a, 'b, HostSystemImpl> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
match self {
Self::Copied(ep, ls) => {
ep.hash(state);
ls.hash(state);
}
Self::Ref(ep, ls) => {
(*ep).hash(state);
(*ls).hash(state);
}
}
}
}
impl<HostSystemImpl: HostSystem> PartialEq for PathKey<'_, '_, HostSystemImpl> {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::Copied(ep1, ls1), Self::Copied(ep2, ls2)) => ep1.eq(ep2) && ls1.eq(ls2),
(Self::Copied(ep1, ls1), Self::Ref(ep2, ls2)) => ep1.eq(*ep2) && ls1.eq(*ls2),
(Self::Ref(ep1, ls1), Self::Copied(ep2, ls2)) => (*ep1).eq(ep2) && (*ls1).eq(ls2),
(Self::Ref(ep1, ls1), Self::Ref(ep2, ls2)) => (*ep1).eq(*ep2) && (*ls1).eq(*ls2),
}
}
}
impl<HostSystemImpl: HostSystem> Eq for PathKey<'_, '_, HostSystemImpl> {}
impl<'a, 'b, HostSystemImpl: HostSystem> PathKey<'a, 'b, HostSystemImpl> {
#[inline(always)]
fn local_socket(&self) -> &HostSystemImpl::LocalSocket {
match self {
Self::Copied(_, ls) => ls,
Self::Ref(_, ls) => *ls,
}
}
#[inline(always)]
fn to_copied(&self) -> PathKey<'static, 'static, HostSystemImpl> {
match self {
Self::Copied(ep, ls) => PathKey::<'static, 'static, HostSystemImpl>::Copied(ep.clone(), ls.clone()),
Self::Ref(ep, ls) => PathKey::<'static, 'static, HostSystemImpl>::Copied((*ep).clone(), (*ls).clone()),
}
}
}
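
The comment above describes a common Rust pattern: the map stores owned (Copied) keys, while lookups on the hot receive path build a borrowed (Ref) key so no Endpoint or LocalSocket is cloned. The trick only works because both variants hash and compare identically, which is exactly the invariant the manual Hash and PartialEq impls preserve. A generic miniature of that invariant (illustrative names, not the PathKey types):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

enum Key<'a> {
    Copied(String),
    Ref(&'a str),
}

impl Hash for Key<'_> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        match self {
            Key::Copied(s) => s.as_str().hash(state),
            Key::Ref(s) => s.hash(state),
        }
    }
}

impl PartialEq for Key<'_> {
    fn eq(&self, other: &Self) -> bool {
        let a = match self { Key::Copied(s) => s.as_str(), Key::Ref(s) => *s };
        let b = match other { Key::Copied(s) => s.as_str(), Key::Ref(s) => *s };
        a == b
    }
}
impl Eq for Key<'_> {}

fn hash_of(k: &Key<'_>) -> u64 {
    let mut h = DefaultHasher::new();
    k.hash(&mut h);
    h.finish()
}

// Owned and borrowed forms of the same key must hash and compare the same.
fn invariant_demo() {
    assert_eq!(hash_of(&Key::Copied("x".into())), hash_of(&Key::Ref("x")));
    assert!(Key::Copied("x".into()) == Key::Ref("x"));
}
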
/// Dummy no-op inner protocol for debugging and testing.
#[derive(Default)]
pub struct DummyInnerProtocol;
#[async_trait]
impl InnerProtocol for DummyInnerProtocol {
async fn handle_packet<HostSystemImpl: HostSystem>(
#[inline(always)]
fn handle_packet<HostSystemImpl: HostSystem>(
&self,
_source: &Peer<HostSystemImpl>,
_source_path: &Path<HostSystemImpl>,
_source: &Arc<Peer<HostSystemImpl>>,
_source_path: &Arc<Path<HostSystemImpl>>,
_verb: u8,
_payload: &PacketBuffer,
) -> bool {
false
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
async fn handle_error<HostSystemImpl: HostSystem>(
#[inline(always)]
fn handle_error<HostSystemImpl: HostSystem>(
&self,
_source: &Peer<HostSystemImpl>,
_source_path: &Path<HostSystemImpl>,
_source: &Arc<Peer<HostSystemImpl>>,
_source_path: &Arc<Path<HostSystemImpl>>,
_in_re_verb: u8,
_in_re_message_id: u64,
_error_code: u8,
_payload: &PacketBuffer,
_cursor: &mut usize,
) -> bool {
false
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
async fn handle_ok<HostSystemImpl: HostSystem>(
#[inline(always)]
fn handle_ok<HostSystemImpl: HostSystem>(
&self,
_source: &Peer<HostSystemImpl>,
_source_path: &Path<HostSystemImpl>,
_source: &Arc<Peer<HostSystemImpl>>,
_source_path: &Arc<Path<HostSystemImpl>>,
_in_re_verb: u8,
_in_re_message_id: u64,
_payload: &PacketBuffer,
_cursor: &mut usize,
) -> bool {
false
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
#[inline(always)]
fn should_communicate_with(&self, _id: &Identity) -> bool {
true
}
@ -915,9 +983,9 @@ impl InnerProtocol for DummyInnerProtocol {
#[derive(Default)]
pub struct DummyPathFilter;
#[async_trait]
impl PathFilter for DummyPathFilter {
async fn check_path<HostSystemImpl: HostSystem>(
#[inline(always)]
fn check_path<HostSystemImpl: HostSystem>(
&self,
_id: &Identity,
_endpoint: &Endpoint,
@ -927,7 +995,8 @@ impl PathFilter for DummyPathFilter {
true
}
async fn get_path_hints<HostSystemImpl: HostSystem>(
#[inline(always)]
fn get_path_hints<HostSystemImpl: HostSystem>(
&self,
_id: &Identity,
) -> Option<

View file

@ -32,7 +32,7 @@ pub struct Path<HostSystemImpl: HostSystem> {
last_send_time_ticks: AtomicI64,
last_receive_time_ticks: AtomicI64,
create_time_ticks: i64,
fragmented_packets: Mutex<HashMap<PacketId, FragmentedPacket, PacketIdHasher>>,
fragmented_packets: Mutex<HashMap<u64, FragmentedPacket, PacketIdHasher>>,
}
impl<HostSystemImpl: HostSystem> Path<HostSystemImpl> {
@ -57,7 +57,7 @@ impl<HostSystemImpl: HostSystem> Path<HostSystemImpl> {
/// This returns None if more fragments are needed to assemble the packet.
pub(crate) fn receive_fragment(
&self,
packet_id: PacketId,
packet_id: u64,
fragment_no: u8,
fragment_expecting_count: u8,
packet: PooledPacketBuffer,

View file

@ -206,7 +206,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
(time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME
}
async fn internal_send(
fn internal_send(
&self,
host_system: &HostSystemImpl,
endpoint: &Endpoint,
@ -214,16 +214,11 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
local_interface: Option<&HostSystemImpl::LocalInterface>,
max_fragment_size: usize,
packet: &PacketBuffer,
) -> bool {
) {
let packet_size = packet.len();
if packet_size > max_fragment_size {
let bytes = packet.as_bytes();
if !host_system
.wire_send(endpoint, local_socket, local_interface, &bytes[0..UDP_DEFAULT_MTU], 0)
.await
{
return false;
}
host_system.wire_send(endpoint, local_socket, local_interface, &bytes[0..UDP_DEFAULT_MTU], 0);
let mut pos = UDP_DEFAULT_MTU;
let overrun_size = (packet_size - UDP_DEFAULT_MTU) as u32;
@ -247,23 +242,16 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
let fragment_size = v1::FRAGMENT_HEADER_SIZE + chunk_size;
tmp_buf[..v1::FRAGMENT_HEADER_SIZE].copy_from_slice(header.as_bytes());
tmp_buf[v1::FRAGMENT_HEADER_SIZE..fragment_size].copy_from_slice(&bytes[pos..next_pos]);
if !host_system
.wire_send(endpoint, local_socket, local_interface, &tmp_buf[..fragment_size], 0)
.await
{
return false;
}
host_system.wire_send(endpoint, local_socket, local_interface, &tmp_buf[..fragment_size], 0);
pos = next_pos;
if pos < packet_size {
chunk_size = (packet_size - pos).min(UDP_DEFAULT_MTU - v1::HEADER_SIZE);
} else {
return true;
break;
}
}
} else {
return host_system
.wire_send(endpoint, local_socket, local_interface, packet.as_bytes(), 0)
.await;
host_system.wire_send(endpoint, local_socket, local_interface, packet.as_bytes(), 0);
}
}
@ -273,7 +261,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
/// via a root or some other route.
///
/// It encrypts and sets the MAC and cipher fields and packet ID and other things.
pub(crate) async fn send(
pub(crate) fn send(
&self,
host_system: &HostSystemImpl,
path: Option<&Arc<Path<HostSystemImpl>>>,
@ -326,22 +314,18 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
return false;
}
if self
.internal_send(
host_system,
&path.endpoint,
Some(&path.local_socket),
Some(&path.local_interface),
max_fragment_size,
packet,
)
.await
{
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
return true;
}
self.internal_send(
host_system,
&path.endpoint,
Some(&path.local_socket),
Some(&path.local_interface),
max_fragment_size,
packet,
);
return false;
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
return true;
}
/// Forward a packet to this peer.
@ -351,21 +335,17 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
///
/// This doesn't fragment large packets since fragments are forwarded individually.
/// Intermediates don't need to adjust fragmentation.
pub(crate) async fn forward(&self, host_system: &HostSystemImpl, time_ticks: i64, packet: &PacketBuffer) -> bool {
pub(crate) fn forward(&self, host_system: &HostSystemImpl, time_ticks: i64, packet: &PacketBuffer) -> bool {
if let Some(path) = self.direct_path() {
if host_system
.wire_send(
&path.endpoint,
Some(&path.local_socket),
Some(&path.local_interface),
packet.as_bytes(),
0,
)
.await
{
self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed);
return true;
}
host_system.wire_send(
&path.endpoint,
Some(&path.local_socket),
Some(&path.local_interface),
packet.as_bytes(),
0,
);
self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed);
return true;
}
return false;
}
@ -378,7 +358,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
/// Unlike other messages HELLO is sent partially in the clear and always with the long-lived
/// static identity key. Authentication in old versions is via Poly1305 and in new versions
/// via HMAC-SHA512.
pub(crate) async fn send_hello(
pub(crate) fn send_hello(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
@ -445,26 +425,20 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
}
if let Some(p) = path.as_ref() {
if self
.internal_send(
host_system,
destination,
Some(&p.local_socket),
Some(&p.local_interface),
max_fragment_size,
&packet,
)
.await
{
p.log_send_anything(time_ticks);
true
} else {
false
}
self.internal_send(
host_system,
destination,
Some(&p.local_socket),
Some(&p.local_interface),
max_fragment_size,
&packet,
);
p.log_send_anything(time_ticks);
} else {
self.internal_send(host_system, destination, None, None, max_fragment_size, &packet)
.await
self.internal_send(host_system, destination, None, None, max_fragment_size, &packet);
}
return true;
}
/// Receive, decrypt, authenticate, and process an incoming packet from this peer.
@ -473,8 +447,8 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
/// those fragments after the main packet header and first chunk.
///
/// This returns true if the packet decrypted and passed authentication.
pub(crate) async fn receive<InnerProtocolImpl: InnerProtocol>(
&self,
pub(crate) fn receive<InnerProtocolImpl: InnerProtocol>(
self: &Arc<Self>,
node: &Node<HostSystemImpl>,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
@ -483,7 +457,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
packet_header: &v1::PacketHeader,
frag0: &PacketBuffer,
fragments: &[Option<PooledPacketBuffer>],
) -> bool {
) -> PacketHandlerResult {
if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(v1::VERB_INDEX) {
let mut payload = PacketBuffer::new();
@ -503,7 +477,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
"[vl1] #{:0>16x} failed authentication",
u64::from_be_bytes(packet_header.id)
);
return false;
return PacketHandlerResult::Error;
};
if let Ok(mut verb) = payload.u8_at(0) {
@ -513,7 +487,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
if let Ok(dlen) = lz4_flex::block::decompress_into(&payload.as_bytes()[1..], &mut decompressed_payload[1..]) {
payload.set_to(&decompressed_payload[..(dlen + 1)]);
} else {
return false;
return PacketHandlerResult::Error;
}
}
@ -541,68 +515,47 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
verb as u32
);
if match verb {
verbs::VL1_NOP => true,
verbs::VL1_HELLO => {
self.handle_incoming_hello(
host_system,
inner,
node,
time_ticks,
message_id,
source_path,
packet_header.hops(),
&payload,
)
.await
}
verbs::VL1_ERROR => {
self.handle_incoming_error(host_system, inner, node, time_ticks, source_path, &payload)
.await
}
verbs::VL1_OK => {
self.handle_incoming_ok(
host_system,
inner,
node,
time_ticks,
source_path,
packet_header.hops(),
path_is_known,
&payload,
)
.await
}
verbs::VL1_WHOIS => {
self.handle_incoming_whois(host_system, inner, node, time_ticks, message_id, &payload)
.await
}
return match verb {
verbs::VL1_NOP => PacketHandlerResult::Ok,
verbs::VL1_HELLO => self.handle_incoming_hello(
host_system,
inner,
node,
time_ticks,
message_id,
source_path,
packet_header.hops(),
&payload,
),
verbs::VL1_ERROR => self.handle_incoming_error(host_system, inner, node, time_ticks, source_path, &payload),
verbs::VL1_OK => self.handle_incoming_ok(
host_system,
inner,
node,
time_ticks,
source_path,
packet_header.hops(),
path_is_known,
&payload,
),
verbs::VL1_WHOIS => self.handle_incoming_whois(host_system, inner, node, time_ticks, message_id, &payload),
verbs::VL1_RENDEZVOUS => {
self.handle_incoming_rendezvous(host_system, node, time_ticks, message_id, source_path, &payload)
.await
}
verbs::VL1_ECHO => {
self.handle_incoming_echo(host_system, inner, node, time_ticks, message_id, &payload)
.await
}
verbs::VL1_ECHO => self.handle_incoming_echo(host_system, inner, node, time_ticks, message_id, &payload),
verbs::VL1_PUSH_DIRECT_PATHS => {
self.handle_incoming_push_direct_paths(host_system, node, time_ticks, source_path, &payload)
.await
}
verbs::VL1_USER_MESSAGE => {
self.handle_incoming_user_message(host_system, node, time_ticks, source_path, &payload)
.await
}
_ => inner.handle_packet(self, &source_path, verb, &payload).await,
} {
return true;
}
verbs::VL1_USER_MESSAGE => self.handle_incoming_user_message(host_system, node, time_ticks, source_path, &payload),
_ => inner.handle_packet(self, &source_path, verb, &payload),
};
}
}
return false;
return PacketHandlerResult::Error;
}
async fn handle_incoming_hello<InnerProtocolImpl: InnerProtocol>(
fn handle_incoming_hello<InnerProtocolImpl: InnerProtocol>(
&self,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
@ -612,14 +565,14 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
source_path: &Arc<Path<HostSystemImpl>>,
hops: u8,
payload: &PacketBuffer,
) -> bool {
) -> PacketHandlerResult {
if !(inner.should_communicate_with(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) {
debug_event!(
host_system,
"[vl1] dropping HELLO from {} due to lack of trust relationship",
self.identity.address.to_string()
);
return true; // packet wasn't invalid, just ignored
return PacketHandlerResult::Ok; // packet wasn't invalid, just ignored
}
let mut cursor = 0;
@ -653,48 +606,48 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
f.1.version_revision = VERSION_REVISION.to_be_bytes();
}
return self.send(host_system, Some(source_path), node, time_ticks, &mut packet).await;
self.send(host_system, Some(source_path), node, time_ticks, &mut packet);
return PacketHandlerResult::Ok;
}
}
}
return false;
return PacketHandlerResult::Error;
}
async fn handle_incoming_error<InnerProtocolImpl: InnerProtocol>(
&self,
fn handle_incoming_error<InnerProtocolImpl: InnerProtocol>(
self: &Arc<Self>,
_: &HostSystemImpl,
inner: &InnerProtocolImpl,
_: &Node<HostSystemImpl>,
_: i64,
source_path: &Arc<Path<HostSystemImpl>>,
payload: &PacketBuffer,
) -> bool {
) -> PacketHandlerResult {
let mut cursor = 0;
if let Ok(error_header) = payload.read_struct::<v1::message_component_structs::ErrorHeader>(&mut cursor) {
let in_re_message_id: MessageId = u64::from_ne_bytes(error_header.in_re_message_id);
if self.message_id_counter.load(Ordering::Relaxed).wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX {
match error_header.in_re_verb {
_ => {
return inner
.handle_error(
self,
&source_path,
error_header.in_re_verb,
in_re_message_id,
error_header.error_code,
payload,
&mut cursor,
)
.await;
return inner.handle_error(
self,
&source_path,
error_header.in_re_verb,
in_re_message_id,
error_header.error_code,
payload,
&mut cursor,
);
}
}
}
}
return false;
return PacketHandlerResult::Error;
}
async fn handle_incoming_ok<InnerProtocolImpl: InnerProtocol>(
&self,
fn handle_incoming_ok<InnerProtocolImpl: InnerProtocol>(
self: &Arc<Self>,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
node: &Node<HostSystemImpl>,
@ -703,7 +656,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
hops: u8,
path_is_known: bool,
payload: &PacketBuffer,
) -> bool {
) -> PacketHandlerResult {
let mut cursor = 0;
if let Ok(ok_header) = payload.read_struct::<v1::message_component_structs::OkHeader>(&mut cursor) {
let in_re_message_id: MessageId = u64::from_ne_bytes(ok_header.in_re_message_id);
@ -753,30 +706,28 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
}
}
} else {
return true; // not invalid, just ignored
return PacketHandlerResult::Ok; // not invalid, just ignored
}
}
_ => {
return inner
.handle_ok(self, &source_path, ok_header.in_re_verb, in_re_message_id, payload, &mut cursor)
.await;
return inner.handle_ok(self, &source_path, ok_header.in_re_verb, in_re_message_id, payload, &mut cursor);
}
}
}
}
return false;
return PacketHandlerResult::Error;
}
async fn handle_incoming_whois<InnerProtocolImpl: InnerProtocol>(
&self,
fn handle_incoming_whois<InnerProtocolImpl: InnerProtocol>(
self: &Arc<Self>,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
node: &Node<HostSystemImpl>,
time_ticks: i64,
message_id: MessageId,
payload: &PacketBuffer,
) -> bool {
) -> PacketHandlerResult {
if node.this_node_is_root() || inner.should_communicate_with(&self.identity) {
let mut packet = PacketBuffer::new();
packet.set_size(v1::HEADER_SIZE);
@ -793,33 +744,31 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
if let Some(peer) = node.peer(zt_address) {
if !packet.append_bytes((&peer.identity.to_public_bytes()).into()).is_ok() {
debug_event!(host_system, "unexpected error serializing an identity into a WHOIS packet response");
return false;
return PacketHandlerResult::Error;
}
}
}
}
self.send(host_system, None, node, time_ticks, &mut packet).await
} else {
true // packet wasn't invalid, just ignored
self.send(host_system, None, node, time_ticks, &mut packet);
}
return PacketHandlerResult::Ok;
}
#[allow(unused)]
async fn handle_incoming_rendezvous(
&self,
fn handle_incoming_rendezvous(
self: &Arc<Self>,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
time_ticks: i64,
message_id: MessageId,
source_path: &Arc<Path<HostSystemImpl>>,
payload: &PacketBuffer,
) -> bool {
) -> PacketHandlerResult {
if node.is_peer_root(self) {}
return true;
return PacketHandlerResult::Ok;
}
async fn handle_incoming_echo<InnerProtocolImpl: InnerProtocol>(
fn handle_incoming_echo<InnerProtocolImpl: InnerProtocol>(
&self,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
@ -827,7 +776,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
time_ticks: i64,
message_id: MessageId,
payload: &PacketBuffer,
) -> bool {
) -> PacketHandlerResult {
if inner.should_communicate_with(&self.identity) || node.is_peer_root(self) {
let mut packet = PacketBuffer::new();
packet.set_size(v1::HEADER_SIZE);
@ -838,9 +787,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
f.in_re_message_id = message_id.to_ne_bytes();
}
if packet.append_bytes(payload.as_bytes()).is_ok() {
self.send(host_system, None, node, time_ticks, &mut packet).await
} else {
false
self.send(host_system, None, node, time_ticks, &mut packet);
}
} else {
debug_event!(
@ -848,32 +795,30 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
"[vl1] dropping ECHO from {} due to lack of trust relationship",
self.identity.address.to_string()
);
true // packet wasn't invalid, just ignored
}
return PacketHandlerResult::Ok;
}
#[allow(unused)]
async fn handle_incoming_push_direct_paths(
&self,
fn handle_incoming_push_direct_paths(
self: &Arc<Self>,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
time_ticks: i64,
source_path: &Arc<Path<HostSystemImpl>>,
payload: &PacketBuffer,
) -> bool {
false
) -> PacketHandlerResult {
PacketHandlerResult::Ok
}
#[allow(unused)]
async fn handle_incoming_user_message(
&self,
fn handle_incoming_user_message(
self: &Arc<Self>,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
time_ticks: i64,
source_path: &Arc<Path<HostSystemImpl>>,
payload: &PacketBuffer,
) -> bool {
false
) -> PacketHandlerResult {
PacketHandlerResult::Ok
}
}

View file

@@ -4,7 +4,7 @@ use std::collections::BTreeSet;
use std::io::Write;
use crate::util::marshalable::*;
use crate::vl1::identity::{Identity, MAX_SIGNATURE_SIZE};
use crate::vl1::identity::{Identity, IDENTITY_MAX_SIGNATURE_SIZE};
use crate::vl1::Endpoint;
use zerotier_utils::arrayvec::ArrayVec;
@@ -33,7 +33,7 @@ pub struct Root {
/// This is populated by the sign() method when the completed root set is signed by each member.
/// All member roots must sign.
#[serde(default)]
pub signature: ArrayVec<u8, MAX_SIGNATURE_SIZE>,
pub signature: ArrayVec<u8, IDENTITY_MAX_SIGNATURE_SIZE>,
/// Priority (higher number is lower priority, 0 is default).
///

View file

@@ -1,84 +0,0 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use std::collections::{HashMap, LinkedList};
use parking_lot::Mutex;
use crate::protocol::{PooledPacketBuffer, WHOIS_MAX_WAITING_PACKETS, WHOIS_RETRY_INTERVAL, WHOIS_RETRY_MAX};
use crate::util::gate::IntervalGate;
use crate::vl1::fragmentedpacket::FragmentedPacket;
use crate::vl1::node::{HostSystem, Node};
use crate::vl1::Address;
pub(crate) const SERVICE_INTERVAL_MS: i64 = WHOIS_RETRY_INTERVAL;
pub(crate) enum QueuedPacket {
Unfragmented(PooledPacketBuffer),
Fragmented(FragmentedPacket),
}
struct WhoisQueueItem {
packet_queue: LinkedList<QueuedPacket>,
retry_gate: IntervalGate<{ WHOIS_RETRY_INTERVAL }>,
retry_count: u16,
}
pub(crate) struct WhoisQueue(Mutex<HashMap<Address, WhoisQueueItem>>);
impl WhoisQueue {
pub fn new() -> Self {
Self(Mutex::new(HashMap::new()))
}
/// Launch or renew a WHOIS query and enqueue a packet to be processed when (if) it is received.
pub fn query<SI: HostSystem>(&self, node: &Node<SI>, si: &SI, target: Address, packet: Option<QueuedPacket>) {
let mut q = self.0.lock();
let qi = q.entry(target).or_insert_with(|| WhoisQueueItem {
packet_queue: LinkedList::new(),
retry_gate: IntervalGate::new(0),
retry_count: 0,
});
if qi.retry_gate.gate(si.time_ticks()) {
qi.retry_count += 1;
if packet.is_some() {
while qi.packet_queue.len() >= WHOIS_MAX_WAITING_PACKETS {
let _ = qi.packet_queue.pop_front();
}
qi.packet_queue.push_back(packet.unwrap());
}
self.send_whois(node, si, &[target]);
}
}
/// Remove a WHOIS request from the queue and call the supplied function for all queued packets.
#[allow(unused)]
pub fn response_received_get_packets<F: FnMut(&mut QueuedPacket)>(&self, address: Address, packet_handler: F) {
let mut qi = self.0.lock().remove(&address);
let _ = qi.map(|mut qi| qi.packet_queue.iter_mut().for_each(packet_handler));
}
#[allow(unused)]
fn send_whois<SI: HostSystem>(&self, node: &Node<SI>, si: &SI, targets: &[Address]) {
todo!()
}
pub(crate) fn service<SI: HostSystem>(&self, si: &SI, node: &Node<SI>, time_ticks: i64) {
let mut targets: Vec<Address> = Vec::new();
self.0.lock().retain(|target, qi| {
if qi.retry_count < WHOIS_RETRY_MAX {
if qi.retry_gate.gate(time_ticks) {
qi.retry_count += 1;
targets.push(*target);
}
true
} else {
false
}
});
if !targets.is_empty() {
self.send_whois(node, si, targets.as_slice());
}
}
}

View file

@@ -14,5 +14,5 @@ pub struct CertificateOfMembership {
pub network_id: NetworkId,
pub timestamp: i64,
pub max_delta: i64,
pub signature: ArrayVec<u8, { identity::MAX_SIGNATURE_SIZE }>,
pub signature: ArrayVec<u8, { identity::IDENTITY_MAX_SIGNATURE_SIZE }>,
}

View file

@@ -1,56 +1,51 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use async_trait::async_trait;
use std::sync::Arc;
use crate::protocol::PacketBuffer;
use crate::vl1::node::{HostSystem, InnerProtocol};
use crate::vl1::node::{HostSystem, InnerProtocol, PacketHandlerResult};
use crate::vl1::{Identity, Path, Peer};
pub trait SwitchInterface: Sync + Send {}
pub struct Switch {}
#[async_trait]
impl InnerProtocol for Switch {
#[allow(unused)]
async fn handle_packet<HostSystemImpl: HostSystem>(
fn handle_packet<HostSystemImpl: HostSystem>(
&self,
peer: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
verb: u8,
payload: &PacketBuffer,
) -> bool {
false
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
#[allow(unused)]
async fn handle_error<HostSystemImpl: HostSystem>(
fn handle_error<HostSystemImpl: HostSystem>(
&self,
peer: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
in_re_verb: u8,
in_re_message_id: u64,
error_code: u8,
payload: &PacketBuffer,
cursor: &mut usize,
) -> bool {
false
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
#[allow(unused)]
async fn handle_ok<HostSystemImpl: HostSystem>(
fn handle_ok<HostSystemImpl: HostSystem>(
&self,
peer: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
in_re_verb: u8,
in_re_message_id: u64,
payload: &PacketBuffer,
cursor: &mut usize,
) -> bool {
false
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
#[allow(unused)]
fn should_communicate_with(&self, id: &Identity) -> bool {
true
}

View file

@@ -14,5 +14,5 @@ pub struct Tag {
pub timestamp: i64,
pub issued_to: Address,
pub signed_by: Address,
pub signature: ArrayVec<u8, { identity::MAX_SIGNATURE_SIZE }>,
pub signature: ArrayVec<u8, { identity::IDENTITY_MAX_SIGNATURE_SIZE }>,
}

View file

@@ -73,7 +73,7 @@ pub async fn cmd(_: Flags, cmd_args: &ArgMatches) -> i32 {
return exitcode::ERR_IOERR;
}
let root_set = root_set.unwrap();
if root_set.verify() {
if root_set.verify().is_some() {
println!("OK");
} else {
println!("FAILED");
@@ -108,7 +108,9 @@ pub async fn cmd(_: Flags, cmd_args: &ArgMatches) -> i32 {
}
Some(("restoredefault", _)) => {
let _ = std::io::stdout().write_all(to_json_pretty(&RootSet::zerotier_default()).as_bytes());
let rs = RootSet::zerotier_default();
let _ = std::io::stdout().write_all(to_json_pretty(&*rs).as_bytes());
// TODO: re-add
}
_ => panic!(),

View file

@@ -7,12 +7,10 @@ use std::sync::Arc;
use crate::localconfig::Config;
use crate::utils::{read_limit, DEFAULT_FILE_IO_READ_LIMIT};
use async_trait::async_trait;
use parking_lot::{Mutex, RwLock};
use zerotier_crypto::random::next_u32_secure;
use zerotier_network_hypervisor::vl1::{Identity, Storage};
use zerotier_network_hypervisor::vl1::{Identity, NodeStorage};
use zerotier_utils::json::to_json_pretty;
const AUTH_TOKEN_DEFAULT_LENGTH: usize = 48;
@@ -29,29 +27,35 @@ pub struct DataDir {
authtoken: Mutex<String>,
}
#[async_trait]
impl Storage for DataDir {
async fn load_node_identity(&self) -> Option<Identity> {
let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 4096).await;
if id_data.is_err() {
return None;
}
let id_data = Identity::from_str(String::from_utf8_lossy(id_data.unwrap().as_slice()).as_ref());
if id_data.is_err() {
return None;
}
Some(id_data.unwrap())
impl NodeStorage for DataDir {
fn load_node_identity(&self) -> Option<Identity> {
todo!()
/*
tokio::runtime::Handle::current().spawn(async {
let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 4096).await;
if id_data.is_err() {
return None;
}
let id_data = Identity::from_str(String::from_utf8_lossy(id_data.unwrap().as_slice()).as_ref());
if id_data.is_err() {
return None;
}
Some(id_data.unwrap())
})
*/
}
async fn save_node_identity(&self, id: &Identity) {
assert!(id.secret.is_some());
let id_secret_str = id.to_secret_string();
let id_public_str = id.to_string();
let secret_path = self.base_path.join(IDENTITY_SECRET_FILENAME);
// TODO: handle errors
let _ = tokio::fs::write(&secret_path, id_secret_str.as_bytes()).await;
assert!(crate::utils::fs_restrict_permissions(&secret_path));
let _ = tokio::fs::write(self.base_path.join(IDENTITY_PUBLIC_FILENAME), id_public_str.as_bytes()).await;
fn save_node_identity(&self, id: &Identity) {
tokio::runtime::Handle::current().block_on(async {
assert!(id.secret.is_some());
let id_secret_str = id.to_secret_string();
let id_public_str = id.to_string();
let secret_path = self.base_path.join(IDENTITY_SECRET_FILENAME);
// TODO: handle errors
let _ = tokio::fs::write(&secret_path, id_secret_str.as_bytes()).await;
assert!(crate::utils::fs_restrict_permissions(&secret_path));
let _ = tokio::fs::write(self.base_path.join(IDENTITY_PUBLIC_FILENAME), id_public_str.as_bytes()).await;
})
}
}
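For reference, a minimal synchronous sketch of how the todo!() in load_node_identity above could be filled in without tokio, reading with std::fs directly. IDENTITY_SECRET_FILENAME, Identity::from_str (via std::str::FromStr) and the 4096-byte cap come from the surrounding module; everything else is an assumption, not part of this commit.

// Hypothetical sync replacement for the todo!() above; assumes std::str::FromStr is in
// scope for Identity::from_str, as the old async version already required.
fn load_node_identity(&self) -> Option<Identity> {
    let path = self.base_path.join(IDENTITY_SECRET_FILENAME);
    let id_data = std::fs::read(&path).ok()?;
    if id_data.len() > 4096 {
        return None; // mirror the 4096-byte cap the old read_limit() call enforced
    }
    Identity::from_str(String::from_utf8_lossy(id_data.as_slice()).as_ref()).ok()
}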

View file

@@ -9,3 +9,4 @@ version = "0.1.0"
parking_lot = { version = "^0", features = [], default-features = false }
serde = { version = "^1", features = ["derive"], default-features = false }
serde_json = { version = "^1", features = ["std"], default-features = false }
tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false }

View file

@@ -142,8 +142,8 @@ impl<T, const C: usize> ArrayVec<T, C> {
#[inline(always)]
pub fn push(&mut self, v: T) {
if self.s < C {
let i = self.s;
let i = self.s;
if i < C {
unsafe { self.a.get_unchecked_mut(i).write(v) };
self.s = i + 1;
} else {

View file

@@ -10,6 +10,8 @@ pub mod hex;
pub mod json;
pub mod memory;
pub mod pool;
pub mod reaper;
pub mod ringbuffer;
pub mod ringbuffermap;
pub mod varint;

utils/src/reaper.rs Normal file
View file

@@ -0,0 +1,49 @@
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio::time::Instant;
/// Watches tokio jobs and times them out if they run past a deadline or aborts them all if the reaper is dropped.
pub struct Reaper {
q: Arc<(parking_lot::Mutex<VecDeque<(JoinHandle<()>, Instant)>>, Notify)>,
finisher: JoinHandle<()>,
}
impl Reaper {
pub fn new() -> Self {
let q = Arc::new((parking_lot::Mutex::new(VecDeque::with_capacity(16)), Notify::new()));
Self {
q: q.clone(),
finisher: tokio::spawn(async move {
loop {
q.1.notified().await;
loop {
let j = q.0.lock().pop_front();
if let Some(j) = j {
let _ = tokio::time::timeout_at(j.1, j.0).await;
} else {
break;
}
}
}
}),
}
}
/// Add a job to be executed with timeout at a given instant.
#[inline]
pub fn add(&self, job: JoinHandle<()>, deadline: Instant) {
self.q.0.lock().push_back((job, deadline));
self.q.1.notify_waiters();
}
}
impl Drop for Reaper {
#[inline]
fn drop(&mut self) {
self.finisher.abort();
self.q.0.lock().drain(..).for_each(|j| j.0.abort());
}
}
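A minimal usage sketch for the new Reaper. The zerotier_utils::reaper path matches the module added in this commit; the runtime setup, deadline, and sleeps are illustrative assumptions.

use tokio::time::{Duration, Instant};
use zerotier_utils::reaper::Reaper;

fn main() {
    // Reaper::new() spawns its finisher task, so a tokio runtime must already be entered.
    let rt = tokio::runtime::Builder::new_multi_thread().enable_time().build().unwrap();
    rt.block_on(async {
        let reaper = Reaper::new();
        let job = tokio::spawn(async {
            tokio::time::sleep(Duration::from_millis(10)).await;
        });
        // The finisher awaits the job but stops waiting at the deadline; jobs still
        // queued when the Reaper itself is dropped are aborted.
        reaper.add(job, Instant::now() + Duration::from_secs(5));
        tokio::time::sleep(Duration::from_millis(50)).await; // give the finisher time to run
    });
}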

utils/src/ringbuffer.rs Normal file
View file

@@ -0,0 +1,113 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use std::mem::MaybeUninit;
/// A FIFO ring buffer.
pub struct RingBuffer<T, const C: usize> {
a: [MaybeUninit<T>; C],
p: usize,
}
impl<T, const C: usize> RingBuffer<T, C> {
#[inline]
pub fn new() -> Self {
let mut tmp: Self = unsafe { MaybeUninit::uninit().assume_init() };
tmp.p = 0;
tmp
}
/// Add an element to the buffer, replacing old elements if full.
#[inline]
pub fn add(&mut self, o: T) {
let p = self.p;
if p < C {
unsafe { self.a.get_unchecked_mut(p).write(o) };
} else {
unsafe { *self.a.get_unchecked_mut(p % C).assume_init_mut() = o };
}
self.p = p.wrapping_add(1);
}
/// Clear the buffer and drop all elements.
#[inline]
pub fn clear(&mut self) {
for i in 0..C.min(self.p) {
unsafe { self.a.get_unchecked_mut(i).assume_init_drop() };
}
self.p = 0;
}
/// Gets an iterator that dumps the contents of the buffer in FIFO order.
#[inline]
pub fn iter(&self) -> RingBufferIterator<'_, T, C> {
let s = C.min(self.p);
RingBufferIterator { b: self, s, i: self.p.wrapping_sub(s) }
}
}
impl<T, const C: usize> Default for RingBuffer<T, C> {
#[inline(always)]
fn default() -> Self {
Self::new()
}
}
impl<T, const C: usize> Drop for RingBuffer<T, C> {
#[inline(always)]
fn drop(&mut self) {
self.clear();
}
}
pub struct RingBufferIterator<'a, T, const C: usize> {
b: &'a RingBuffer<T, C>,
s: usize,
i: usize,
}
impl<'a, T, const C: usize> Iterator for RingBufferIterator<'a, T, C> {
type Item = &'a T;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
let s = self.s;
if s > 0 {
let i = self.i;
self.s = s.wrapping_sub(1);
self.i = i.wrapping_add(1);
Some(unsafe { self.b.a.get_unchecked(i % C).assume_init_ref() })
} else {
None
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn fifo() {
let mut tmp: RingBuffer<i32, 8> = RingBuffer::new();
let mut tmp2 = Vec::new();
for i in 0..4 {
tmp.add(i);
tmp2.push(i);
}
for (i, j) in tmp.iter().zip(tmp2.iter()) {
assert_eq!(*i, *j);
}
tmp.clear();
tmp2.clear();
for i in 0..23 {
tmp.add(i);
tmp2.push(i);
}
while tmp2.len() > 8 {
tmp2.remove(0);
}
for (i, j) in tmp.iter().zip(tmp2.iter()) {
assert_eq!(*i, *j);
}
}
}
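A short worked example of the wrap-around arithmetic, beyond what the fifo() test covers. The zerotier_utils::ringbuffer path matches the module added in this commit; the capacity and values are arbitrary.

use zerotier_utils::ringbuffer::RingBuffer;

fn main() {
    // Capacity 4, six adds: p ends at 6, so iter() starts at i = 6 - min(4, 6) = 2 and
    // reads slots 2 % 4, 3 % 4, 4 % 4, 5 % 4, i.e. the last four values in FIFO order.
    let mut rb: RingBuffer<u32, 4> = RingBuffer::new();
    for v in 0..6u32 {
        rb.add(v);
    }
    let last_four: Vec<u32> = rb.iter().copied().collect();
    assert_eq!(last_four, vec![2, 3, 4, 5]);
}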

View file

@@ -9,11 +9,11 @@ license = "MPL-2.0"
zerotier-network-hypervisor = { path = "../network-hypervisor" }
zerotier-crypto = { path = "../crypto" }
zerotier-utils = { path = "../utils" }
async-trait = "^0"
num-traits = "^0"
tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false }
parking_lot = { version = "^0", features = [], default-features = false }
serde = { version = "^1", features = ["derive"], default-features = false }
serde_json = { version = "^1", features = ["std"], default-features = false }
[target."cfg(windows)".dependencies]
winapi = { version = "^0", features = ["handleapi", "ws2ipdef", "ws2tcpip"] }

View file

@@ -4,10 +4,9 @@ use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::sync::Arc;
use async_trait::async_trait;
use zerotier_crypto::random;
use zerotier_network_hypervisor::vl1::{Endpoint, Event, HostSystem, Identity, InetAddress, InnerProtocol, Node, NodeStorage, PathFilter};
use zerotier_network_hypervisor::protocol::{PacketBufferFactory, PacketBufferPool};
use zerotier_network_hypervisor::vl1::*;
use zerotier_utils::{ms_monotonic, ms_since_epoch};
use crate::constants::UNASSIGNED_PRIVILEGED_PORTS;
@@ -24,11 +23,16 @@ use tokio::time::Duration;
/// talks to the physical network, manages the vl1 node, and presents a templated interface for
/// whatever inner protocol implementation is using it. This would typically be VL2 but could be
/// a test harness or just the controller itself when it runs stand-alone.
pub struct VL1Service<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl: InnerProtocol> {
state: tokio::sync::RwLock<VL1ServiceMutableState>,
pub struct VL1Service<
NodeStorageImpl: NodeStorage + 'static,
PathFilterImpl: PathFilter + 'static,
InnerProtocolImpl: InnerProtocol + 'static,
> {
state: parking_lot::RwLock<VL1ServiceMutableState>,
storage: Arc<NodeStorageImpl>,
inner: Arc<InnerProtocolImpl>,
path_filter: Arc<PathFilterImpl>,
buffer_pool: PacketBufferPool,
node_container: Option<Node<Self>>,
}
@@ -38,7 +42,7 @@ struct VL1ServiceMutableState {
settings: VL1Settings,
}
impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl: InnerProtocol>
impl<NodeStorageImpl: NodeStorage + 'static, PathFilterImpl: PathFilter + 'static, InnerProtocolImpl: InnerProtocol + 'static>
VL1Service<NodeStorageImpl, PathFilterImpl, InnerProtocolImpl>
{
pub async fn new(
@@ -48,7 +52,7 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
settings: VL1Settings,
) -> Result<Arc<Self>, Box<dyn Error>> {
let mut service = VL1Service {
state: tokio::sync::RwLock::new(VL1ServiceMutableState {
state: parking_lot::RwLock::new(VL1ServiceMutableState {
daemons: Vec::with_capacity(2),
udp_sockets: HashMap::with_capacity(8),
settings,
@@ -56,17 +60,19 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
storage,
inner,
path_filter,
buffer_pool: PacketBufferPool::new(
std::thread::available_parallelism().map_or(2, |c| c.get() + 2),
PacketBufferFactory::new(),
),
node_container: None,
};
service
.node_container
.replace(Node::new(&service, &*service.storage, true, false).await?);
service.node_container.replace(Node::new(&service, &*service.storage, true, false)?);
let service = Arc::new(service);
let mut state = service.state.write().await;
state.daemons.push(tokio::spawn(service.clone().udp_bind_daemon()));
state.daemons.push(tokio::spawn(service.clone().node_background_task_daemon()));
drop(state);
let mut daemons = Vec::new();
daemons.push(tokio::spawn(service.clone().udp_bind_daemon()));
daemons.push(tokio::spawn(service.clone().node_background_task_daemon()));
service.state.write().daemons = daemons;
Ok(service)
}
@@ -77,116 +83,115 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
unsafe { self.node_container.as_ref().unwrap_unchecked() }
}
pub async fn bound_udp_ports(&self) -> Vec<u16> {
self.state.read().await.udp_sockets.keys().cloned().collect()
pub fn bound_udp_ports(&self) -> Vec<u16> {
self.state.read().udp_sockets.keys().cloned().collect()
}
async fn udp_bind_daemon(self: Arc<Self>) {
loop {
let state = self.state.read().await;
let mut need_fixed_ports: HashSet<u16> = HashSet::from_iter(state.settings.fixed_ports.iter().cloned());
let mut have_random_port_count = 0;
for (p, _) in state.udp_sockets.iter() {
need_fixed_ports.remove(p);
have_random_port_count += (!state.settings.fixed_ports.contains(p)) as usize;
}
let desired_random_port_count = state.settings.random_port_count;
let state = if !need_fixed_ports.is_empty() || have_random_port_count != desired_random_port_count {
drop(state);
let mut state = self.state.write().await;
for p in need_fixed_ports.iter() {
state.udp_sockets.insert(*p, parking_lot::RwLock::new(BoundUdpPort::new(*p)));
{
let state = self.state.read();
let mut need_fixed_ports: HashSet<u16> = HashSet::from_iter(state.settings.fixed_ports.iter().cloned());
let mut have_random_port_count = 0;
for (p, _) in state.udp_sockets.iter() {
need_fixed_ports.remove(p);
have_random_port_count += (!state.settings.fixed_ports.contains(p)) as usize;
}
let desired_random_port_count = state.settings.random_port_count;
while have_random_port_count > desired_random_port_count {
let mut most_stale_binding_liveness = (usize::MAX, i64::MAX);
let mut most_stale_binding_port = 0;
for (p, s) in state.udp_sockets.iter() {
if !state.settings.fixed_ports.contains(p) {
let (total_smart_ptr_handles, most_recent_receive) = s.read().liveness();
if total_smart_ptr_handles < most_stale_binding_liveness.0
|| (total_smart_ptr_handles == most_stale_binding_liveness.0
&& most_recent_receive <= most_stale_binding_liveness.1)
{
most_stale_binding_liveness.0 = total_smart_ptr_handles;
most_stale_binding_liveness.1 = most_recent_receive;
most_stale_binding_port = *p;
let state = if !need_fixed_ports.is_empty() || have_random_port_count != desired_random_port_count {
drop(state);
let mut state = self.state.write();
for p in need_fixed_ports.iter() {
state.udp_sockets.insert(*p, parking_lot::RwLock::new(BoundUdpPort::new(*p)));
}
while have_random_port_count > desired_random_port_count {
let mut most_stale_binding_liveness = (usize::MAX, i64::MAX);
let mut most_stale_binding_port = 0;
for (p, s) in state.udp_sockets.iter() {
if !state.settings.fixed_ports.contains(p) {
let (total_smart_ptr_handles, most_recent_receive) = s.read().liveness();
if total_smart_ptr_handles < most_stale_binding_liveness.0
|| (total_smart_ptr_handles == most_stale_binding_liveness.0
&& most_recent_receive <= most_stale_binding_liveness.1)
{
most_stale_binding_liveness.0 = total_smart_ptr_handles;
most_stale_binding_liveness.1 = most_recent_receive;
most_stale_binding_port = *p;
}
}
}
}
if most_stale_binding_port != 0 {
have_random_port_count -= state.udp_sockets.remove(&most_stale_binding_port).is_some() as usize;
} else {
break;
}
}
'outer_add_port_loop: while have_random_port_count < desired_random_port_count {
let rn = random::xorshift64_random() as usize;
for i in 0..UNASSIGNED_PRIVILEGED_PORTS.len() {
let p = UNASSIGNED_PRIVILEGED_PORTS[rn.wrapping_add(i) % UNASSIGNED_PRIVILEGED_PORTS.len()];
if !state.udp_sockets.contains_key(&p) && udp_test_bind(p) {
let _ = state.udp_sockets.insert(p, parking_lot::RwLock::new(BoundUdpPort::new(p)));
continue 'outer_add_port_loop;
if most_stale_binding_port != 0 {
have_random_port_count -= state.udp_sockets.remove(&most_stale_binding_port).is_some() as usize;
} else {
break;
}
}
let p = 50000 + ((random::xorshift64_random() as u16) % 15535);
if !state.udp_sockets.contains_key(&p) && udp_test_bind(p) {
have_random_port_count += state
.udp_sockets
.insert(p, parking_lot::RwLock::new(BoundUdpPort::new(p)))
.is_none() as usize;
'outer_add_port_loop: while have_random_port_count < desired_random_port_count {
let rn = random::xorshift64_random() as usize;
for i in 0..UNASSIGNED_PRIVILEGED_PORTS.len() {
let p = UNASSIGNED_PRIVILEGED_PORTS[rn.wrapping_add(i) % UNASSIGNED_PRIVILEGED_PORTS.len()];
if !state.udp_sockets.contains_key(&p) && udp_test_bind(p) {
let _ = state.udp_sockets.insert(p, parking_lot::RwLock::new(BoundUdpPort::new(p)));
continue 'outer_add_port_loop;
}
}
let p = 50000 + ((random::xorshift64_random() as u16) % 15535);
if !state.udp_sockets.contains_key(&p) && udp_test_bind(p) {
have_random_port_count += state
.udp_sockets
.insert(p, parking_lot::RwLock::new(BoundUdpPort::new(p)))
.is_none() as usize;
}
}
}
drop(state);
self.state.read().await
} else {
state
};
drop(state);
self.state.read()
} else {
state
};
let num_cores = std::thread::available_parallelism().map_or(1, |c| c.get());
for (_, binding) in state.udp_sockets.iter() {
let mut binding = binding.write();
let (_, mut new_sockets) =
binding.update_bindings(&state.settings.interface_prefix_blacklist, &state.settings.cidr_blacklist);
for s in new_sockets.drain(..) {
// Start one async task per system core. This is technically not necessary because tokio
// schedules and multiplexes, but this enables tokio to grab and schedule packets
// concurrently for up to the number of cores available for any given socket and is
// probably faster than other patterns that involve iterating through sockets and creating
// arrays of futures or using channels.
let mut socket_tasks = Vec::with_capacity(num_cores);
for _ in 0..num_cores {
let self_copy = self.clone();
let s_copy = s.clone();
let local_socket = LocalSocket::new(&s);
socket_tasks.push(tokio::spawn(async move {
loop {
let mut buf = self_copy.node().get_packet_buffer();
let now = ms_monotonic();
if let Ok((bytes, from_sockaddr)) = s_copy.receive(unsafe { buf.entire_buffer_mut() }, now).await {
unsafe { buf.set_size_unchecked(bytes) };
self_copy
.node()
.handle_incoming_physical_packet(
let num_cores = std::thread::available_parallelism().map_or(1, |c| c.get());
for (_, binding) in state.udp_sockets.iter() {
let mut binding = binding.write();
let (_, mut new_sockets) =
binding.update_bindings(&state.settings.interface_prefix_blacklist, &state.settings.cidr_blacklist);
for s in new_sockets.drain(..) {
// Start one async task per system core. This is technically not necessary because tokio
// schedules and multiplexes, but this enables tokio to grab and schedule packets
// concurrently for up to the number of cores available for any given socket and is
// probably faster than other patterns that involve iterating through sockets and creating
// arrays of futures or using channels.
let mut socket_tasks = Vec::with_capacity(num_cores);
for _ in 0..num_cores {
let self_copy = self.clone();
let s_copy = s.clone();
let local_socket = LocalSocket::new(&s);
socket_tasks.push(tokio::spawn(async move {
loop {
let mut buf = self_copy.buffer_pool.get();
let now = ms_monotonic();
if let Ok((bytes, from_sockaddr)) = s_copy.receive(unsafe { buf.entire_buffer_mut() }, now).await {
unsafe { buf.set_size_unchecked(bytes) };
self_copy.node().handle_incoming_physical_packet(
&*self_copy,
&*self_copy.inner,
&Endpoint::IpUdp(InetAddress::from(from_sockaddr)),
&local_socket,
&s_copy.interface,
buf,
)
.await;
);
}
}
}
}));
}));
}
debug_assert!(s.associated_tasks.lock().is_empty());
*s.associated_tasks.lock() = socket_tasks;
}
debug_assert!(s.associated_tasks.lock().is_empty());
*s.associated_tasks.lock() = socket_tasks;
}
}
@@ -197,12 +202,11 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
async fn node_background_task_daemon(self: Arc<Self>) {
tokio::time::sleep(Duration::from_secs(1)).await;
loop {
tokio::time::sleep(self.node().do_background_tasks(self.as_ref()).await).await;
tokio::time::sleep(self.node().do_background_tasks(self.as_ref())).await;
}
}
}
#[async_trait]
impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl: InnerProtocol> HostSystem
for VL1Service<NodeStorageImpl, PathFilterImpl, InnerProtocolImpl>
{
@@ -216,37 +220,35 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
}
}
async fn user_message(&self, _source: &Identity, _message_type: u64, _message: &[u8]) {}
#[inline(always)]
fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool {
socket.is_valid()
}
async fn wire_send(
fn wire_send(
&self,
endpoint: &Endpoint,
local_socket: Option<&Self::LocalSocket>,
local_interface: Option<&Self::LocalInterface>,
data: &[u8],
packet_ttl: u8,
) -> bool {
) {
match endpoint {
Endpoint::IpUdp(address) => {
// This is the fast path -- the socket is known to the core so just send it.
if let Some(s) = local_socket {
if let Some(s) = s.0.upgrade() {
return s.send_sync_nonblock(address, data, packet_ttl);
s.send_sync_nonblock(address, data, packet_ttl);
} else {
return false;
return;
}
}
let state = self.state.read().await;
let state = self.state.read();
if !state.udp_sockets.is_empty() {
if let Some(specific_interface) = local_interface {
// Send from a specific interface if that interface is specified.
for (_, p) in state.udp_sockets.iter() {
'socket_search: for (_, p) in state.udp_sockets.iter() {
let p = p.read();
if !p.sockets.is_empty() {
let mut i = (random::next_u32_secure() as usize) % p.sockets.len();
@@ -254,7 +256,7 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
let s = p.sockets.get(i).unwrap();
if s.interface.eq(specific_interface) {
if s.send_sync_nonblock(address, data, packet_ttl) {
return true;
break 'socket_search;
}
}
i = (i + 1) % p.sockets.len();
@@ -279,15 +281,11 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
}
}
}
return !sent_on_interfaces.is_empty();
}
}
return false;
}
_ => {}
}
return false;
}
#[inline(always)]
@@ -305,15 +303,10 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
for VL1Service<NodeStorageImpl, PathFilterImpl, InnerProtocolImpl>
{
fn drop(&mut self) {
loop {
if let Ok(mut state) = self.state.try_write() {
for d in state.daemons.drain(..) {
d.abort();
}
state.udp_sockets.clear();
break;
}
std::thread::sleep(Duration::from_millis(2));
let mut state = self.state.write();
for d in state.daemons.drain(..) {
d.abort();
}
state.udp_sockets.clear();
}
}