Split off VL1 API traits into a separate file, remove ?Sized from some traits as it is unnecessary and causes issues, cleanup.

This commit is contained in:
Adam Ierymenko 2023-03-28 08:58:39 -04:00
parent 65854aea5f
commit 1670a3aa31
12 changed files with 304 additions and 306 deletions

View file

@ -31,7 +31,6 @@ const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// ZeroTier VL2 network controller packet handler, answers VL2 netconf queries.
pub struct Controller {
self_ref: Weak<Self>,
service: RwLock<Weak<VL1Service<Self>>>,
reaper: Reaper,
runtime: tokio::runtime::Handle,
database: Arc<dyn Database>,
@ -49,40 +48,25 @@ pub struct Controller {
}
impl Controller {
/*
/// Start an inner protocol handler answer ZeroTier VL2 network controller queries.
///
/// The start() method must be called once the service this will run within is also created.
pub async fn new(database: Arc<dyn Database>, runtime: tokio::runtime::Handle) -> Result<Arc<Self>, Box<dyn Error>> {
if let Some(local_identity) = database.load_node_identity() {
assert!(local_identity.secret.is_some());
Ok(Arc::new_cyclic(|r| Self {
self_ref: r.clone(),
service: RwLock::new(Weak::default()),
reaper: Reaper::new(&runtime),
runtime,
database: database.clone(),
multicast_authority: MulticastAuthority::new(),
daemons: Mutex::new(Vec::with_capacity(2)),
recently_authorized: RwLock::new(HashMap::new()),
}))
} else {
Err(Box::new(InvalidParameterError("local controller's identity not readable by database")))
}
}
pub async fn new(
runtime: tokio::runtime::Handle,
local_identity: IdentitySecret,
database: Arc<dyn Database>,
) -> Result<Arc<Self>, Box<dyn Error>> {
let c = Arc::new_cyclic(|self_ref| Self {
self_ref: self_ref.clone(),
reaper: Reaper::new(&runtime),
runtime,
database: database.clone(),
local_identity,
multicast_authority: MulticastAuthority::new(),
daemons: Mutex::new(Vec::with_capacity(2)),
recently_authorized: RwLock::new(HashMap::new()),
});
/// Set the service and HostSystem implementation for this controller and start daemons.
///
/// This must be called once the service that uses this handler is up or the controller
/// won't actually do anything. The controller holds a weak reference to VL1Service so
/// be sure it's not dropped.
pub async fn start(&self, service: &Arc<VL1Service<Self>>) {
*self.service.write().unwrap() = Arc::downgrade(service);
// Create database change listener.
if let Some(cw) = self.database.changes().await.map(|mut ch| {
let self2 = self.self_ref.clone();
self.runtime.spawn(async move {
if let Some(cw) = c.database.changes().await.map(|mut ch| {
let self2 = c.self_ref.clone();
c.runtime.spawn(async move {
loop {
if let Ok(change) = ch.recv().await {
if let Some(self2) = self2.upgrade() {
@ -97,12 +81,11 @@ impl Controller {
}
})
}) {
self.daemons.lock().unwrap().push(cw);
c.daemons.lock().unwrap().push(cw);
}
// Create background task to expire multicast subscriptions and recent authorizations.
let self2 = self.self_ref.clone();
self.daemons.lock().unwrap().push(self.runtime.spawn(async move {
let self2 = c.self_ref.clone();
c.daemons.lock().unwrap().push(c.runtime.spawn(async move {
let sleep_duration = Duration::from_millis((protocol::VL2_DEFAULT_MULTICAST_LIKE_EXPIRE / 2).min(2500) as u64);
loop {
tokio::time::sleep(sleep_duration).await;
@ -119,8 +102,9 @@ impl Controller {
}
}
}));
Ok(c)
}
*/
/// Launched as a task when the DB informs us of a change.
async fn handle_change_notification(self: Arc<Self>, change: Change) {
@ -139,7 +123,7 @@ impl Controller {
}
/// Compose and send network configuration packet (either V1 or V2)
fn send_network_config<Application: ApplicationLayer + ?Sized>(
fn send_network_config<Application: ApplicationLayer>(
&self,
app: &Application,
node: &Node<Application>,
@ -176,8 +160,6 @@ impl Controller {
packet.append_u64(config.network_id.to_legacy_u64())?;
packet.append_u16(config_data.len() as u16)?;
packet.append_bytes(config_data.as_slice())?;
// TODO: for V1 we may need to introduce use of the chunking mechanism for large configs.
}
let new_payload_len = protocol::compress(&mut packet.as_bytes_mut()[payload_start..]);
@ -188,48 +170,40 @@ impl Controller {
}
/// Send one or more revocation object(s) to a peer. The provided vector is drained.
fn send_revocations(&self, peer: &Peer<VL1Service<Self>>, revocations: &mut Vec<Revocation>) {
if let Some(host_system) = self.service.read().unwrap().upgrade() {
let time_ticks = ms_monotonic();
while !revocations.is_empty() {
let send_count = revocations.len().min(protocol::UDP_DEFAULT_MTU / 256);
debug_assert!(send_count <= (u16::MAX as usize));
peer.send(
host_system.as_ref(),
&host_system.node,
None,
time_ticks,
|packet| -> Result<(), OutOfBoundsError> {
let payload_start = packet.len();
fn send_revocations(&self, app: &Arc<VL1Service<Self>>, peer: &Peer<VL1Service<Self>>, revocations: &mut Vec<Revocation>) {
let time_ticks = ms_monotonic();
while !revocations.is_empty() {
let send_count = revocations.len().min(protocol::UDP_DEFAULT_MTU / 256);
debug_assert!(send_count <= (u16::MAX as usize));
peer.send(app.as_ref(), &app.node, None, time_ticks, |packet| -> Result<(), OutOfBoundsError> {
let payload_start = packet.len();
packet.append_u8(protocol::message_type::VL2_NETWORK_CREDENTIALS)?;
packet.append_u8(0)?;
packet.append_u16(0)?;
packet.append_u16(0)?;
packet.append_u16(send_count as u16)?;
for _ in 0..send_count {
let r = revocations.pop().unwrap();
packet.append_bytes(r.v1_proto_to_bytes(&self.local_identity.public.address).as_bytes())?;
}
packet.append_u16(0)?;
packet.append_u8(protocol::message_type::VL2_NETWORK_CREDENTIALS)?;
packet.append_u8(0)?;
packet.append_u16(0)?;
packet.append_u16(0)?;
packet.append_u16(send_count as u16)?;
for _ in 0..send_count {
let r = revocations.pop().unwrap();
packet.append_bytes(r.v1_proto_to_bytes(&self.local_identity.public.address).as_bytes())?;
}
packet.append_u16(0)?;
let new_payload_len = protocol::compress(&mut packet.as_bytes_mut()[payload_start..]);
packet.set_size(payload_start + new_payload_len);
let new_payload_len = protocol::compress(&mut packet.as_bytes_mut()[payload_start..]);
packet.set_size(payload_start + new_payload_len);
Ok(())
},
);
}
Ok(())
});
}
}
async fn deauthorize_member(&self, member: &Member) {
async fn deauthorize_member(&self, app: &Arc<VL1Service<Self>>, member: &Member) {
let time_clock = ms_since_epoch();
let mut revocations = Vec::with_capacity(1);
if let Ok(all_network_members) = self.database.list_members(&member.network_id).await {
for m in all_network_members.iter() {
if member.node_id != *m {
if let Some(peer) = self.service.read().unwrap().upgrade().and_then(|s| s.node.peer(m)) {
if let Some(peer) = app.node.peer(m) {
revocations.clear();
revocations.push(Revocation::new(
&member.network_id,
@ -239,7 +213,7 @@ impl Controller {
&self.local_identity,
false,
));
self.send_revocations(&peer, &mut revocations);
self.send_revocations(&app, &peer, &mut revocations);
}
}
}
@ -258,7 +232,7 @@ impl Controller {
source_identity: &Valid<Identity>,
network_id: &NetworkId,
time_clock: i64,
) -> Result<(AuthenticationResult, Option<NetworkConfig>), Box<dyn Error + Send + Sync>> {
) -> Result<(AuthenticationResult, Option<Box<NetworkConfig>>), Box<dyn Error + Send + Sync>> {
let network = self.database.get_network(&network_id).await?;
if network.is_none() {
return Ok((AuthenticationResult::Rejected, None));
@ -326,7 +300,7 @@ impl Controller {
// Check and if necessary auto-assign static IPs for this member.
member_changed |= network.assign_ip_addresses(self.database.as_ref(), &mut member).await;
let mut nc = NetworkConfig::new(network_id.clone(), source_identity.address.clone());
let mut nc = Box::new(NetworkConfig::new(network_id.clone(), source_identity.address.clone()));
nc.name = network.name.clone();
nc.private = network.private;
@ -434,7 +408,7 @@ impl Controller {
}
impl InnerProtocolLayer for Controller {
fn handle_packet<Application: ApplicationLayer + ?Sized>(
fn handle_packet<Application: ApplicationLayer>(
&self,
app: &Application,
node: &Node<Application>,
@ -481,6 +455,8 @@ impl InnerProtocolLayer for Controller {
};
// Launch handler as an async background task.
let app: &VL1Service<Self> = cast_ref(app).unwrap();
let app = app.get();
let (self2, source, source_remote_endpoint) = (self.self_ref.upgrade().unwrap(), source.clone(), source_path.endpoint.clone());
self.reaper.add(
self.runtime.spawn(async move {
@ -490,15 +466,13 @@ impl InnerProtocolLayer for Controller {
let (result, config) = match self2.authorize(&source.identity, &network_id, now).await {
Result::Ok((result, Some(config))) => {
//println!("{}", serde_yaml::to_string(&config).unwrap());
let app = self2.service.read().unwrap().upgrade().unwrap();
self2.send_network_config(app.as_ref(), &app.node, cast_ref(source.as_ref()).unwrap(), &config, Some(message_id));
(result, Some(config))
}
Result::Ok((result, None)) => (result, None),
Result::Err(e) => {
#[cfg(debug_assertions)]
let host = self2.service.read().unwrap().clone().upgrade().unwrap();
debug_event!(host, "[vl2] ERROR getting network config: {}", e.to_string());
debug_event!(app, "[vl2] ERROR getting network config: {}", e.to_string());
return;
}
};
@ -516,7 +490,6 @@ impl InnerProtocolLayer for Controller {
source_remote_endpoint,
source_hops,
result,
config,
})
.await;
}),

View file

@ -5,18 +5,19 @@ use std::sync::Arc;
use zerotier_network_controller::database::Database;
use zerotier_network_controller::filedatabase::FileDatabase;
use zerotier_network_controller::Controller;
use zerotier_network_hypervisor::vl1::identity::IdentitySecret;
use zerotier_network_hypervisor::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION};
use zerotier_utils::exitcode;
use zerotier_utils::tokio::runtime::Runtime;
use zerotier_vl1_service::VL1Service;
async fn run(database: Arc<impl Database>, runtime: &Runtime) -> i32 {
async fn run(identity: IdentitySecret, runtime: &Runtime) -> i32 {
match Controller::new(database.clone(), runtime.handle().clone()).await {
Err(err) => {
eprintln!("FATAL: error initializing handler: {}", err.to_string());
exitcode::ERR_CONFIG
}
Ok(handler) => match VL1Service::new(database.clone(), handler.clone(), zerotier_vl1_service::VL1Settings::default()) {
Ok(handler) => match VL1Service::new(identity, handler.clone(), zerotier_vl1_service::VL1Settings::default()) {
Err(err) => {
eprintln!("FATAL: error launching service: {}", err.to_string());
exitcode::ERR_IOERR

View file

@ -11,7 +11,6 @@ use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use zerotier_network_hypervisor::vl1::{Address, Endpoint};
use zerotier_network_hypervisor::vl2::v1::networkconfig::NetworkConfig;
use zerotier_network_hypervisor::vl2::NetworkId;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@ -118,9 +117,6 @@ pub struct RequestLogItem {
#[serde(rename = "r")]
pub result: AuthenticationResult,
#[serde(rename = "nc")]
pub config: Option<NetworkConfig>,
}
impl ToString for RequestLogItem {

View file

@ -0,0 +1,175 @@
use std::hash::Hash;
use std::sync::Arc;
use super::endpoint::Endpoint;
use super::event::Event;
use super::identity::Identity;
use super::node::Node;
use super::path::Path;
use super::peer::Peer;
use crate::protocol::{PacketBuffer, PooledPacketBuffer};
use zerotier_crypto::typestate::Valid;
/// Interface trait to be implemented by code that's using the ZeroTier network hypervisor.
///
/// This is analogous to a C struct full of function pointers to callbacks along with some
/// associated type definitions.
pub trait ApplicationLayer: Sync + Send + 'static {
/// Type for local system sockets.
type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static;
/// Type for local system interfaces.
type LocalInterface: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static;
/// A VL1 level event occurred.
fn event(&self, event: Event);
/// Get a pooled packet buffer for internal use.
fn get_buffer(&self) -> PooledPacketBuffer;
/// Check a local socket for validity.
///
/// This could return false if the socket's interface no longer exists, its port has been
/// unbound, etc.
fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool;
/// Check if this node should respond to messages from a given peer at all.
///
/// The default implementation always returns true. Typically this is what you want for a
/// controller or a root but not a regular node (unless required for backward compatibility).
#[allow(unused)]
fn should_respond_to(&self, id: &Valid<Identity>) -> bool {
true
}
/// Called to send a packet over the physical network (virtual -> physical).
///
/// This sends with UDP-like semantics. It should do whatever best effort it can and return.
///
/// If a local socket is specified the implementation should send from that socket or not
/// at all (returning false). If a local interface is specified the implementation should
/// send from all sockets on that interface. If neither is specified the packet may be
/// sent on all sockets or a random subset.
///
/// For endpoint types that support a packet TTL, the implementation may set the TTL
/// if the 'ttl' parameter is not zero. If the parameter is zero or TTL setting is not
/// supported, the default TTL should be used. This parameter is ignored for types that
/// don't support it.
fn wire_send(
&self,
endpoint: &Endpoint,
local_socket: Option<&Self::LocalSocket>,
local_interface: Option<&Self::LocalInterface>,
data: &[u8],
packet_ttl: u8,
);
/// Called to check and see if a physical address should be used for ZeroTier traffic to a node.
///
/// The default implementation always returns true.
#[allow(unused_variables)]
fn should_use_physical_path<Application: ApplicationLayer>(
&self,
id: &Valid<Identity>,
endpoint: &Endpoint,
local_socket: Option<&Application::LocalSocket>,
local_interface: Option<&Application::LocalInterface>,
) -> bool {
true
}
/// Called to look up any statically defined or memorized paths to known nodes.
///
/// The default implementation always returns None.
#[allow(unused_variables)]
fn get_path_hints<Application: ApplicationLayer>(
&self,
id: &Valid<Identity>,
) -> Option<Vec<(Endpoint, Option<Application::LocalSocket>, Option<Application::LocalInterface>)>> {
None
}
/// Called to get the current time in milliseconds from the system monotonically increasing clock.
/// This needs to be accurate to about 250 milliseconds resolution or better.
fn time_ticks(&self) -> i64;
/// Called to get the current time in milliseconds since epoch from the real-time clock.
/// This needs to be accurate to about one second resolution or better.
fn time_clock(&self) -> i64;
}
/// Result of a packet handler in the InnerProtocolLayer trait.
pub enum PacketHandlerResult {
/// Packet was handled successfully.
Ok,
/// Packet was handled and an error occurred (malformed, authentication failure, etc.)
Error,
/// Packet was not handled by this handler.
NotHandled,
}
/// Interface between VL1 and higher/inner protocol layers.
///
/// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but
/// it could also be implemented for testing or "off label" use of VL1 to carry different protocols.
#[allow(unused)]
pub trait InnerProtocolLayer: Sync + Send {
/// Handle a packet, returning true if it was handled by the next layer.
///
/// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error().
/// The default version returns NotHandled.
fn handle_packet<Application: ApplicationLayer>(
&self,
app: &Application,
node: &Node<Application>,
source: &Arc<Peer<Application>>,
source_path: &Arc<Path<Application>>,
source_hops: u8,
message_id: u64,
verb: u8,
payload: &PacketBuffer,
cursor: usize,
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
/// Handle errors, returning true if the error was recognized.
/// The default version returns NotHandled.
fn handle_error<Application: ApplicationLayer>(
&self,
app: &Application,
node: &Node<Application>,
source: &Arc<Peer<Application>>,
source_path: &Arc<Path<Application>>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,
in_re_message_id: u64,
error_code: u8,
payload: &PacketBuffer,
cursor: usize,
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
/// Handle an OK, returning true if the OK was recognized.
/// The default version returns NotHandled.
fn handle_ok<Application: ApplicationLayer>(
&self,
app: &Application,
node: &Node<Application>,
source: &Arc<Peer<Application>>,
source_path: &Arc<Path<Application>>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,
in_re_message_id: u64,
payload: &PacketBuffer,
cursor: usize,
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
}

View file

@ -1,6 +1,7 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
mod address;
mod api;
mod endpoint;
mod event;
mod mac;
@ -15,11 +16,12 @@ pub mod identity;
pub mod inetaddress;
pub use address::{Address, PartialAddress};
pub use api::{ApplicationLayer, InnerProtocolLayer, PacketHandlerResult};
pub use endpoint::Endpoint;
pub use event::Event;
pub use inetaddress::InetAddress;
pub use mac::MAC;
pub use node::{ApplicationLayer, InnerProtocolLayer, Node, PacketHandlerResult};
pub use node::Node;
pub use path::Path;
pub use peer::Peer;
pub use rootset::{Root, RootSet};

View file

@ -6,16 +6,17 @@ use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use super::address::{Address, PartialAddress};
use super::api::{ApplicationLayer, InnerProtocolLayer, PacketHandlerResult};
use super::debug_event;
use super::endpoint::Endpoint;
use super::event::Event;
use super::identity::{Identity, IdentitySecret};
use super::path::{Path, PathServiceResult};
use super::peer::Peer;
use super::peermap::PeerMap;
use super::rootset::RootSet;
use crate::protocol::*;
use crate::vl1::address::{Address, PartialAddress};
use crate::vl1::debug_event;
use crate::vl1::endpoint::Endpoint;
use crate::vl1::event::Event;
use crate::vl1::identity::{Identity, IdentitySecret};
use crate::vl1::path::{Path, PathServiceResult};
use crate::vl1::peer::Peer;
use crate::vl1::peermap::PeerMap;
use crate::vl1::rootset::RootSet;
use zerotier_crypto::typestate::{Valid, Verified};
use zerotier_utils::gate::IntervalGate;
@ -23,170 +24,22 @@ use zerotier_utils::hex;
use zerotier_utils::marshalable::Marshalable;
use zerotier_utils::tokio::io::AsyncWriteExt;
/// Interface trait to be implemented by code that's using the ZeroTier network hypervisor.
/// A VL1 node on the ZeroTier global peer to peer network.
///
/// This is analogous to a C struct full of function pointers to callbacks along with some
/// associated type definitions.
pub trait ApplicationLayer: Sync + Send + 'static {
/// Type for local system sockets.
type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static;
/// Type for local system interfaces.
type LocalInterface: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static;
/// A VL1 level event occurred.
fn event(&self, event: Event);
/// Get a pooled packet buffer for internal use.
fn get_buffer(&self) -> PooledPacketBuffer;
/// Check a local socket for validity.
///
/// This could return false if the socket's interface no longer exists, its port has been
/// unbound, etc.
fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool;
/// Check if this node should respond to messages from a given peer at all.
///
/// The default implementation always returns true. Typically this is what you want for a
/// controller or a root but not a regular node (unless required for backward compatibility).
#[allow(unused)]
fn should_respond_to(&self, id: &Valid<Identity>) -> bool {
true
}
/// Called to send a packet over the physical network (virtual -> physical).
///
/// This sends with UDP-like semantics. It should do whatever best effort it can and return.
///
/// If a local socket is specified the implementation should send from that socket or not
/// at all (returning false). If a local interface is specified the implementation should
/// send from all sockets on that interface. If neither is specified the packet may be
/// sent on all sockets or a random subset.
///
/// For endpoint types that support a packet TTL, the implementation may set the TTL
/// if the 'ttl' parameter is not zero. If the parameter is zero or TTL setting is not
/// supported, the default TTL should be used. This parameter is ignored for types that
/// don't support it.
fn wire_send(
&self,
endpoint: &Endpoint,
local_socket: Option<&Self::LocalSocket>,
local_interface: Option<&Self::LocalInterface>,
data: &[u8],
packet_ttl: u8,
);
/// Called to check and see if a physical address should be used for ZeroTier traffic to a node.
///
/// The default implementation always returns true.
#[allow(unused_variables)]
fn should_use_physical_path<Application: ApplicationLayer + ?Sized>(
&self,
id: &Valid<Identity>,
endpoint: &Endpoint,
local_socket: Option<&Application::LocalSocket>,
local_interface: Option<&Application::LocalInterface>,
) -> bool {
true
}
/// Called to look up any statically defined or memorized paths to known nodes.
///
/// The default implementation always returns None.
#[allow(unused_variables)]
fn get_path_hints<Application: ApplicationLayer + ?Sized>(
&self,
id: &Valid<Identity>,
) -> Option<Vec<(Endpoint, Option<Application::LocalSocket>, Option<Application::LocalInterface>)>> {
None
}
/// Called to get the current time in milliseconds from the system monotonically increasing clock.
/// This needs to be accurate to about 250 milliseconds resolution or better.
fn time_ticks(&self) -> i64;
/// Called to get the current time in milliseconds since epoch from the real-time clock.
/// This needs to be accurate to about one second resolution or better.
fn time_clock(&self) -> i64;
/// VL1 nodes communicate to/from both the outside world and the inner protocol layer via the two
/// supplied API traits that must be implemented by the application. ApplicationLayer provides a
/// means of interacting with the application/OS and InnerProtocolLayer provides the interface for
/// implementing the protocol (e.g. ZeroTier VL2) that will be carried by VL1.
pub struct Node<Application: ApplicationLayer> {
pub identity: IdentitySecret,
intervals: Mutex<BackgroundTaskIntervals>,
paths: RwLock<HashMap<PathKey<'static, 'static, Application::LocalSocket>, Arc<Path<Application>>>>,
pub(super) peers: PeerMap<Application>,
roots: RwLock<RootInfo<Application>>,
best_root: RwLock<Option<Arc<Peer<Application>>>>,
}
/// Result of a packet handler in the InnerProtocolLayer trait.
pub enum PacketHandlerResult {
/// Packet was handled successfully.
Ok,
/// Packet was handled and an error occurred (malformed, authentication failure, etc.)
Error,
/// Packet was not handled by this handler.
NotHandled,
}
/// Interface between VL1 and higher/inner protocol layers.
///
/// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but
/// it could also be implemented for testing or "off label" use of VL1 to carry different protocols.
#[allow(unused)]
pub trait InnerProtocolLayer: Sync + Send {
/// Handle a packet, returning true if it was handled by the next layer.
///
/// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error().
/// The default version returns NotHandled.
fn handle_packet<Application: ApplicationLayer + ?Sized>(
&self,
app: &Application,
node: &Node<Application>,
source: &Arc<Peer<Application>>,
source_path: &Arc<Path<Application>>,
source_hops: u8,
message_id: u64,
verb: u8,
payload: &PacketBuffer,
cursor: usize,
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
/// Handle errors, returning true if the error was recognized.
/// The default version returns NotHandled.
fn handle_error<Application: ApplicationLayer + ?Sized>(
&self,
app: &Application,
node: &Node<Application>,
source: &Arc<Peer<Application>>,
source_path: &Arc<Path<Application>>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,
in_re_message_id: u64,
error_code: u8,
payload: &PacketBuffer,
cursor: usize,
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
/// Handle an OK, returning true if the OK was recognized.
/// The default version returns NotHandled.
fn handle_ok<Application: ApplicationLayer + ?Sized>(
&self,
app: &Application,
node: &Node<Application>,
source: &Arc<Peer<Application>>,
source_path: &Arc<Path<Application>>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,
in_re_message_id: u64,
payload: &PacketBuffer,
cursor: usize,
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
}
struct RootInfo<Application: ApplicationLayer + ?Sized> {
struct RootInfo<Application: ApplicationLayer> {
/// Root sets to which we are a member.
sets: HashMap<String, Verified<RootSet>>,
@ -217,16 +70,7 @@ struct BackgroundTaskIntervals {
whois_queue_retry: IntervalGate<{ WHOIS_RETRY_INTERVAL }>,
}
pub struct Node<Application: ApplicationLayer + ?Sized> {
pub identity: IdentitySecret,
intervals: Mutex<BackgroundTaskIntervals>,
paths: RwLock<HashMap<PathKey<'static, 'static, Application::LocalSocket>, Arc<Path<Application>>>>,
pub(super) peers: PeerMap<Application>,
roots: RwLock<RootInfo<Application>>,
best_root: RwLock<Option<Arc<Peer<Application>>>>,
}
impl<Application: ApplicationLayer + ?Sized> Node<Application> {
impl<Application: ApplicationLayer> Node<Application> {
pub fn new(identity_secret: IdentitySecret) -> Self {
Self {
identity: identity_secret,
@ -550,7 +394,7 @@ impl<Application: ApplicationLayer + ?Sized> Node<Application> {
INTERVAL
}
pub fn handle_incoming_physical_packet<Inner: InnerProtocolLayer + ?Sized>(
pub fn handle_incoming_physical_packet<Inner: InnerProtocolLayer>(
&self,
app: &Application,
inner: &Inner,
@ -753,7 +597,7 @@ impl<Application: ApplicationLayer + ?Sized> Node<Application> {
}
}
/// Key used to look up paths in a hash map efficiently.
/// Key used to look up paths in a hash map efficiently. It can be constructed for lookup without full copy.
enum PathKey<'a, 'b, LocalSocket: Hash + PartialEq + Eq + Clone> {
Copied(Endpoint, LocalSocket),
Ref(&'a Endpoint, &'b LocalSocket),

View file

@ -14,19 +14,20 @@ use zerotier_utils::marshalable::Marshalable;
use zerotier_utils::memory::array_range;
use zerotier_utils::NEVER_HAPPENED_TICKS;
use super::api::*;
use super::debug_event;
use super::identity::{Identity, IdentitySecret};
use super::node::*;
use super::Valid;
use super::{Address, Endpoint, Path};
use crate::protocol::*;
use crate::vl1::debug_event;
use crate::vl1::identity::{Identity, IdentitySecret};
use crate::vl1::node::*;
use crate::vl1::Valid;
use crate::vl1::{Address, Endpoint, Path};
use crate::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION};
use super::PartialAddress;
pub(crate) const SERVICE_INTERVAL_MS: i64 = 10000;
pub struct Peer<Application: ApplicationLayer + ?Sized> {
pub struct Peer<Application: ApplicationLayer> {
pub identity: Valid<Identity>,
v1_proto_static_secret: v1::SymmetricSecret,
@ -43,7 +44,7 @@ pub struct Peer<Application: ApplicationLayer + ?Sized> {
remote_node_info: RwLock<RemoteNodeInfo>,
}
struct PeerPath<Application: ApplicationLayer + ?Sized> {
struct PeerPath<Application: ApplicationLayer> {
path: Weak<Path<Application>>,
last_receive_time_ticks: i64,
}
@ -55,11 +56,11 @@ struct RemoteNodeInfo {
}
/// Sort a list of paths by quality or priority, with best paths first.
fn prioritize_paths<Application: ApplicationLayer + ?Sized>(paths: &mut Vec<PeerPath<Application>>) {
fn prioritize_paths<Application: ApplicationLayer>(paths: &mut Vec<PeerPath<Application>>) {
paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse());
}
impl<Application: ApplicationLayer + ?Sized> Peer<Application> {
impl<Application: ApplicationLayer> Peer<Application> {
/// Create a new peer.
///
/// This only returns None if this_node_identity does not have its secrets or if some
@ -470,7 +471,7 @@ impl<Application: ApplicationLayer + ?Sized> Peer<Application> {
/// those fragments after the main packet header and first chunk.
///
/// This returns true if the packet decrypted and passed authentication.
pub(crate) fn v1_proto_receive<Inner: InnerProtocolLayer + ?Sized>(
pub(crate) fn v1_proto_receive<Inner: InnerProtocolLayer>(
self: &Arc<Self>,
node: &Node<Application>,
app: &Application,
@ -617,7 +618,7 @@ impl<Application: ApplicationLayer + ?Sized> Peer<Application> {
return PacketHandlerResult::Error;
}
fn handle_incoming_error<Inner: InnerProtocolLayer + ?Sized>(
fn handle_incoming_error<Inner: InnerProtocolLayer>(
self: &Arc<Self>,
app: &Application,
inner: &Inner,
@ -655,7 +656,7 @@ impl<Application: ApplicationLayer + ?Sized> Peer<Application> {
return PacketHandlerResult::Error;
}
fn handle_incoming_ok<Inner: InnerProtocolLayer + ?Sized>(
fn handle_incoming_ok<Inner: InnerProtocolLayer>(
self: &Arc<Self>,
app: &Application,
inner: &Inner,
@ -852,21 +853,21 @@ impl<Application: ApplicationLayer + ?Sized> Peer<Application> {
}
}
impl<Application: ApplicationLayer + ?Sized> Hash for Peer<Application> {
impl<Application: ApplicationLayer> Hash for Peer<Application> {
#[inline(always)]
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.identity.address.hash(state)
}
}
impl<Application: ApplicationLayer + ?Sized> PartialEq for Peer<Application> {
impl<Application: ApplicationLayer> PartialEq for Peer<Application> {
#[inline(always)]
fn eq(&self, other: &Self) -> bool {
self.identity.eq(&other.identity)
}
}
impl<Application: ApplicationLayer + ?Sized> Eq for Peer<Application> {}
impl<Application: ApplicationLayer> Eq for Peer<Application> {}
fn v1_proto_try_aead_decrypt(
secret: &v1::SymmetricSecret,

View file

@ -3,17 +3,17 @@ use std::ops::Bound;
use std::sync::{Arc, RwLock};
use super::address::{Address, PartialAddress};
use super::api::ApplicationLayer;
use super::identity::{Identity, IdentitySecret};
use super::node::ApplicationLayer;
use super::peer::Peer;
use zerotier_crypto::typestate::Valid;
pub struct PeerMap<Application: ApplicationLayer + ?Sized> {
pub struct PeerMap<Application: ApplicationLayer> {
maps: [RwLock<BTreeMap<Address, Arc<Peer<Application>>>>; 256],
}
impl<Application: ApplicationLayer + ?Sized> PeerMap<Application> {
impl<Application: ApplicationLayer> PeerMap<Application> {
pub fn new() -> Self {
Self { maps: std::array::from_fn(|_| RwLock::new(BTreeMap::new())) }
}

View file

@ -4,8 +4,9 @@ use std::ops::Bound;
use std::sync::{Mutex, Weak};
use super::address::PartialAddress;
use super::api::{ApplicationLayer, InnerProtocolLayer};
use super::identity::Identity;
use super::node::{ApplicationLayer, InnerProtocolLayer, Node};
use super::node::Node;
use super::path::Path;
use crate::debug_event;
use crate::protocol;
@ -13,17 +14,17 @@ use crate::protocol;
use zerotier_crypto::typestate::Valid;
use zerotier_utils::ringbuffer::RingBuffer;
pub(super) struct Whois<Application: ApplicationLayer + ?Sized> {
pub(super) struct Whois<Application: ApplicationLayer> {
whois_queue: Mutex<BTreeMap<PartialAddress, WhoisQueueItem<Application>>>,
}
struct WhoisQueueItem<Application: ApplicationLayer + ?Sized> {
struct WhoisQueueItem<Application: ApplicationLayer> {
pending_v1_packets: RingBuffer<(Weak<Path<Application>>, protocol::PooledPacketBuffer), { protocol::WHOIS_MAX_WAITING_PACKETS }>,
last_retry_time: i64,
retry_count: u16,
}
impl<Application: ApplicationLayer + ?Sized> Whois<Application> {
impl<Application: ApplicationLayer> Whois<Application> {
pub fn new() -> Self {
Self { whois_queue: Mutex::new(BTreeMap::new()) }
}
@ -37,7 +38,7 @@ impl<Application: ApplicationLayer + ?Sized> Whois<Application> {
) {
}
pub fn handle_incoming_identity<Inner: InnerProtocolLayer + ?Sized>(
pub fn handle_incoming_identity<Inner: InnerProtocolLayer>(
&self,
app: &Application,
node: &Node<Application>,

View file

@ -42,7 +42,7 @@ impl MulticastAuthority {
}
/// Call for VL2_MULTICAST_LIKE packets.
pub fn handle_vl2_multicast_like<Application: ApplicationLayer + ?Sized, Authenticator: Fn(&NetworkId, &Identity) -> bool>(
pub fn handle_vl2_multicast_like<Application: ApplicationLayer, Authenticator: Fn(&NetworkId, &Identity) -> bool>(
&self,
auth: Authenticator,
time_ticks: i64,
@ -79,7 +79,7 @@ impl MulticastAuthority {
}
/// Call for VL2_MULTICAST_GATHER packets.
pub fn handle_vl2_multicast_gather<Application: ApplicationLayer + ?Sized, Authenticator: Fn(&NetworkId, &Identity) -> bool>(
pub fn handle_vl2_multicast_gather<Application: ApplicationLayer, Authenticator: Fn(&NetworkId, &Identity) -> bool>(
&self,
auth: Authenticator,
time_ticks: i64,

View file

@ -11,7 +11,7 @@ pub struct Switch {}
#[allow(unused_variables)]
impl InnerProtocolLayer for Switch {
fn handle_packet<Application: ApplicationLayer + ?Sized>(
fn handle_packet<Application: ApplicationLayer>(
&self,
app: &Application,
node: &Node<Application>,
@ -26,7 +26,7 @@ impl InnerProtocolLayer for Switch {
PacketHandlerResult::NotHandled
}
fn handle_error<Application: ApplicationLayer + ?Sized>(
fn handle_error<Application: ApplicationLayer>(
&self,
app: &Application,
node: &Node<Application>,
@ -43,7 +43,7 @@ impl InnerProtocolLayer for Switch {
PacketHandlerResult::NotHandled
}
fn handle_ok<Application: ApplicationLayer + ?Sized>(
fn handle_ok<Application: ApplicationLayer>(
&self,
app: &Application,
node: &Node<Application>,

View file

@ -2,8 +2,7 @@
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::{Arc, RwLock, Weak};
use std::thread::JoinHandle;
use std::time::Duration;
@ -27,8 +26,9 @@ const UPDATE_UDP_BINDINGS_EVERY_SECS: usize = 10;
/// talks to the physical network, manages the vl1 node, and presents a templated interface for
/// whatever inner protocol implementation is using it. This would typically be VL2 but could be
/// a test harness or just the controller for a controller that runs stand-alone.
pub struct VL1Service<Inner: InnerProtocolLayer + ?Sized + 'static> {
pub struct VL1Service<Inner: InnerProtocolLayer + 'static> {
pub node: Node<Self>,
self_ref: Weak<Self>,
state: RwLock<VL1ServiceMutableState>,
inner: Arc<Inner>,
buffer_pool: Arc<PacketBufferPool>,
@ -41,10 +41,11 @@ struct VL1ServiceMutableState {
running: bool,
}
impl<Inner: InnerProtocolLayer + ?Sized + 'static> VL1Service<Inner> {
impl<Inner: InnerProtocolLayer + 'static> VL1Service<Inner> {
pub fn new(identity: IdentitySecret, inner: Arc<Inner>, settings: VL1Settings) -> Result<Arc<Self>, Box<dyn Error>> {
let service = Arc::new(Self {
let service = Arc::new_cyclic(|self_ref| Self {
node: Node::<Self>::new(identity),
self_ref: self_ref.clone(),
state: RwLock::new(VL1ServiceMutableState {
daemons: Vec::with_capacity(2),
udp_sockets: HashMap::with_capacity(8),
@ -68,6 +69,10 @@ impl<Inner: InnerProtocolLayer + ?Sized + 'static> VL1Service<Inner> {
Ok(service)
}
pub fn get(&self) -> Arc<Self> {
self.self_ref.upgrade().unwrap()
}
pub fn bound_udp_ports(&self) -> Vec<u16> {
self.state.read().unwrap().udp_sockets.keys().cloned().collect()
}
@ -162,7 +167,7 @@ impl<Inner: InnerProtocolLayer + ?Sized + 'static> VL1Service<Inner> {
}
}
impl<Inner: InnerProtocolLayer + ?Sized + 'static> UdpPacketHandler for VL1Service<Inner> {
impl<Inner: InnerProtocolLayer + 'static> UdpPacketHandler for VL1Service<Inner> {
#[inline(always)]
fn incoming_udp_packet(
self: &Arc<Self>,
@ -183,7 +188,7 @@ impl<Inner: InnerProtocolLayer + ?Sized + 'static> UdpPacketHandler for VL1Servi
}
}
impl<Inner: InnerProtocolLayer + ?Sized + 'static> ApplicationLayer for VL1Service<Inner> {
impl<Inner: InnerProtocolLayer + 'static> ApplicationLayer for VL1Service<Inner> {
type LocalSocket = crate::LocalSocket;
type LocalInterface = crate::LocalInterface;
@ -280,7 +285,7 @@ impl<Inner: InnerProtocolLayer + ?Sized + 'static> ApplicationLayer for VL1Servi
}
}
impl<Inner: InnerProtocolLayer + ?Sized + 'static> Drop for VL1Service<Inner> {
impl<Inner: InnerProtocolLayer + 'static> Drop for VL1Service<Inner> {
fn drop(&mut self) {
let mut state = self.state.write().unwrap();
state.running = false;