Use a clever little hack to eliminate a great deal of cascading template sprawl from the core.

This commit is contained in:
Adam Ierymenko 2022-10-21 16:50:07 -07:00
parent 39aa46ebf5
commit cf7f0b06df
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
8 changed files with 328 additions and 181 deletions

View file

@ -1,6 +1,5 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use std::any::Any;
use std::error::Error;
use std::sync::{Arc, Mutex, RwLock, Weak};
@ -127,9 +126,9 @@ impl<DatabaseImpl: Database> InnerProtocol for Handler<DatabaseImpl> {
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
_node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
source_hops: u8,
message_id: u64,
verb: u8,
@ -193,7 +192,7 @@ impl<DatabaseImpl: Database> InnerProtocol for Handler<DatabaseImpl> {
.await
{
Result::Ok((result, Some(config))) => {
inner.send_network_config(peer, &config, Some(message_id));
inner.send_network_config(peer.as_ref(), &config, Some(message_id));
result
}
Result::Ok((result, None)) => result,
@ -234,9 +233,9 @@ impl<DatabaseImpl: Database> InnerProtocol for Handler<DatabaseImpl> {
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node<HostSystemImpl>,
_source: &Arc<Peer<HostSystemImpl>>,
_source_path: &Arc<Path<HostSystemImpl>>,
_node: &Node,
_source: &Arc<Peer>,
_source_path: &Arc<Path>,
_source_hops: u8,
_message_id: u64,
_in_re_verb: u8,
@ -251,9 +250,9 @@ impl<DatabaseImpl: Database> InnerProtocol for Handler<DatabaseImpl> {
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node<HostSystemImpl>,
_source: &Arc<Peer<HostSystemImpl>>,
_source_path: &Arc<Path<HostSystemImpl>>,
_node: &Node,
_source: &Arc<Peer>,
_source_path: &Arc<Path>,
_source_hops: u8,
_message_id: u64,
_in_re_verb: u8,
@ -272,12 +271,11 @@ impl<DatabaseImpl: Database> InnerProtocol for Handler<DatabaseImpl> {
impl<DatabaseImpl: Database> Inner<DatabaseImpl> {
fn send_network_config(
&self,
peer: Arc<dyn Any>, // hack can go away when Rust has specialization
peer: &Peer,
config: &NetworkConfig,
in_re_message_id: Option<u64>, // None for unsolicited push
) {
if let Some(host_system) = self.service.read().unwrap().upgrade() {
if let Some(peer) = peer.downcast_ref::<Peer<VL1Service<DatabaseImpl, Handler<DatabaseImpl>, Handler<DatabaseImpl>>>>() {
peer.send(
host_system.as_ref(),
host_system.node(),
@ -320,9 +318,6 @@ impl<DatabaseImpl: Database> Inner<DatabaseImpl> {
Ok(())
},
);
} else {
panic!("HostSystem implementation mismatch with service to which controller is harnessed");
}
}
}

View file

@ -24,6 +24,7 @@ use zerotier_utils::error::InvalidParameterError;
use zerotier_utils::gate::IntervalGate;
use zerotier_utils::hex;
use zerotier_utils::marshalable::Marshalable;
use zerotier_utils::pocket::Pocket;
use zerotier_utils::ringbuffer::RingBuffer;
/// Trait implemented by external code to handle events and provide an interface to the system or application.
@ -35,7 +36,7 @@ pub trait HostSystem: Sync + Send + 'static {
type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static;
/// Type for local system interfaces.
type LocalInterface: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized;
type LocalInterface: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static;
/// A VL1 level event occurred.
fn event(&self, event: Event);
@ -136,9 +137,9 @@ pub trait InnerProtocol: Sync + Send {
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
source_hops: u8,
message_id: u64,
verb: u8,
@ -149,9 +150,9 @@ pub trait InnerProtocol: Sync + Send {
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,
@ -165,9 +166,9 @@ pub trait InnerProtocol: Sync + Send {
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,
@ -183,12 +184,12 @@ pub trait InnerProtocol: Sync + Send {
/// How often to check the root cluster definitions against the root list and update.
const ROOT_SYNC_INTERVAL_MS: i64 = 1000;
struct RootInfo<HostSystemImpl: HostSystem + ?Sized> {
struct RootInfo {
/// Root sets to which we are a member.
sets: HashMap<String, Verified<RootSet>>,
/// Root peers and their statically defined endpoints (from root sets).
roots: HashMap<Arc<Peer<HostSystemImpl>>, Vec<Endpoint>>,
roots: HashMap<Arc<Peer>, Vec<Endpoint>>,
/// If this node is a root, these are the root sets to which it's a member in binary serialized form.
/// Set to None if this node is not a root, meaning it doesn't appear in any of its root sets.
@ -213,14 +214,17 @@ struct BackgroundTaskIntervals {
}
/// WHOIS requests and any packets that are waiting on them to be decrypted and authenticated.
struct WhoisQueueItem<HostSystemImpl: HostSystem + ?Sized> {
v1_proto_waiting_packets: RingBuffer<(Weak<Path<HostSystemImpl>>, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>,
struct WhoisQueueItem {
v1_proto_waiting_packets: RingBuffer<(Weak<Path>, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>,
last_retry_time: i64,
retry_count: u16,
}
const PATH_MAP_SIZE: usize = std::mem::size_of::<HashMap<[u8; std::mem::size_of::<Endpoint>() + 128], Arc<Path>>>();
type PathMap<HostSystemImpl> = HashMap<PathKey<'static, 'static, HostSystemImpl>, Arc<Path>>;
/// A ZeroTier VL1 node that can communicate securely with the ZeroTier peer-to-peer network.
pub struct Node<HostSystemImpl: HostSystem + ?Sized> {
pub struct Node {
/// A random ID generated to identify this particular running instance.
///
/// This can be used to implement multi-homing by allowing remote nodes to distinguish instances
@ -234,23 +238,23 @@ pub struct Node<HostSystemImpl: HostSystem + ?Sized> {
intervals: Mutex<BackgroundTaskIntervals>,
/// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use.
paths: RwLock<HashMap<PathKey<'static, 'static, HostSystemImpl>, Arc<Path<HostSystemImpl>>>>,
paths: RwLock<Pocket<PATH_MAP_SIZE>>,
/// Peers with which we are currently communicating.
peers: RwLock<HashMap<Address, Arc<Peer<HostSystemImpl>>>>,
peers: RwLock<HashMap<Address, Arc<Peer>>>,
/// This node's trusted roots, sorted in ascending order of quality/preference, and cluster definitions.
roots: RwLock<RootInfo<HostSystemImpl>>,
roots: RwLock<RootInfo>,
/// Current best root.
best_root: RwLock<Option<Arc<Peer<HostSystemImpl>>>>,
best_root: RwLock<Option<Arc<Peer>>>,
/// Queue of identities being looked up.
whois_queue: Mutex<HashMap<Address, WhoisQueueItem<HostSystemImpl>>>,
whois_queue: Mutex<HashMap<Address, WhoisQueueItem>>,
}
impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
pub fn new<NodeStorageImpl: NodeStorage + ?Sized>(
impl Node {
pub fn new<HostSystemImpl: HostSystem + ?Sized, NodeStorageImpl: NodeStorage + ?Sized>(
host_system: &HostSystemImpl,
storage: &NodeStorageImpl,
auto_generate_identity: bool,
@ -286,7 +290,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
instance_id: random::get_bytes_secure(),
identity: id,
intervals: Mutex::new(BackgroundTaskIntervals::default()),
paths: RwLock::new(HashMap::new()),
paths: RwLock::new(Pocket::new(PathMap::<HostSystemImpl>::new())),
peers: RwLock::new(HashMap::new()),
roots: RwLock::new(RootInfo {
sets: HashMap::new(),
@ -300,7 +304,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
})
}
pub fn peer(&self, a: Address) -> Option<Arc<Peer<HostSystemImpl>>> {
pub fn peer(&self, a: Address) -> Option<Arc<Peer>> {
self.peers.read().unwrap().get(&a).cloned()
}
@ -309,12 +313,12 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
}
/// Get the current "best" root from among this node's trusted roots.
pub fn best_root(&self) -> Option<Arc<Peer<HostSystemImpl>>> {
pub fn best_root(&self) -> Option<Arc<Peer>> {
self.best_root.read().unwrap().clone()
}
/// Check whether a peer is a root according to any root set trusted by this node.
pub fn is_peer_root(&self, peer: &Peer<HostSystemImpl>) -> bool {
pub fn is_peer_root(&self, peer: &Peer) -> bool {
self.roots.read().unwrap().roots.keys().any(|p| p.identity.eq(&peer.identity))
}
@ -360,7 +364,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
self.roots.read().unwrap().sets.values().cloned().map(|s| s.unwrap()).collect()
}
pub fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration {
pub fn do_background_tasks<HostSystemImpl: HostSystem + ?Sized>(&self, host_system: &HostSystemImpl) -> Duration {
const INTERVAL_MS: i64 = 1000;
const INTERVAL: Duration = Duration::from_millis(INTERVAL_MS as u64);
let time_ticks = host_system.time_ticks();
@ -477,7 +481,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
if let Some(peer) = peers.get(&m.identity.address) {
new_roots.insert(peer.clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect());
} else {
if let Some(peer) = Peer::<HostSystemImpl>::new(&self.identity, m.identity.clone(), time_ticks) {
if let Some(peer) = Peer::new(&self.identity, m.identity.clone(), time_ticks) {
drop(peers);
new_roots.insert(
self.peers
@ -636,7 +640,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
let mut need_keepalive = Vec::new();
// First check all paths in read mode to avoid blocking the entire node.
for (k, path) in self.paths.read().unwrap().iter() {
for (k, path) in self.paths.read().unwrap().get::<PathMap<HostSystemImpl>>().iter() {
if host_system.local_socket_is_valid(k.local_socket()) {
match path.service(time_ticks) {
PathServiceResult::Ok => {}
@ -650,13 +654,19 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
// Lock in write mode and remove dead paths, doing so piecemeal to again avoid blocking.
for dp in dead_paths.iter() {
self.paths.write().unwrap().remove(dp);
self.paths.write().unwrap().get_mut::<PathMap<HostSystemImpl>>().remove(dp);
}
// Finally run keepalive sends as a batch.
let keepalive_buf = [time_ticks as u8]; // just an arbitrary byte, no significance
for p in need_keepalive.iter() {
host_system.wire_send(&p.endpoint, Some(&p.local_socket), Some(&p.local_interface), &keepalive_buf, 0);
host_system.wire_send(
&p.endpoint,
Some(p.local_socket::<HostSystemImpl>()),
Some(p.local_interface::<HostSystemImpl>()),
&keepalive_buf,
0,
);
}
}
@ -683,7 +693,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
INTERVAL
}
pub fn handle_incoming_physical_packet<InnerProtocolImpl: InnerProtocol + ?Sized>(
pub fn handle_incoming_physical_packet<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
&self,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
@ -727,7 +737,8 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
if dest == self.identity.address {
let fragment_header = &*fragment_header; // discard mut
let path = self.canonical_path(source_endpoint, source_local_socket, source_local_interface, time_ticks);
let path =
self.canonical_path::<HostSystemImpl>(source_endpoint, source_local_socket, source_local_interface, time_ticks);
path.log_receive_anything(time_ticks);
if fragment_header.is_fragment() {
@ -852,8 +863,8 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
if let Some(forward_path) = peer.direct_path() {
host_system.wire_send(
&forward_path.endpoint,
Some(&forward_path.local_socket),
Some(&forward_path.local_interface),
Some(forward_path.local_socket::<HostSystemImpl>()),
Some(forward_path.local_interface::<HostSystemImpl>()),
packet.as_bytes(),
0,
);
@ -870,17 +881,17 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
}
/// Enqueue and send a WHOIS query for a given address, adding the supplied packet (if any) to the list to be processed on reply.
fn whois(
fn whois<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
address: Address,
waiting_packet: Option<(Weak<Path<HostSystemImpl>>, PooledPacketBuffer)>,
waiting_packet: Option<(Weak<Path>, PooledPacketBuffer)>,
time_ticks: i64,
) {
debug_event!(host_system, "[vl1] [v1] WHOIS {}", address.to_string());
{
let mut whois_queue = self.whois_queue.lock().unwrap();
let qi = whois_queue.entry(address).or_insert_with(|| WhoisQueueItem::<HostSystemImpl> {
let qi = whois_queue.entry(address).or_insert_with(|| WhoisQueueItem {
v1_proto_waiting_packets: RingBuffer::new(),
last_retry_time: 0,
retry_count: 0,
@ -899,7 +910,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
}
/// Send a WHOIS query to the current best root.
fn send_whois(&self, host_system: &HostSystemImpl, mut addresses: &[Address], time_ticks: i64) {
fn send_whois<HostSystemImpl: HostSystem + ?Sized>(&self, host_system: &HostSystemImpl, mut addresses: &[Address], time_ticks: i64) {
debug_assert!(!addresses.is_empty());
if let Some(root) = self.best_root() {
while !addresses.is_empty() {
@ -921,7 +932,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
}
/// Called by Peer when an identity is received from another node, e.g. via OK(WHOIS).
pub(crate) fn handle_incoming_identity<InnerProtocolImpl: InnerProtocol + ?Sized>(
pub(crate) fn handle_incoming_identity<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
&self,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
@ -972,23 +983,31 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
}
/// Get the canonical Path object corresponding to an endpoint.
pub(crate) fn canonical_path(
pub(crate) fn canonical_path<HostSystemImpl: HostSystem + ?Sized>(
&self,
ep: &Endpoint,
local_socket: &HostSystemImpl::LocalSocket,
local_interface: &HostSystemImpl::LocalInterface,
time_ticks: i64,
) -> Arc<Path<HostSystemImpl>> {
) -> Arc<Path> {
let paths = self.paths.read().unwrap();
if let Some(path) = paths.get(&PathKey::Ref(ep, local_socket)) {
if let Some(path) = paths.get::<PathMap<HostSystemImpl>>().get(&PathKey::Ref(ep, local_socket)) {
path.clone()
} else {
drop(paths);
self.paths
.write()
.unwrap()
.get_mut::<PathMap<HostSystemImpl>>()
.entry(PathKey::Copied(ep.clone(), local_socket.clone()))
.or_insert_with(|| Arc::new(Path::new(ep.clone(), local_socket.clone(), local_interface.clone(), time_ticks)))
.or_insert_with(|| {
Arc::new(Path::new::<HostSystemImpl>(
ep.clone(),
local_socket.clone(),
local_interface.clone(),
time_ticks,
))
})
.clone()
}
}
@ -1056,9 +1075,9 @@ impl InnerProtocol for DummyInnerProtocol {
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node<HostSystemImpl>,
_source: &Arc<Peer<HostSystemImpl>>,
_source_path: &Arc<Path<HostSystemImpl>>,
_node: &Node,
_source: &Arc<Peer>,
_source_path: &Arc<Path>,
_source_hops: u8,
_message_id: u64,
_verb: u8,
@ -1071,9 +1090,9 @@ impl InnerProtocol for DummyInnerProtocol {
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node<HostSystemImpl>,
_source: &Arc<Peer<HostSystemImpl>>,
_source_path: &Arc<Path<HostSystemImpl>>,
_node: &Node,
_source: &Arc<Peer>,
_source_path: &Arc<Path>,
_source_hops: u8,
_message_id: u64,
_in_re_verb: u8,
@ -1089,9 +1108,9 @@ impl InnerProtocol for DummyInnerProtocol {
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node<HostSystemImpl>,
_source: &Arc<Peer<HostSystemImpl>>,
_source_path: &Arc<Path<HostSystemImpl>>,
_node: &Node,
_source: &Arc<Peer>,
_source_path: &Arc<Path>,
_source_hops: u8,
_message_id: u64,
_in_re_verb: u8,

View file

@ -10,6 +10,7 @@ use crate::vl1::endpoint::Endpoint;
use crate::vl1::node::*;
use zerotier_crypto::random;
use zerotier_utils::pocket::Pocket;
use zerotier_utils::NEVER_HAPPENED_TICKS;
pub(crate) const SERVICE_INTERVAL_MS: i64 = PATH_KEEPALIVE_INTERVAL;
@ -24,19 +25,18 @@ pub(crate) enum PathServiceResult {
/// These are maintained in Node and canonicalized so that all unique paths have
/// one and only one unique path object. That enables statistics to be tracked
/// for them and uniform application of things like keepalives.
pub struct Path<HostSystemImpl: HostSystem + ?Sized> {
pub struct Path {
pub endpoint: Endpoint,
pub local_socket: HostSystemImpl::LocalSocket,
pub local_interface: HostSystemImpl::LocalInterface,
local_socket: Pocket<64>,
local_interface: Pocket<64>,
last_send_time_ticks: AtomicI64,
last_receive_time_ticks: AtomicI64,
create_time_ticks: i64,
fragmented_packets: Mutex<HashMap<u64, v1::FragmentedPacket, PacketIdHasher>>,
}
impl<HostSystemImpl: HostSystem + ?Sized> Path<HostSystemImpl> {
#[inline]
pub fn new(
impl Path {
pub(crate) fn new<HostSystemImpl: HostSystem + ?Sized>(
endpoint: Endpoint,
local_socket: HostSystemImpl::LocalSocket,
local_interface: HostSystemImpl::LocalInterface,
@ -44,8 +44,8 @@ impl<HostSystemImpl: HostSystem + ?Sized> Path<HostSystemImpl> {
) -> Self {
Self {
endpoint,
local_socket,
local_interface,
local_socket: Pocket::new(local_socket),
local_interface: Pocket::new(local_interface),
last_send_time_ticks: AtomicI64::new(NEVER_HAPPENED_TICKS),
last_receive_time_ticks: AtomicI64::new(NEVER_HAPPENED_TICKS),
create_time_ticks: time_ticks,
@ -53,9 +53,18 @@ impl<HostSystemImpl: HostSystem + ?Sized> Path<HostSystemImpl> {
}
}
#[inline(always)]
pub(crate) fn local_socket<HostSystemImpl: HostSystem + ?Sized>(&self) -> &HostSystemImpl::LocalSocket {
self.local_socket.get()
}
#[inline(always)]
pub(crate) fn local_interface<HostSystemImpl: HostSystem + ?Sized>(&self) -> &HostSystemImpl::LocalInterface {
self.local_socket.get()
}
/// Receive a fragment and return a FragmentedPacket if the entire packet was assembled.
/// This returns None if more fragments are needed to assemble the packet.
#[inline]
pub(crate) fn receive_fragment(
&self,
packet_id: u64,
@ -102,7 +111,6 @@ impl<HostSystemImpl: HostSystem + ?Sized> Path<HostSystemImpl> {
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
}
#[inline]
pub(crate) fn service(&self, time_ticks: i64) -> PathServiceResult {
self.fragmented_packets
.lock()

View file

@ -23,11 +23,11 @@ use crate::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION};
pub(crate) const SERVICE_INTERVAL_MS: i64 = 10000;
pub struct Peer<HostSystemImpl: HostSystem + ?Sized> {
pub struct Peer {
pub identity: Identity,
v1_proto_static_secret: v1::SymmetricSecret,
paths: Mutex<Vec<PeerPath<HostSystemImpl>>>,
paths: Mutex<Vec<PeerPath>>,
pub(crate) last_send_time_ticks: AtomicI64,
pub(crate) last_receive_time_ticks: AtomicI64,
@ -40,8 +40,8 @@ pub struct Peer<HostSystemImpl: HostSystem + ?Sized> {
remote_node_info: RwLock<RemoteNodeInfo>,
}
struct PeerPath<HostSystemImpl: HostSystem + ?Sized> {
path: Weak<Path<HostSystemImpl>>,
struct PeerPath {
path: Weak<Path>,
last_receive_time_ticks: i64,
}
@ -52,11 +52,11 @@ struct RemoteNodeInfo {
}
/// Sort a list of paths by quality or priority, with best paths first.
fn prioritize_paths<HostSystemImpl: HostSystem + ?Sized>(paths: &mut Vec<PeerPath<HostSystemImpl>>) {
fn prioritize_paths<HostSystemImpl: HostSystem + ?Sized>(paths: &mut Vec<PeerPath>) {
paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse());
}
impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
impl Peer {
/// Create a new peer.
///
/// This only returns None if this_node_identity does not have its secrets or if some
@ -112,7 +112,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
/// Get current best path or None if there are no direct paths to this peer.
#[inline]
pub fn direct_path(&self) -> Option<Arc<Path<HostSystemImpl>>> {
pub fn direct_path(&self) -> Option<Arc<Path>> {
for p in self.paths.lock().unwrap().iter() {
let pp = p.path.upgrade();
if pp.is_some() {
@ -124,7 +124,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
/// Get either the current best direct path or an indirect path via e.g. a root.
#[inline]
pub fn path(&self, node: &Node<HostSystemImpl>) -> Option<Arc<Path<HostSystemImpl>>> {
pub fn path(&self, node: &Node) -> Option<Arc<Path>> {
let direct_path = self.direct_path();
if direct_path.is_some() {
return direct_path;
@ -135,7 +135,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
return None;
}
fn learn_path(&self, host_system: &HostSystemImpl, new_path: &Arc<Path<HostSystemImpl>>, time_ticks: i64) {
fn learn_path<HostSystemImpl: HostSystem + ?Sized>(&self, host_system: &HostSystemImpl, new_path: &Arc<Path>, time_ticks: i64) {
let mut paths = self.paths.lock().unwrap();
match &new_path.endpoint {
@ -160,7 +160,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
);
pi.path = Arc::downgrade(new_path);
pi.last_receive_time_ticks = time_ticks;
prioritize_paths(&mut paths);
prioritize_paths::<HostSystemImpl>(&mut paths);
return;
}
}
@ -185,11 +185,11 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
self.identity.address.to_string(),
new_path.endpoint.to_string()
);
paths.push(PeerPath::<HostSystemImpl> {
paths.push(PeerPath {
path: Arc::downgrade(new_path),
last_receive_time_ticks: time_ticks,
});
prioritize_paths(&mut paths);
prioritize_paths::<HostSystemImpl>(&mut paths);
}
/// Get the next sequential message ID for use with the V1 transport protocol.
@ -199,7 +199,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
}
/// Called every SERVICE_INTERVAL_MS by the background service loop in Node.
pub(crate) fn service(&self, _: &HostSystemImpl, _: &Node<HostSystemImpl>, time_ticks: i64) -> bool {
pub(crate) fn service<HostSystemImpl: HostSystem + ?Sized>(&self, _: &HostSystemImpl, _: &Node, time_ticks: i64) -> bool {
// Prune dead paths and sort in descending order of quality.
{
let mut paths = self.paths.lock().unwrap();
@ -207,7 +207,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
if paths.capacity() > 16 {
paths.shrink_to_fit();
}
prioritize_paths(&mut paths);
prioritize_paths::<HostSystemImpl>(&mut paths);
}
// Prune dead entries from the map of reported local endpoints (e.g. externally visible IPs).
@ -220,7 +220,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
}
/// Send a prepared and encrypted packet using the V1 protocol with fragmentation if needed.
fn v1_proto_internal_send(
fn v1_proto_internal_send<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
endpoint: &Endpoint,
@ -278,11 +278,11 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
/// The builder function must append the verb (with any verb flags) and packet payload. If it returns
/// an error, the error is returned immediately and the send is aborted. None is returned if the send
/// function itself fails for some reason such as no paths being available.
pub fn send<R, E, BuilderFunction: FnOnce(&mut PacketBuffer) -> Result<R, E>>(
pub fn send<HostSystemImpl: HostSystem + ?Sized, R, E, BuilderFunction: FnOnce(&mut PacketBuffer) -> Result<R, E>>(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
path: Option<&Arc<Path<HostSystemImpl>>>,
node: &Node,
path: Option<&Arc<Path>>,
time_ticks: i64,
builder_function: BuilderFunction,
) -> Option<Result<R, E>> {
@ -370,8 +370,8 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
self.v1_proto_internal_send(
host_system,
&path.endpoint,
Some(&path.local_socket),
Some(&path.local_interface),
Some(path.local_socket::<HostSystemImpl>()),
Some(path.local_interface::<HostSystemImpl>()),
max_fragment_size,
packet,
);
@ -390,10 +390,10 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
/// Unlike other messages HELLO is sent partially in the clear and always with the long-lived
/// static identity key. Authentication in old versions is via Poly1305 and in new versions
/// via HMAC-SHA512.
pub(crate) fn send_hello(
pub(crate) fn send_hello<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
node: &Node,
explicit_endpoint: Option<&Endpoint>,
) -> bool {
let mut path = None;
@ -456,8 +456,8 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
self.v1_proto_internal_send(
host_system,
destination,
Some(&p.local_socket),
Some(&p.local_interface),
Some(p.local_socket::<HostSystemImpl>()),
Some(p.local_interface::<HostSystemImpl>()),
max_fragment_size,
packet,
);
@ -475,13 +475,13 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
/// those fragments after the main packet header and first chunk.
///
/// This returns true if the packet decrypted and passed authentication.
pub(crate) fn v1_proto_receive<InnerProtocolImpl: InnerProtocol + ?Sized>(
pub(crate) fn v1_proto_receive<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
self: &Arc<Self>,
node: &Node<HostSystemImpl>,
node: &Node,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
time_ticks: i64,
source_path: &Arc<Path<HostSystemImpl>>,
source_path: &Arc<Path>,
packet_header: &v1::PacketHeader,
frag0: &PacketBuffer,
fragments: &[Option<PooledPacketBuffer>],
@ -593,14 +593,14 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
return PacketHandlerResult::Error;
}
fn handle_incoming_hello<InnerProtocolImpl: InnerProtocol + ?Sized>(
fn handle_incoming_hello<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
&self,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
node: &Node<HostSystemImpl>,
node: &Node,
time_ticks: i64,
message_id: MessageId,
source_path: &Arc<Path<HostSystemImpl>>,
source_path: &Arc<Path>,
payload: &PacketBuffer,
) -> PacketHandlerResult {
if !(inner.should_communicate_with(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) {
@ -654,13 +654,13 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
return PacketHandlerResult::Error;
}
fn handle_incoming_error<InnerProtocolImpl: InnerProtocol + ?Sized>(
fn handle_incoming_error<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
self: &Arc<Self>,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
node: &Node<HostSystemImpl>,
_: i64,
source_path: &Arc<Path<HostSystemImpl>>,
node: &Node,
_time_ticks: i64,
source_path: &Arc<Path>,
source_hops: u8,
message_id: u64,
payload: &PacketBuffer,
@ -691,13 +691,13 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
return PacketHandlerResult::Error;
}
fn handle_incoming_ok<InnerProtocolImpl: InnerProtocol + ?Sized>(
fn handle_incoming_ok<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
self: &Arc<Self>,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
node: &Node<HostSystemImpl>,
node: &Node,
time_ticks: i64,
source_path: &Arc<Path<HostSystemImpl>>,
source_path: &Arc<Path>,
source_hops: u8,
message_id: u64,
path_is_known: bool,
@ -791,11 +791,11 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
return PacketHandlerResult::Error;
}
fn handle_incoming_whois<InnerProtocolImpl: InnerProtocol + ?Sized>(
fn handle_incoming_whois<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
self: &Arc<Self>,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
node: &Node<HostSystemImpl>,
node: &Node,
time_ticks: i64,
message_id: MessageId,
payload: &PacketBuffer,
@ -824,24 +824,24 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
return PacketHandlerResult::Ok;
}
fn handle_incoming_rendezvous(
fn handle_incoming_rendezvous<HostSystemImpl: HostSystem + ?Sized>(
self: &Arc<Self>,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
node: &Node,
time_ticks: i64,
message_id: MessageId,
source_path: &Arc<Path<HostSystemImpl>>,
source_path: &Arc<Path>,
payload: &PacketBuffer,
) -> PacketHandlerResult {
if node.is_peer_root(self) {}
return PacketHandlerResult::Ok;
}
fn handle_incoming_echo<InnerProtocolImpl: InnerProtocol + ?Sized>(
fn handle_incoming_echo<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
&self,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
node: &Node<HostSystemImpl>,
node: &Node,
time_ticks: i64,
message_id: MessageId,
payload: &PacketBuffer,
@ -864,44 +864,44 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
return PacketHandlerResult::Ok;
}
fn handle_incoming_push_direct_paths(
fn handle_incoming_push_direct_paths<HostSystemImpl: HostSystem + ?Sized>(
self: &Arc<Self>,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
node: &Node,
time_ticks: i64,
source_path: &Arc<Path<HostSystemImpl>>,
source_path: &Arc<Path>,
payload: &PacketBuffer,
) -> PacketHandlerResult {
PacketHandlerResult::Ok
}
fn handle_incoming_user_message(
fn handle_incoming_user_message<HostSystemImpl: HostSystem + ?Sized>(
self: &Arc<Self>,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
node: &Node,
time_ticks: i64,
source_path: &Arc<Path<HostSystemImpl>>,
source_path: &Arc<Path>,
payload: &PacketBuffer,
) -> PacketHandlerResult {
PacketHandlerResult::Ok
}
}
impl<HostSystemImpl: HostSystem + ?Sized> Hash for Peer<HostSystemImpl> {
impl Hash for Peer {
#[inline(always)]
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
state.write_u64(self.identity.address.into());
}
}
impl<HostSystemImpl: HostSystem + ?Sized> PartialEq for Peer<HostSystemImpl> {
impl PartialEq for Peer {
#[inline(always)]
fn eq(&self, other: &Self) -> bool {
self.identity.fingerprint.eq(&other.identity.fingerprint)
}
}
impl<HostSystemImpl: HostSystem + ?Sized> Eq for Peer<HostSystemImpl> {}
impl Eq for Peer {}
fn v1_proto_try_aead_decrypt(
secret: &v1::SymmetricSecret,

View file

@ -14,9 +14,9 @@ impl InnerProtocol for Switch {
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
source_hops: u8,
message_id: u64,
verb: u8,
@ -28,9 +28,9 @@ impl InnerProtocol for Switch {
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,
@ -45,9 +45,9 @@ impl InnerProtocol for Switch {
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,

View file

@ -14,6 +14,7 @@ pub mod io;
pub mod json;
pub mod marshalable;
pub mod memory;
pub mod pocket;
pub mod pool;
pub mod ringbuffer;
pub mod ringbuffermap;

124
utils/src/pocket.rs Normal file
View file

@ -0,0 +1,124 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use std::any::TypeId;
use std::mem::{forget, size_of, MaybeUninit};
use std::ptr::{drop_in_place, read, write};
/// A statically sized container that acts a bit like Box<dyn Any>.
///
/// This is used in a few places to avoid cascades of templates by allowing templated
/// objects to be held generically and accessed only within templated functions. It does so
/// with very low to zero runtime overhead or memory overhead and panics if misused.
///
/// This will panic if the capacity is too small. If that occurs, it must be enlarged. It will
/// also panic if any of the accessors (other than the try_ versions) are used to try to get
/// a type other than the one it was
pub struct Pocket<const CAPACITY: usize> {
storage: [u8; CAPACITY],
dropper: fn(*mut u8),
data_type: TypeId,
}
impl<const CAPACITY: usize> Pocket<CAPACITY> {
#[inline(always)]
pub fn new<T: Sized + 'static>(x: T) -> Self {
assert!(size_of::<T>() <= CAPACITY);
let mut p = Self {
storage: unsafe { MaybeUninit::uninit().assume_init() },
dropper: |s: *mut u8| unsafe {
drop_in_place::<T>((*s.cast::<Self>()).storage.as_mut_ptr().cast());
},
data_type: TypeId::of::<T>(),
};
unsafe { write(p.storage.as_mut_ptr().cast(), x) };
p
}
#[inline(always)]
pub fn get<T: Sized + 'static>(&self) -> &T {
assert_eq!(TypeId::of::<T>(), self.data_type);
unsafe { &*self.storage.as_ptr().cast() }
}
#[inline(always)]
pub fn get_mut<T: Sized + 'static>(&mut self) -> &mut T {
assert_eq!(TypeId::of::<T>(), self.data_type);
unsafe { &mut *self.storage.as_mut_ptr().cast() }
}
#[inline(always)]
pub fn try_get<T: Sized + 'static>(&self) -> Option<&T> {
if TypeId::of::<T>() == self.data_type {
Some(unsafe { &*self.storage.as_ptr().cast() })
} else {
None
}
}
#[inline(always)]
pub fn try_get_mut<T: Sized + 'static>(&mut self) -> Option<&mut T> {
if TypeId::of::<T>() == self.data_type {
Some(unsafe { &mut *self.storage.as_mut_ptr().cast() })
} else {
None
}
}
#[inline(always)]
pub fn unwrap<T: Sized + 'static>(self) -> T {
assert_eq!(TypeId::of::<T>(), self.data_type);
let x = unsafe { read(self.storage.as_ptr().cast()) };
forget(self);
x
}
}
impl<T: Sized + 'static, const CAPACITY: usize> AsRef<T> for Pocket<CAPACITY> {
#[inline(always)]
fn as_ref(&self) -> &T {
assert_eq!(TypeId::of::<T>(), self.data_type);
unsafe { &*self.storage.as_ptr().cast() }
}
}
impl<T: Sized + 'static, const CAPACITY: usize> AsMut<T> for Pocket<CAPACITY> {
#[inline(always)]
fn as_mut(&mut self) -> &mut T {
assert_eq!(TypeId::of::<T>(), self.data_type);
unsafe { &mut *self.storage.as_mut_ptr().cast() }
}
}
impl<const CAPACITY: usize> Drop for Pocket<CAPACITY> {
#[inline(always)]
fn drop(&mut self) {
(self.dropper)((self as *mut Self).cast());
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::rc::Rc;
#[test]
fn typing_and_life_cycle() {
let test_obj = Rc::new(1i32);
assert_eq!(Rc::strong_count(&test_obj), 1);
let a = Pocket::<32>::new(test_obj.clone());
let b = Pocket::<32>::new(test_obj.clone());
let c = Pocket::<32>::new(test_obj.clone());
assert!(a.get::<Rc<i32>>().eq(b.get()));
assert!(a.try_get::<Rc<i32>>().is_some());
assert!(a.try_get::<Rc<usize>>().is_none());
assert_eq!(Rc::strong_count(&test_obj), 4);
drop(a);
assert_eq!(Rc::strong_count(&test_obj), 3);
drop(b);
assert_eq!(Rc::strong_count(&test_obj), 2);
let c = c.unwrap::<Rc<i32>>();
assert_eq!(Rc::strong_count(&test_obj), 2);
drop(c);
assert_eq!(Rc::strong_count(&test_obj), 1);
}
}

View file

@ -36,7 +36,7 @@ pub struct VL1Service<
inner: Arc<InnerProtocolImpl>,
path_filter: Arc<PathFilterImpl>,
buffer_pool: Arc<PacketBufferPool>,
node_container: Option<Node<Self>>, // never None, set in new()
node_container: Option<Node>, // never None, set in new()
}
struct VL1ServiceMutableState {
@ -86,7 +86,7 @@ impl<NodeStorageImpl: NodeStorage + 'static, PathFilterImpl: PathFilter + 'stati
}
#[inline(always)]
pub fn node(&self) -> &Node<Self> {
pub fn node(&self) -> &Node {
debug_assert!(self.node_container.is_some());
unsafe { self.node_container.as_ref().unwrap_unchecked() }
}
@ -198,8 +198,8 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
packet: zerotier_network_hypervisor::protocol::PooledPacketBuffer,
) {
self.node().handle_incoming_physical_packet(
&*self,
&*self.inner,
self.as_ref(),
self.inner.as_ref(),
&Endpoint::IpUdp(source_address.clone()),
&LocalSocket::new(socket),
&socket.interface,