Simplify some Rust generic madness.

This commit is contained in:
Adam Ierymenko 2023-03-14 13:30:35 -04:00
parent dd9f1cffe5
commit f0158ee8ae
11 changed files with 228 additions and 303 deletions

View file

@ -17,6 +17,7 @@ use zerotier_network_hypervisor::vl2::v1::Revocation;
use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_utils::blob::Blob;
use zerotier_utils::buffer::OutOfBoundsError;
use zerotier_utils::cast::cast_ref;
use zerotier_utils::error::InvalidParameterError;
use zerotier_utils::reaper::Reaper;
use zerotier_utils::tokio;
@ -139,62 +140,56 @@ impl Controller {
}
/// Compose and send network configuration packet (either V1 or V2)
fn send_network_config(
fn send_network_config<Application: ApplicationLayer + ?Sized>(
&self,
peer: &Peer,
app: &Application,
node: &Node<Application>,
peer: &Peer<Application>,
config: &NetworkConfig,
in_re_message_id: Option<u64>, // None for unsolicited push
) {
if let Some(host_system) = self.service.read().unwrap().upgrade() {
peer.send(
host_system.as_ref(),
host_system.node(),
None,
ms_monotonic(),
|packet| -> Result<(), OutOfBoundsError> {
let payload_start = packet.len();
peer.send(app, node, None, ms_monotonic(), |packet| -> Result<(), OutOfBoundsError> {
let payload_start = packet.len();
if let Some(in_re_message_id) = in_re_message_id {
let ok_header = packet.append_struct_get_mut::<protocol::OkHeader>()?;
ok_header.verb = protocol::message_type::VL1_OK;
ok_header.in_re_verb = protocol::message_type::VL2_NETWORK_CONFIG_REQUEST;
ok_header.in_re_message_id = in_re_message_id.to_be_bytes();
} else {
packet.append_u8(protocol::message_type::VL2_NETWORK_CONFIG)?;
}
if let Some(in_re_message_id) = in_re_message_id {
let ok_header = packet.append_struct_get_mut::<protocol::OkHeader>()?;
ok_header.verb = protocol::message_type::VL1_OK;
ok_header.in_re_verb = protocol::message_type::VL2_NETWORK_CONFIG_REQUEST;
ok_header.in_re_message_id = in_re_message_id.to_be_bytes();
} else {
packet.append_u8(protocol::message_type::VL2_NETWORK_CONFIG)?;
}
if peer.is_v2() {
todo!()
} else {
let config_data = if let Some(config_dict) = config.v1_proto_to_dictionary(&self.local_identity) {
config_dict.to_bytes()
} else {
eprintln!("WARNING: unexpected error serializing network config into V1 format dictionary");
return Err(OutOfBoundsError); // abort
};
if config_data.len() > (u16::MAX as usize) {
eprintln!("WARNING: network config is larger than 65536 bytes!");
return Err(OutOfBoundsError); // abort
}
if peer.is_v2() {
todo!()
} else {
let config_data = if let Some(config_dict) = config.v1_proto_to_dictionary(&self.local_identity) {
config_dict.to_bytes()
} else {
eprintln!("WARNING: unexpected error serializing network config into V1 format dictionary");
return Err(OutOfBoundsError); // abort
};
if config_data.len() > (u16::MAX as usize) {
eprintln!("WARNING: network config is larger than 65536 bytes!");
return Err(OutOfBoundsError); // abort
}
packet.append_u64(config.network_id.into())?;
packet.append_u16(config_data.len() as u16)?;
packet.append_bytes(config_data.as_slice())?;
packet.append_u64(config.network_id.into())?;
packet.append_u16(config_data.len() as u16)?;
packet.append_bytes(config_data.as_slice())?;
// TODO: for V1 we may need to introduce use of the chunking mechanism for large configs.
}
// TODO: for V1 we may need to introduce use of the chunking mechanism for large configs.
}
let new_payload_len = protocol::compress(&mut packet.as_bytes_mut()[payload_start..]);
packet.set_size(payload_start + new_payload_len);
let new_payload_len = protocol::compress(&mut packet.as_bytes_mut()[payload_start..]);
packet.set_size(payload_start + new_payload_len);
Ok(())
},
);
}
Ok(())
});
}
/// Send one or more revocation object(s) to a peer. The provided vector is drained.
fn send_revocations(&self, peer: &Peer, revocations: &mut Vec<Revocation>) {
fn send_revocations(&self, peer: &Peer<VL1Service<Self>>, revocations: &mut Vec<Revocation>) {
if let Some(host_system) = self.service.read().unwrap().upgrade() {
let time_ticks = ms_monotonic();
while !revocations.is_empty() {
@ -496,20 +491,12 @@ impl Controller {
}
impl InnerProtocolLayer for Controller {
#[inline(always)]
fn should_respond_to(&self, _: &Valid<Identity>) -> bool {
// Controllers always have to establish sessions to process requests. We don't really know if
// a member is relevant until we have looked up both the network and the member, since whether
// or not to "learn" unknown members is a network level option.
true
}
fn handle_packet<HostSystemImpl: ApplicationLayer + ?Sized>(
fn handle_packet<Application: ApplicationLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
_: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
app: &Application,
node: &Node<Application>,
source: &Arc<Peer<Application>>,
source_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>,
source_hops: u8,
message_id: u64,
verb: u8,
@ -518,10 +505,6 @@ impl InnerProtocolLayer for Controller {
) -> PacketHandlerResult {
match verb {
protocol::message_type::VL2_NETWORK_CONFIG_REQUEST => {
if !self.should_respond_to(&source.identity) {
return PacketHandlerResult::Ok; // handled and ignored
}
let network_id = payload.read_u64(&mut cursor);
if network_id.is_err() {
return PacketHandlerResult::Error;
@ -533,7 +516,7 @@ impl InnerProtocolLayer for Controller {
let network_id = network_id.unwrap();
debug_event!(
host_system,
app,
"[vl2] NETWORK_CONFIG_REQUEST from {}({}) for {:0>16x}",
source.identity.address.to_string(),
source_path.endpoint.to_string(),
@ -565,7 +548,8 @@ impl InnerProtocolLayer for Controller {
let (result, config) = match self2.authorize(&source.identity, network_id, now).await {
Result::Ok((result, Some(config))) => {
//println!("{}", serde_yaml::to_string(&config).unwrap());
self2.send_network_config(source.as_ref(), &config, Some(message_id));
let app = self2.service.read().unwrap().upgrade().unwrap();
self2.send_network_config(app.as_ref(), app.node(), cast_ref(source.as_ref()).unwrap(), &config, Some(message_id));
(result, Some(config))
}
Result::Ok((result, None)) => (result, None),
@ -618,23 +602,21 @@ impl InnerProtocolLayer for Controller {
}
protocol::message_type::VL2_MULTICAST_GATHER => {
if let Some(service) = self.service.read().unwrap().upgrade() {
let auth = self.recently_authorized.read().unwrap();
let time_ticks = ms_monotonic();
self.multicast_authority.handle_vl2_multicast_gather(
|network_id, identity| {
auth.get(&identity.fingerprint)
.map_or(false, |t| t.get(&network_id).map_or(false, |t| *t > time_ticks))
},
time_ticks,
service.as_ref(),
service.node(),
source,
message_id,
payload,
cursor,
);
}
let auth = self.recently_authorized.read().unwrap();
let time_ticks = ms_monotonic();
self.multicast_authority.handle_vl2_multicast_gather(
|network_id, identity| {
auth.get(&identity.fingerprint)
.map_or(false, |t| t.get(&network_id).map_or(false, |t| *t > time_ticks))
},
time_ticks,
app,
node,
source,
message_id,
payload,
cursor,
);
PacketHandlerResult::Ok
}

View file

@ -25,13 +25,12 @@ use zerotier_utils::gate::IntervalGate;
use zerotier_utils::hex;
use zerotier_utils::marshalable::Marshalable;
use zerotier_utils::ringbuffer::RingBuffer;
use zerotier_utils::thing::Thing;
/// Interface trait to be implemented by code that's using the ZeroTier network hypervisor.
///
/// This is analogous to a C struct full of function pointers to callbacks along with some
/// associated type definitions.
pub trait ApplicationLayer: Sync + Send {
pub trait ApplicationLayer: Sync + Send + 'static {
/// Type for local system sockets.
type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static;
@ -56,6 +55,9 @@ pub trait ApplicationLayer: Sync + Send {
/// unbound, etc.
fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool;
/// Check if this node should respond to messages from a given peer at all.
fn should_respond_to(&self, id: &Valid<Identity>) -> bool;
/// Called to send a packet over the physical network (virtual -> physical).
///
/// This sends with UDP-like semantics. It should do whatever best effort it can and return.
@ -130,13 +132,6 @@ pub enum PacketHandlerResult {
/// it could also be implemented for testing or "off label" use of VL1 to carry different protocols.
#[allow(unused)]
pub trait InnerProtocolLayer: Sync + Send {
/// Check if this node should respond to messages from a given peer at all.
///
/// The default implementation always returns true.
fn should_respond_to(&self, id: &Valid<Identity>) -> bool {
true
}
/// Handle a packet, returning true if it was handled by the next layer.
///
/// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error().
@ -144,9 +139,9 @@ pub trait InnerProtocolLayer: Sync + Send {
fn handle_packet<Application: ApplicationLayer + ?Sized>(
&self,
app: &Application,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
node: &Node<Application>,
source: &Arc<Peer<Application>>,
source_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>,
source_hops: u8,
message_id: u64,
verb: u8,
@ -161,9 +156,9 @@ pub trait InnerProtocolLayer: Sync + Send {
fn handle_error<Application: ApplicationLayer + ?Sized>(
&self,
app: &Application,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
node: &Node<Application>,
source: &Arc<Peer<Application>>,
source_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,
@ -180,9 +175,9 @@ pub trait InnerProtocolLayer: Sync + Send {
fn handle_ok<Application: ApplicationLayer + ?Sized>(
&self,
app: &Application,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
node: &Node<Application>,
source: &Arc<Peer<Application>>,
source_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,
@ -194,12 +189,12 @@ pub trait InnerProtocolLayer: Sync + Send {
}
}
struct RootInfo {
struct RootInfo<Application: ApplicationLayer + ?Sized> {
/// Root sets to which we are a member.
sets: HashMap<String, Verified<RootSet>>,
/// Root peers and their statically defined endpoints (from root sets).
roots: HashMap<Arc<Peer>, Vec<Endpoint>>,
roots: HashMap<Arc<Peer<Application>>, Vec<Endpoint>>,
/// If this node is a root, these are the root sets to which it's a member in binary serialized form.
/// Set to None if this node is not a root, meaning it doesn't appear in any of its root sets.
@ -225,17 +220,15 @@ struct BackgroundTaskIntervals {
whois_queue_retry: IntervalGate<{ WHOIS_RETRY_INTERVAL }>,
}
struct WhoisQueueItem {
v1_proto_waiting_packets: RingBuffer<(Weak<Path>, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>,
struct WhoisQueueItem<Application: ApplicationLayer + ?Sized> {
v1_proto_waiting_packets:
RingBuffer<(Weak<Path<Application::LocalSocket, Application::LocalInterface>>, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>,
last_retry_time: i64,
retry_count: u16,
}
const PATH_MAP_SIZE: usize = std::mem::size_of::<HashMap<[u8; std::mem::size_of::<Endpoint>() + 128], Arc<Path>>>();
type PathMap<LocalSocket> = HashMap<PathKey<'static, 'static, LocalSocket>, Arc<Path>>;
/// A ZeroTier VL1 node that can communicate securely with the ZeroTier peer-to-peer network.
pub struct Node {
pub struct Node<Application: ApplicationLayer + ?Sized> {
/// A random ID generated to identify this particular running instance.
pub instance_id: [u8; 16],
@ -246,27 +239,23 @@ pub struct Node {
intervals: Mutex<BackgroundTaskIntervals>,
/// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use.
paths: RwLock<Thing<PATH_MAP_SIZE>>, // holds a PathMap<> but as a Thing<> to hide ApplicationLayer template parameter
paths: RwLock<HashMap<PathKey<'static, 'static, Application::LocalSocket>, Arc<Path<Application::LocalSocket, Application::LocalInterface>>>>,
/// Peers with which we are currently communicating.
peers: RwLock<HashMap<Address, Arc<Peer>>>,
peers: RwLock<HashMap<Address, Arc<Peer<Application>>>>,
/// This node's trusted roots, sorted in ascending order of quality/preference, and cluster definitions.
roots: RwLock<RootInfo>,
roots: RwLock<RootInfo<Application>>,
/// Current best root.
best_root: RwLock<Option<Arc<Peer>>>,
best_root: RwLock<Option<Arc<Peer<Application>>>>,
/// Queue of identities being looked up.
whois_queue: Mutex<HashMap<Address, WhoisQueueItem>>,
whois_queue: Mutex<HashMap<Address, WhoisQueueItem<Application>>>,
}
impl Node {
pub fn new<Application: ApplicationLayer + ?Sized>(
app: &Application,
auto_generate_identity: bool,
auto_upgrade_identity: bool,
) -> Result<Self, InvalidParameterError> {
impl<Application: ApplicationLayer + ?Sized> Node<Application> {
pub fn new(app: &Application, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result<Self, InvalidParameterError> {
let mut id = {
let id = app.load_node_identity();
if id.is_none() {
@ -297,7 +286,7 @@ impl Node {
instance_id: random::get_bytes_secure(),
identity: id,
intervals: Mutex::new(BackgroundTaskIntervals::default()),
paths: RwLock::new(Thing::new(PathMap::<Application::LocalSocket>::new())),
paths: RwLock::new(HashMap::new()),
peers: RwLock::new(HashMap::new()),
roots: RwLock::new(RootInfo {
sets: HashMap::new(),
@ -312,7 +301,7 @@ impl Node {
}
#[inline]
pub fn peer(&self, a: Address) -> Option<Arc<Peer>> {
pub fn peer(&self, a: Address) -> Option<Arc<Peer<Application>>> {
self.peers.read().unwrap().get(&a).cloned()
}
@ -323,13 +312,13 @@ impl Node {
/// Get the current "best" root from among this node's trusted roots.
#[inline]
pub fn best_root(&self) -> Option<Arc<Peer>> {
pub fn best_root(&self) -> Option<Arc<Peer<Application>>> {
self.best_root.read().unwrap().clone()
}
/// Check whether a peer is a root according to any root set trusted by this node.
#[inline]
pub fn is_peer_root(&self, peer: &Peer) -> bool {
pub fn is_peer_root(&self, peer: &Peer<Application>) -> bool {
self.roots.read().unwrap().roots.keys().any(|p| p.identity.eq(&peer.identity))
}
@ -380,7 +369,7 @@ impl Node {
self.roots.read().unwrap().sets.values().cloned().map(|s| s.remove_typestate()).collect()
}
pub fn do_background_tasks<Application: ApplicationLayer + ?Sized>(&self, app: &Application) -> Duration {
pub fn do_background_tasks(&self, app: &Application) -> Duration {
const INTERVAL_MS: i64 = 1000;
const INTERVAL: Duration = Duration::from_millis(INTERVAL_MS as u64);
let time_ticks = app.time_ticks();
@ -611,7 +600,7 @@ impl Node {
let mut need_keepalive = Vec::new();
// First check all paths in read mode to avoid blocking the entire node.
for (k, path) in self.paths.read().unwrap().get::<PathMap<Application::LocalSocket>>().iter() {
for (k, path) in self.paths.read().unwrap().iter() {
if app.local_socket_is_valid(k.local_socket()) {
match path.service(time_ticks) {
PathServiceResult::Ok => {}
@ -625,19 +614,13 @@ impl Node {
// Lock in write mode and remove dead paths, doing so piecemeal to again avoid blocking.
for dp in dead_paths.iter() {
self.paths.write().unwrap().get_mut::<PathMap<Application::LocalSocket>>().remove(dp);
self.paths.write().unwrap().remove(dp);
}
// Finally run keepalive sends as a batch.
let keepalive_buf = [time_ticks as u8]; // just an arbitrary byte, no significance
for p in need_keepalive.iter() {
app.wire_send(
&p.endpoint,
Some(p.local_socket::<Application>()),
Some(p.local_interface::<Application>()),
&keepalive_buf,
0,
);
app.wire_send(&p.endpoint, Some(&p.local_socket), Some(&p.local_interface), &keepalive_buf, 0);
}
}
@ -663,7 +646,7 @@ impl Node {
INTERVAL
}
pub fn handle_incoming_physical_packet<Application: ApplicationLayer + ?Sized, Inner: InnerProtocolLayer + ?Sized>(
pub fn handle_incoming_physical_packet<Inner: InnerProtocolLayer + ?Sized>(
&self,
app: &Application,
inner: &Inner,
@ -685,14 +668,7 @@ impl Node {
source_local_interface.to_string()
);
// An 0xff value at byte [8] means this is a ZSSP packet. This is accomplished via the
// backward compatibility hack of always having 0xff at byte [4] of 6-byte session IDs
// and by having 0xffffffffffff be the "nil" session ID for session init packets. ZSSP
// is the new V2 Noise-based forward-secure transport protocol. What follows below this
// is legacy handling of the old v1 protocol.
if packet.u8_at(8).map_or(false, |x| x == 0xff) {
todo!();
}
// TODO: detect inbound ZSSP sessions, handle ZSSP mode.
// Legacy ZeroTier V1 packet handling
if let Ok(fragment_header) = packet.struct_mut_at::<v1::FragmentHeader>(0) {
@ -701,7 +677,7 @@ impl Node {
if dest == self.identity.address {
let fragment_header = &*fragment_header; // discard mut
let path = self.canonical_path::<Application>(source_endpoint, source_local_socket, source_local_interface, time_ticks);
let path = self.canonical_path(source_endpoint, source_local_socket, source_local_interface, time_ticks);
path.log_receive_anything(time_ticks);
if fragment_header.is_fragment() {
@ -816,8 +792,8 @@ impl Node {
if let Some(forward_path) = peer.direct_path() {
app.wire_send(
&forward_path.endpoint,
Some(forward_path.local_socket::<Application>()),
Some(forward_path.local_interface::<Application>()),
Some(&forward_path.local_socket),
Some(&forward_path.local_interface),
packet.as_bytes(),
0,
);
@ -834,11 +810,11 @@ impl Node {
}
/// Enqueue and send a WHOIS query for a given address, adding the supplied packet (if any) to the list to be processed on reply.
fn whois<Application: ApplicationLayer + ?Sized>(
fn whois(
&self,
app: &Application,
address: Address,
waiting_packet: Option<(Weak<Path>, PooledPacketBuffer)>,
waiting_packet: Option<(Weak<Path<Application::LocalSocket, Application::LocalInterface>>, PooledPacketBuffer)>,
time_ticks: i64,
) {
{
@ -862,7 +838,7 @@ impl Node {
}
/// Send a WHOIS query to the current best root.
fn send_whois<Application: ApplicationLayer + ?Sized>(&self, app: &Application, mut addresses: &[Address], time_ticks: i64) {
fn send_whois(&self, app: &Application, mut addresses: &[Address], time_ticks: i64) {
debug_assert!(!addresses.is_empty());
debug_event!(app, "[vl1] [v1] sending WHOIS for {}", {
let mut tmp = String::new();
@ -894,7 +870,7 @@ impl Node {
}
/// Called by Peer when an identity is received from another node, e.g. via OK(WHOIS).
pub(crate) fn handle_incoming_identity<Application: ApplicationLayer + ?Sized, Inner: InnerProtocolLayer + ?Sized>(
pub(crate) fn handle_incoming_identity<Inner: InnerProtocolLayer + ?Sized>(
&self,
app: &Application,
inner: &Inner,
@ -907,7 +883,7 @@ impl Node {
let mut whois_queue = self.whois_queue.lock().unwrap();
if let Some(qi) = whois_queue.get_mut(&received_identity.address) {
let address = received_identity.address;
if inner.should_respond_to(&received_identity) {
if app.should_respond_to(&received_identity) {
let mut peers = self.peers.write().unwrap();
if let Some(peer) = peers.get(&address).cloned().or_else(|| {
Peer::new(&self.identity, received_identity, time_ticks)
@ -946,31 +922,23 @@ impl Node {
}
/// Get the canonical Path object corresponding to an endpoint.
pub(crate) fn canonical_path<Application: ApplicationLayer + ?Sized>(
pub(crate) fn canonical_path(
&self,
ep: &Endpoint,
local_socket: &Application::LocalSocket,
local_interface: &Application::LocalInterface,
time_ticks: i64,
) -> Arc<Path> {
) -> Arc<Path<Application::LocalSocket, Application::LocalInterface>> {
let paths = self.paths.read().unwrap();
if let Some(path) = paths.get::<PathMap<Application::LocalSocket>>().get(&PathKey::Ref(ep, local_socket)) {
if let Some(path) = paths.get(&PathKey::Ref(ep, local_socket)) {
path.clone()
} else {
drop(paths);
self.paths
.write()
.unwrap()
.get_mut::<PathMap<Application::LocalSocket>>()
.entry(PathKey::Copied(ep.clone(), local_socket.clone()))
.or_insert_with(|| {
Arc::new(Path::new::<Application>(
ep.clone(),
local_socket.clone(),
local_interface.clone(),
time_ticks,
))
})
.or_insert_with(|| Arc::new(Path::new(ep.clone(), local_socket.clone(), local_interface.clone(), time_ticks)))
.clone()
}
}

View file

@ -7,10 +7,8 @@ use std::sync::Mutex;
use crate::protocol;
use crate::vl1::endpoint::Endpoint;
use crate::vl1::node::*;
use zerotier_crypto::random;
use zerotier_utils::thing::Thing;
use zerotier_utils::NEVER_HAPPENED_TICKS;
pub(crate) const SERVICE_INTERVAL_MS: i64 = protocol::PATH_KEEPALIVE_INTERVAL;
@ -26,27 +24,22 @@ pub(crate) enum PathServiceResult {
/// These are maintained in Node and canonicalized so that all unique paths have
/// one and only one unique path object. That enables statistics to be tracked
/// for them and uniform application of things like keepalives.
pub struct Path {
pub struct Path<LocalSocket, LocalInterface> {
pub endpoint: Endpoint,
local_socket: Thing<64>,
local_interface: Thing<64>,
pub local_socket: LocalSocket,
pub local_interface: LocalInterface,
last_send_time_ticks: AtomicI64,
last_receive_time_ticks: AtomicI64,
create_time_ticks: i64,
fragmented_packets: Mutex<HashMap<u64, protocol::v1::FragmentedPacket, PacketIdHasher>>,
}
impl Path {
pub(crate) fn new<Application: ApplicationLayer + ?Sized>(
endpoint: Endpoint,
local_socket: Application::LocalSocket,
local_interface: Application::LocalInterface,
time_ticks: i64,
) -> Self {
impl<LocalSocket, LocalInterface> Path<LocalSocket, LocalInterface> {
pub(crate) fn new(endpoint: Endpoint, local_socket: LocalSocket, local_interface: LocalInterface, time_ticks: i64) -> Self {
Self {
endpoint,
local_socket: Thing::new(local_socket), // enlarge Thing<> if this panics
local_interface: Thing::new(local_interface), // enlarge Thing<> if this panics
local_socket,
local_interface,
last_send_time_ticks: AtomicI64::new(NEVER_HAPPENED_TICKS),
last_receive_time_ticks: AtomicI64::new(NEVER_HAPPENED_TICKS),
create_time_ticks: time_ticks,
@ -54,16 +47,6 @@ impl Path {
}
}
#[inline(always)]
pub(crate) fn local_socket<Application: ApplicationLayer + ?Sized>(&self) -> &Application::LocalSocket {
self.local_socket.get()
}
#[inline(always)]
pub(crate) fn local_interface<Application: ApplicationLayer + ?Sized>(&self) -> &Application::LocalInterface {
self.local_interface.get()
}
/// Receive a fragment and return a FragmentedPacket if the entire packet was assembled.
/// This returns None if more fragments are needed to assemble the packet.
pub(crate) fn v1_proto_receive_fragment(

View file

@ -24,11 +24,11 @@ use crate::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION};
pub(crate) const SERVICE_INTERVAL_MS: i64 = 10000;
pub struct Peer {
pub struct Peer<Application: ApplicationLayer + ?Sized> {
pub identity: Valid<Identity>,
v1_proto_static_secret: v1::SymmetricSecret,
paths: Mutex<Vec<PeerPath>>,
paths: Mutex<Vec<PeerPath<Application>>>,
pub(crate) last_send_time_ticks: AtomicI64,
pub(crate) last_receive_time_ticks: AtomicI64,
@ -41,8 +41,8 @@ pub struct Peer {
remote_node_info: RwLock<RemoteNodeInfo>,
}
struct PeerPath {
path: Weak<Path>,
struct PeerPath<Application: ApplicationLayer + ?Sized> {
path: Weak<Path<Application::LocalSocket, Application::LocalInterface>>,
last_receive_time_ticks: i64,
}
@ -53,11 +53,11 @@ struct RemoteNodeInfo {
}
/// Sort a list of paths by quality or priority, with best paths first.
fn prioritize_paths<Application: ApplicationLayer + ?Sized>(paths: &mut Vec<PeerPath>) {
fn prioritize_paths<Application: ApplicationLayer + ?Sized>(paths: &mut Vec<PeerPath<Application>>) {
paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse());
}
impl Peer {
impl<Application: ApplicationLayer + ?Sized> Peer<Application> {
/// Create a new peer.
///
/// This only returns None if this_node_identity does not have its secrets or if some
@ -113,7 +113,7 @@ impl Peer {
/// Get current best path or None if there are no direct paths to this peer.
#[inline]
pub fn direct_path(&self) -> Option<Arc<Path>> {
pub fn direct_path(&self) -> Option<Arc<Path<Application::LocalSocket, Application::LocalInterface>>> {
for p in self.paths.lock().unwrap().iter() {
let pp = p.path.upgrade();
if pp.is_some() {
@ -125,7 +125,7 @@ impl Peer {
/// Get either the current best direct path or an indirect path via e.g. a root.
#[inline]
pub fn path(&self, node: &Node) -> Option<Arc<Path>> {
pub fn path(&self, node: &Node<Application>) -> Option<Arc<Path<Application::LocalSocket, Application::LocalInterface>>> {
let direct_path = self.direct_path();
if direct_path.is_some() {
return direct_path;
@ -136,7 +136,7 @@ impl Peer {
return None;
}
fn learn_path<Application: ApplicationLayer + ?Sized>(&self, app: &Application, new_path: &Arc<Path>, time_ticks: i64) {
fn learn_path(&self, app: &Application, new_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>, time_ticks: i64) {
let mut paths = self.paths.lock().unwrap();
// TODO: check path filter
@ -202,7 +202,7 @@ impl Peer {
}
/// Called every SERVICE_INTERVAL_MS by the background service loop in Node.
pub(crate) fn service<Application: ApplicationLayer + ?Sized>(&self, _: &Application, _: &Node, time_ticks: i64) -> bool {
pub(crate) fn service(&self, _: &Application, _: &Node<Application>, time_ticks: i64) -> bool {
// Prune dead paths and sort in descending order of quality.
{
let mut paths = self.paths.lock().unwrap();
@ -223,7 +223,7 @@ impl Peer {
}
/// Send a prepared and encrypted packet using the V1 protocol with fragmentation if needed.
fn v1_proto_internal_send<Application: ApplicationLayer + ?Sized>(
fn v1_proto_internal_send(
&self,
app: &Application,
endpoint: &Endpoint,
@ -281,11 +281,11 @@ impl Peer {
/// The builder function must append the verb (with any verb flags) and packet payload. If it returns
/// an error, the error is returned immediately and the send is aborted. None is returned if the send
/// function itself fails for some reason such as no paths being available.
pub fn send<Application: ApplicationLayer + ?Sized, R, E, BuilderFunction: FnOnce(&mut PacketBuffer) -> Result<R, E>>(
pub fn send<R, E, BuilderFunction: FnOnce(&mut PacketBuffer) -> Result<R, E>>(
&self,
app: &Application,
node: &Node,
path: Option<&Arc<Path>>,
node: &Node<Application>,
path: Option<&Arc<Path<Application::LocalSocket, Application::LocalInterface>>>,
time_ticks: i64,
builder_function: BuilderFunction,
) -> Option<Result<R, E>> {
@ -369,8 +369,8 @@ impl Peer {
self.v1_proto_internal_send(
app,
&path.endpoint,
Some(path.local_socket::<Application>()),
Some(path.local_interface::<Application>()),
Some(&path.local_socket),
Some(&path.local_interface),
max_fragment_size,
packet,
);
@ -389,12 +389,7 @@ impl Peer {
/// Unlike other messages HELLO is sent partially in the clear and always with the long-lived
/// static identity key. Authentication in old versions is via Poly1305 and in new versions
/// via HMAC-SHA512.
pub(crate) fn send_hello<Application: ApplicationLayer + ?Sized>(
&self,
app: &Application,
node: &Node,
explicit_endpoint: Option<&Endpoint>,
) -> bool {
pub(crate) fn send_hello(&self, app: &Application, node: &Node<Application>, explicit_endpoint: Option<&Endpoint>) -> bool {
let mut path = None;
let destination = if let Some(explicit_endpoint) = explicit_endpoint {
explicit_endpoint
@ -454,8 +449,8 @@ impl Peer {
self.v1_proto_internal_send(
app,
destination,
Some(p.local_socket::<Application>()),
Some(p.local_interface::<Application>()),
Some(&p.local_socket),
Some(&p.local_interface),
max_fragment_size,
packet,
);
@ -473,13 +468,13 @@ impl Peer {
/// those fragments after the main packet header and first chunk.
///
/// This returns true if the packet decrypted and passed authentication.
pub(crate) fn v1_proto_receive<Application: ApplicationLayer + ?Sized, Inner: InnerProtocolLayer + ?Sized>(
pub(crate) fn v1_proto_receive<Inner: InnerProtocolLayer + ?Sized>(
self: &Arc<Self>,
node: &Node,
node: &Node<Application>,
app: &Application,
inner: &Inner,
time_ticks: i64,
source_path: &Arc<Path>,
source_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>,
packet_header: &v1::PacketHeader,
frag0: &PacketBuffer,
fragments: &[Option<PooledPacketBuffer>],
@ -539,7 +534,7 @@ impl Peer {
return match verb {
message_type::VL1_NOP => PacketHandlerResult::Ok,
message_type::VL1_HELLO => self.handle_incoming_hello(app, inner, node, time_ticks, message_id, source_path, &payload),
message_type::VL1_HELLO => self.handle_incoming_hello(app, node, time_ticks, message_id, source_path, &payload),
message_type::VL1_ERROR => {
self.handle_incoming_error(app, inner, node, time_ticks, source_path, packet_header.hops(), message_id, &payload)
}
@ -554,9 +549,9 @@ impl Peer {
path_is_known,
&payload,
),
message_type::VL1_WHOIS => self.handle_incoming_whois(app, inner, node, time_ticks, message_id, &payload),
message_type::VL1_WHOIS => self.handle_incoming_whois(app, node, time_ticks, message_id, &payload),
message_type::VL1_RENDEZVOUS => self.handle_incoming_rendezvous(app, node, time_ticks, message_id, source_path, &payload),
message_type::VL1_ECHO => self.handle_incoming_echo(app, inner, node, time_ticks, message_id, &payload),
message_type::VL1_ECHO => self.handle_incoming_echo(app, node, time_ticks, message_id, &payload),
message_type::VL1_PUSH_DIRECT_PATHS => self.handle_incoming_push_direct_paths(app, node, time_ticks, source_path, &payload),
message_type::VL1_USER_MESSAGE => self.handle_incoming_user_message(app, node, time_ticks, source_path, &payload),
_ => inner.handle_packet(app, node, self, &source_path, packet_header.hops(), message_id, verb, &payload, 1),
@ -567,17 +562,16 @@ impl Peer {
return PacketHandlerResult::Error;
}
fn handle_incoming_hello<Application: ApplicationLayer + ?Sized, Inner: InnerProtocolLayer + ?Sized>(
fn handle_incoming_hello(
&self,
app: &Application,
inner: &Inner,
node: &Node,
node: &Node<Application>,
time_ticks: i64,
message_id: MessageId,
source_path: &Arc<Path>,
source_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>,
payload: &PacketBuffer,
) -> PacketHandlerResult {
if !(inner.should_respond_to(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) {
if !(app.should_respond_to(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) {
debug_event!(
app,
"[vl1] dropping HELLO from {} due to lack of trust relationship",
@ -621,13 +615,13 @@ impl Peer {
return PacketHandlerResult::Error;
}
fn handle_incoming_error<Application: ApplicationLayer + ?Sized, Inner: InnerProtocolLayer + ?Sized>(
fn handle_incoming_error<Inner: InnerProtocolLayer + ?Sized>(
self: &Arc<Self>,
app: &Application,
inner: &Inner,
node: &Node,
node: &Node<Application>,
_time_ticks: i64,
source_path: &Arc<Path>,
source_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>,
source_hops: u8,
message_id: u64,
payload: &PacketBuffer,
@ -659,13 +653,13 @@ impl Peer {
return PacketHandlerResult::Error;
}
fn handle_incoming_ok<Application: ApplicationLayer + ?Sized, Inner: InnerProtocolLayer + ?Sized>(
fn handle_incoming_ok<Inner: InnerProtocolLayer + ?Sized>(
self: &Arc<Self>,
app: &Application,
inner: &Inner,
node: &Node,
node: &Node<Application>,
time_ticks: i64,
source_path: &Arc<Path>,
source_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>,
source_hops: u8,
message_id: u64,
path_is_known: bool,
@ -761,16 +755,15 @@ impl Peer {
return PacketHandlerResult::Error;
}
fn handle_incoming_whois<Application: ApplicationLayer + ?Sized, Inner: InnerProtocolLayer + ?Sized>(
fn handle_incoming_whois(
self: &Arc<Self>,
app: &Application,
inner: &Inner,
node: &Node,
node: &Node<Application>,
time_ticks: i64,
_message_id: MessageId,
payload: &PacketBuffer,
) -> PacketHandlerResult {
if node.this_node_is_root() || inner.should_respond_to(&self.identity) {
if node.this_node_is_root() || app.should_respond_to(&self.identity) {
let mut addresses = payload.as_bytes();
while addresses.len() >= ADDRESS_SIZE {
if !self
@ -794,29 +787,28 @@ impl Peer {
return PacketHandlerResult::Ok;
}
fn handle_incoming_rendezvous<Application: ApplicationLayer + ?Sized>(
fn handle_incoming_rendezvous(
self: &Arc<Self>,
_app: &Application,
node: &Node,
node: &Node<Application>,
_time_ticks: i64,
_message_id: MessageId,
_source_path: &Arc<Path>,
_source_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>,
_payload: &PacketBuffer,
) -> PacketHandlerResult {
if node.is_peer_root(self) {}
return PacketHandlerResult::Ok;
}
fn handle_incoming_echo<Application: ApplicationLayer + ?Sized, Inner: InnerProtocolLayer + ?Sized>(
fn handle_incoming_echo(
&self,
app: &Application,
inner: &Inner,
node: &Node,
node: &Node<Application>,
time_ticks: i64,
message_id: MessageId,
payload: &PacketBuffer,
) -> PacketHandlerResult {
if inner.should_respond_to(&self.identity) || node.is_peer_root(self) {
if app.should_respond_to(&self.identity) || node.is_peer_root(self) {
self.send(app, node, None, time_ticks, |packet| {
let mut f: &mut OkHeader = packet.append_struct_get_mut().unwrap();
f.verb = message_type::VL1_OK;
@ -834,44 +826,44 @@ impl Peer {
return PacketHandlerResult::Ok;
}
fn handle_incoming_push_direct_paths<Application: ApplicationLayer + ?Sized>(
fn handle_incoming_push_direct_paths(
self: &Arc<Self>,
_app: &Application,
_node: &Node,
_node: &Node<Application>,
_time_ticks: i64,
_source_path: &Arc<Path>,
_source_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>,
_payload: &PacketBuffer,
) -> PacketHandlerResult {
PacketHandlerResult::Ok
}
fn handle_incoming_user_message<Application: ApplicationLayer + ?Sized>(
fn handle_incoming_user_message(
self: &Arc<Self>,
_app: &Application,
_node: &Node,
_node: &Node<Application>,
_time_ticks: i64,
_source_path: &Arc<Path>,
_source_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>,
_payload: &PacketBuffer,
) -> PacketHandlerResult {
PacketHandlerResult::Ok
}
}
impl Hash for Peer {
impl<Application: ApplicationLayer + ?Sized> Hash for Peer<Application> {
#[inline(always)]
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
state.write_u64(self.identity.address.into());
}
}
impl PartialEq for Peer {
impl<Application: ApplicationLayer + ?Sized> PartialEq for Peer<Application> {
#[inline(always)]
fn eq(&self, other: &Self) -> bool {
self.identity.fingerprint.eq(&other.identity.fingerprint)
}
}
impl Eq for Peer {}
impl<Application: ApplicationLayer + ?Sized> Eq for Peer<Application> {}
fn v1_proto_try_aead_decrypt(
secret: &v1::SymmetricSecret,

View file

@ -3,19 +3,13 @@ use std::sync::{Arc, Mutex, RwLock};
use crate::protocol;
use crate::protocol::PacketBuffer;
use crate::vl1::{Address, ApplicationLayer, Identity, Node, PacketHandlerResult, Peer, MAC};
use crate::vl1::*;
use crate::vl2::{MulticastGroup, NetworkId};
use zerotier_utils::buffer::OutOfBoundsError;
use zerotier_utils::sync::RMaybeWLockGuard;
/// Handler implementations for VL2_MULTICAST_LIKE and VL2_MULTICAST_GATHER.
///
/// Both controllers and roots will want to handle these, with the latter supporting them for legacy
/// reasons only. Regular nodes may also want to handle them in the future. So, break this out to allow
/// easy code reuse. To integrate call the appropriate method when the appropriate message type is
/// received and pass in a function to check whether specific network/identity combinations should be
/// processed. The GATHER implementation will send reply packets to the source peer.
pub struct MulticastAuthority {
subscriptions: RwLock<HashMap<(NetworkId, MulticastGroup), Mutex<HashMap<Address, i64>>>>,
}
@ -47,11 +41,11 @@ impl MulticastAuthority {
}
/// Call for VL2_MULTICAST_LIKE packets.
pub fn handle_vl2_multicast_like<Authenticator: Fn(NetworkId, &Identity) -> bool>(
pub fn handle_vl2_multicast_like<Application: ApplicationLayer + ?Sized, Authenticator: Fn(NetworkId, &Identity) -> bool>(
&self,
auth: Authenticator,
time_ticks: i64,
source: &Arc<Peer>,
source: &Arc<Peer<Application>>,
payload: &PacketBuffer,
mut cursor: usize,
) -> PacketHandlerResult {
@ -84,13 +78,13 @@ impl MulticastAuthority {
}
/// Call for VL2_MULTICAST_GATHER packets.
pub fn handle_vl2_multicast_gather<HostSystemImpl: ApplicationLayer + ?Sized, Authenticator: Fn(NetworkId, &Identity) -> bool>(
pub fn handle_vl2_multicast_gather<Application: ApplicationLayer + ?Sized, Authenticator: Fn(NetworkId, &Identity) -> bool>(
&self,
auth: Authenticator,
time_ticks: i64,
host_system: &HostSystemImpl,
node: &Node,
source: &Arc<Peer>,
app: &Application,
node: &Node<Application>,
source: &Arc<Peer<Application>>,
message_id: u64,
payload: &PacketBuffer,
mut cursor: usize,
@ -114,7 +108,7 @@ impl MulticastAuthority {
}
while !gathered.is_empty() {
source.send(host_system, node, None, time_ticks, |packet| -> Result<(), OutOfBoundsError> {
source.send(app, node, None, time_ticks, |packet| -> Result<(), OutOfBoundsError> {
let ok_header = packet.append_struct_get_mut::<protocol::OkHeader>()?;
ok_header.verb = protocol::message_type::VL1_OK;
ok_header.in_re_verb = protocol::message_type::VL2_MULTICAST_GATHER;

View file

@ -14,9 +14,9 @@ impl InnerProtocolLayer for Switch {
fn handle_packet<Application: ApplicationLayer + ?Sized>(
&self,
app: &Application,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
node: &Node<Application>,
source: &Arc<Peer<Application>>,
source_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>,
source_hops: u8,
message_id: u64,
verb: u8,
@ -29,9 +29,9 @@ impl InnerProtocolLayer for Switch {
fn handle_error<Application: ApplicationLayer + ?Sized>(
&self,
app: &Application,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
node: &Node<Application>,
source: &Arc<Peer<Application>>,
source_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,
@ -46,9 +46,9 @@ impl InnerProtocolLayer for Switch {
fn handle_ok<Application: ApplicationLayer + ?Sized>(
&self,
app: &Application,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
node: &Node<Application>,
source: &Arc<Peer<Application>>,
source_path: &Arc<Path<Application::LocalSocket, Application::LocalInterface>>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,

33
utils/src/cast.rs Normal file
View file

@ -0,0 +1,33 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c) ZeroTier, Inc.
* https://www.zerotier.com/
*/
use std::any::TypeId;
use std::mem::size_of;
#[inline(always)]
pub fn same_type<U: 'static, V: 'static>() -> bool {
TypeId::of::<U>() == TypeId::of::<V>() && size_of::<U>() == size_of::<V>()
}
#[inline(always)]
pub fn cast_ref<U: 'static, V: 'static>(u: &U) -> Option<&V> {
if same_type::<U, V>() {
Some(unsafe { std::mem::transmute::<&U, &V>(u) })
} else {
None
}
}
#[inline(always)]
pub fn cast_mut<U: 'static, V: 'static>(u: &mut U) -> Option<&mut V> {
if same_type::<U, V>() {
Some(unsafe { std::mem::transmute::<&mut U, &mut V>(u) })
} else {
None
}
}

View file

@ -33,36 +33,3 @@ impl<const FREQ: i64> IntervalGate<FREQ> {
}
}
}
/*
/// Boolean rate limiter with atomic semantics.
#[repr(transparent)]
pub struct AtomicIntervalGate<const FREQ: i64>(AtomicI64);
impl<const FREQ: i64> Default for AtomicIntervalGate<FREQ> {
#[inline(always)]
fn default() -> Self {
Self(AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS))
}
}
impl<const FREQ: i64> AtomicIntervalGate<FREQ> {
#[inline(always)]
#[allow(unused)]
pub fn new(initial_ts: i64) -> Self {
Self(AtomicI64::new(initial_ts))
}
#[inline(always)]
#[allow(unused)]
pub fn gate(&self, mut time: i64) -> bool {
let prev_time = self.0.load(Ordering::Acquire);
if (time - prev_time) < FREQ {
false
} else {
self.0.store(time, Ordering::Release);
true
}
}
}
*/

View file

@ -9,6 +9,7 @@
pub mod arrayvec;
pub mod blob;
pub mod buffer;
pub mod cast;
pub mod defer;
pub mod dictionary;
pub mod error;
@ -26,7 +27,6 @@ pub mod pool;
pub mod reaper;
pub mod ringbuffer;
pub mod sync;
pub mod thing;
pub mod varint;
#[cfg(feature = "tokio")]

View file

@ -37,7 +37,7 @@ pub struct VL1Service<Inner: InnerProtocolLayer + ?Sized + 'static> {
vl1_data_storage: Arc<dyn VL1DataStorage>,
inner: Arc<Inner>,
buffer_pool: Arc<PacketBufferPool>,
node_container: Option<Node>, // never None, set in new()
node_container: Option<Node<Self>>, // never None, set in new()
}
struct VL1ServiceMutableState {
@ -79,7 +79,7 @@ impl<Inner: InnerProtocolLayer + ?Sized + 'static> VL1Service<Inner> {
}
#[inline(always)]
pub fn node(&self) -> &Node {
pub fn node(&self) -> &Node<Self> {
debug_assert!(self.node_container.is_some());
unsafe { self.node_container.as_ref().unwrap_unchecked() }
}
@ -216,6 +216,12 @@ impl<Inner: InnerProtocolLayer + ?Sized + 'static> ApplicationLayer for VL1Servi
socket.is_valid()
}
#[inline]
fn should_respond_to(&self, _: &Valid<Identity>) -> bool {
// TODO: provide a way for the user of VL1Service to control this
true
}
#[inline]
fn load_node_identity(&self) -> Option<Valid<Identity>> {
self.vl1_data_storage.load_node_identity()