Tons of work: reorg traits to simplify, implement multicast authority stuff, other works in progress.

This commit is contained in:
Adam Ierymenko 2022-11-03 11:11:04 -04:00
parent 3266064549
commit ee4ce6a8ef
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
16 changed files with 522 additions and 156 deletions

View file

@ -8,18 +8,17 @@ use tokio::time::{Duration, Instant};
use zerotier_network_hypervisor::protocol; use zerotier_network_hypervisor::protocol;
use zerotier_network_hypervisor::protocol::{PacketBuffer, DEFAULT_MULTICAST_LIMIT, ZEROTIER_VIRTUAL_NETWORK_DEFAULT_MTU}; use zerotier_network_hypervisor::protocol::{PacketBuffer, DEFAULT_MULTICAST_LIMIT, ZEROTIER_VIRTUAL_NETWORK_DEFAULT_MTU};
use zerotier_network_hypervisor::vl1::{ use zerotier_network_hypervisor::vl1::*;
debug_event, HostSystem, Identity, InnerProtocol, Node, PacketHandlerResult, Path, PathFilter, Peer,
};
use zerotier_network_hypervisor::vl2;
use zerotier_network_hypervisor::vl2::networkconfig::*; use zerotier_network_hypervisor::vl2::networkconfig::*;
use zerotier_network_hypervisor::vl2::v1::Revocation; use zerotier_network_hypervisor::vl2::v1::Revocation;
use zerotier_network_hypervisor::vl2::NetworkId; use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_network_hypervisor::vl2::{self, MulticastGroup};
use zerotier_utils::blob::Blob; use zerotier_utils::blob::Blob;
use zerotier_utils::buffer::OutOfBoundsError; use zerotier_utils::buffer::OutOfBoundsError;
use zerotier_utils::dictionary::Dictionary; use zerotier_utils::dictionary::Dictionary;
use zerotier_utils::error::InvalidParameterError; use zerotier_utils::error::InvalidParameterError;
use zerotier_utils::reaper::Reaper; use zerotier_utils::reaper::Reaper;
use zerotier_utils::sync::RMaybeWLockGuard;
use zerotier_utils::tokio; use zerotier_utils::tokio;
use zerotier_utils::{ms_monotonic, ms_since_epoch}; use zerotier_utils::{ms_monotonic, ms_since_epoch};
use zerotier_vl1_service::VL1Service; use zerotier_vl1_service::VL1Service;
@ -35,10 +34,19 @@ pub struct Controller {
self_ref: Weak<Self>, self_ref: Weak<Self>,
service: RwLock<Weak<VL1Service<dyn Database, Self, Self>>>, service: RwLock<Weak<VL1Service<dyn Database, Self, Self>>>,
reaper: Reaper, reaper: Reaper,
daemons: Mutex<Vec<tokio::task::JoinHandle<()>>>, // drop() aborts these
runtime: tokio::runtime::Handle, runtime: tokio::runtime::Handle,
database: Arc<dyn Database>, database: Arc<dyn Database>,
local_identity: Identity, local_identity: Identity,
// Async tasks that should be killed when the controller is dropped.
daemons: Mutex<Vec<tokio::task::JoinHandle<()>>>, // drop() aborts these
// Multicast "likes" recently received.
multicast_subscriptions: RwLock<HashMap<(NetworkId, MulticastGroup), Mutex<HashMap<Address, i64>>>>,
// Recently authorized network members and when that authorization expires (in monotonic ticks).
// Note that this is not and should not be used for real authentication, just for locking up multicast info.
recently_authorized: RwLock<HashMap<(NetworkId, [u8; Identity::FINGERPRINT_SIZE]), i64>>,
} }
impl Controller { impl Controller {
@ -52,10 +60,12 @@ impl Controller {
self_ref: r.clone(), self_ref: r.clone(),
service: RwLock::new(Weak::default()), service: RwLock::new(Weak::default()),
reaper: Reaper::new(&runtime), reaper: Reaper::new(&runtime),
daemons: Mutex::new(Vec::with_capacity(1)),
runtime, runtime,
database: database.clone(), database: database.clone(),
local_identity, local_identity,
daemons: Mutex::new(Vec::with_capacity(2)),
multicast_subscriptions: RwLock::new(HashMap::new()),
recently_authorized: RwLock::new(HashMap::new()),
})) }))
} else { } else {
Err(Box::new(InvalidParameterError( Err(Box::new(InvalidParameterError(
@ -73,21 +83,63 @@ impl Controller {
pub async fn start(&self, service: &Arc<VL1Service<dyn Database, Self, Self>>) { pub async fn start(&self, service: &Arc<VL1Service<dyn Database, Self, Self>>) {
*self.service.write().unwrap() = Arc::downgrade(service); *self.service.write().unwrap() = Arc::downgrade(service);
// Create database change listener.
if let Some(cw) = self.database.changes().await.map(|mut ch| { if let Some(cw) = self.database.changes().await.map(|mut ch| {
let self2 = self.self_ref.upgrade().unwrap(); let self2 = self.self_ref.clone();
self.runtime.spawn(async move { self.runtime.spawn(async move {
loop { loop {
if let Ok(change) = ch.recv().await { if let Ok(change) = ch.recv().await {
self2.reaper.add( if let Some(self2) = self2.upgrade() {
self2.runtime.spawn(self2.clone().handle_change_notification(change)), self2.reaper.add(
Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(), self2.runtime.spawn(self2.clone().handle_change_notification(change)),
); Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(),
);
} else {
break;
}
} }
} }
}) })
}) { }) {
self.daemons.lock().unwrap().push(cw); self.daemons.lock().unwrap().push(cw);
} }
// Create background task to expire multicast subscriptions and recent authorizations.
let self2 = self.self_ref.clone();
self.daemons.lock().unwrap().push(self.runtime.spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis((protocol::VL2_DEFAULT_MULTICAST_LIKE_EXPIRE / 2) as u64)).await;
if let Some(self2) = self2.upgrade() {
let time_ticks = ms_monotonic();
let exp_before = time_ticks - protocol::VL2_DEFAULT_MULTICAST_LIKE_EXPIRE;
let mut empty_subscription_entries = Vec::new();
for (network_group, subs) in self2.multicast_subscriptions.read().unwrap().iter() {
let mut subs = subs.lock().unwrap();
subs.retain(|_, t| *t > exp_before);
if subs.is_empty() {
empty_subscription_entries.push(network_group.clone());
}
}
if !empty_subscription_entries.is_empty() {
let mut ms = self2.multicast_subscriptions.write().unwrap();
for e in empty_subscription_entries.iter() {
ms.remove(e);
}
}
self2
.recently_authorized
.write()
.unwrap()
.retain(|_, timeout| *timeout > time_ticks);
} else {
break;
}
}
}));
} }
/// Compose and send network configuration packet. /// Compose and send network configuration packet.
@ -108,11 +160,11 @@ impl Controller {
if let Some(in_re_message_id) = in_re_message_id { if let Some(in_re_message_id) = in_re_message_id {
let ok_header = packet.append_struct_get_mut::<protocol::OkHeader>()?; let ok_header = packet.append_struct_get_mut::<protocol::OkHeader>()?;
ok_header.verb = protocol::verbs::VL1_OK; ok_header.verb = protocol::message_type::VL1_OK;
ok_header.in_re_verb = protocol::verbs::VL2_VERB_NETWORK_CONFIG_REQUEST; ok_header.in_re_verb = protocol::message_type::VL2_NETWORK_CONFIG_REQUEST;
ok_header.in_re_message_id = in_re_message_id.to_be_bytes(); ok_header.in_re_message_id = in_re_message_id.to_be_bytes();
} else { } else {
packet.append_u8(protocol::verbs::VL2_VERB_NETWORK_CONFIG)?; packet.append_u8(protocol::message_type::VL2_NETWORK_CONFIG)?;
} }
if peer.is_v2() { if peer.is_v2() {
@ -149,6 +201,8 @@ impl Controller {
} }
} }
fn send_revocations(&self, peer: &Peer, revocations: &Vec<Revocation>) {}
/// Called when the DB informs us of a change. /// Called when the DB informs us of a change.
async fn handle_change_notification(self: Arc<Self>, change: Change) { async fn handle_change_notification(self: Arc<Self>, change: Change) {
match change { match change {
@ -267,6 +321,7 @@ impl Controller {
nc.revision = now as u64; nc.revision = now as u64;
nc.mtu = network.mtu.unwrap_or(ZEROTIER_VIRTUAL_NETWORK_DEFAULT_MTU as u16); nc.mtu = network.mtu.unwrap_or(ZEROTIER_VIRTUAL_NETWORK_DEFAULT_MTU as u16);
nc.multicast_limit = network.multicast_limit.unwrap_or(DEFAULT_MULTICAST_LIMIT as u32); nc.multicast_limit = network.multicast_limit.unwrap_or(DEFAULT_MULTICAST_LIMIT as u32);
nc.multicast_like_expire = Some(protocol::VL2_DEFAULT_MULTICAST_LIKE_EXPIRE as u32);
nc.routes = network.ip_routes; nc.routes = network.ip_routes;
nc.static_ips = member.ip_assignments.clone(); nc.static_ips = member.ip_assignments.clone();
nc.rules = network.rules; nc.rules = network.rules;
@ -335,6 +390,12 @@ impl Controller {
// TODO: populate node info for V2 networks // TODO: populate node info for V2 networks
} }
let _ = self
.recently_authorized
.write()
.unwrap()
.insert((network_id, source_identity.fingerprint), ms_monotonic() + nc.credential_ttl);
network_config = Some(nc); network_config = Some(nc);
} }
@ -346,9 +407,6 @@ impl Controller {
} }
} }
// Default PathFilter implementations permit anything.
impl PathFilter for Controller {}
impl InnerProtocol for Controller { impl InnerProtocol for Controller {
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>( fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
&self, &self,
@ -363,7 +421,11 @@ impl InnerProtocol for Controller {
mut cursor: usize, mut cursor: usize,
) -> PacketHandlerResult { ) -> PacketHandlerResult {
match verb { match verb {
protocol::verbs::VL2_VERB_NETWORK_CONFIG_REQUEST => { protocol::message_type::VL2_NETWORK_CONFIG_REQUEST => {
if !host_system.should_respond_to(&source.identity) {
return PacketHandlerResult::Ok; // handled and ignored
}
let network_id = payload.read_u64(&mut cursor); let network_id = payload.read_u64(&mut cursor);
if network_id.is_err() { if network_id.is_err() {
return PacketHandlerResult::Error; return PacketHandlerResult::Error;
@ -400,19 +462,6 @@ impl InnerProtocol for Controller {
Dictionary::new() Dictionary::new()
}; };
/*
let (have_revision, have_timestamp) = if (cursor + 16) <= payload.len() {
let r = payload.read_u64(&mut cursor);
let t = payload.read_u64(&mut cursor);
if r.is_err() || t.is_err() {
return PacketHandlerResult::Error;
}
(Some(r.unwrap()), Some(t.unwrap()))
} else {
(None, None)
};
*/
// Launch handler as an async background task. // Launch handler as an async background task.
let (self2, peer, source_remote_endpoint) = let (self2, peer, source_remote_endpoint) =
(self.self_ref.upgrade().unwrap(), source.clone(), source_path.endpoint.clone()); (self.self_ref.upgrade().unwrap(), source.clone(), source_path.endpoint.clone());
@ -427,6 +476,9 @@ impl InnerProtocol for Controller {
Result::Ok((result, Some(config), revocations)) => { Result::Ok((result, Some(config), revocations)) => {
//println!("{}", serde_yaml::to_string(&config).unwrap()); //println!("{}", serde_yaml::to_string(&config).unwrap());
self2.send_network_config(peer.as_ref(), &config, Some(message_id)); self2.send_network_config(peer.as_ref(), &config, Some(message_id));
if let Some(revocations) = revocations {
self2.send_revocations(peer.as_ref(), &revocations);
}
(result, Some(config)) (result, Some(config))
} }
Result::Ok((result, None, _)) => (result, None), Result::Ok((result, None, _)) => (result, None),
@ -448,6 +500,8 @@ impl InnerProtocol for Controller {
} else { } else {
meta_data.to_bytes() meta_data.to_bytes()
}, },
peer_version: peer.version(),
peer_protocol_version: peer.protocol_version(),
timestamp: now, timestamp: now,
source_remote_endpoint, source_remote_endpoint,
source_hops, source_hops,
@ -461,6 +515,105 @@ impl InnerProtocol for Controller {
PacketHandlerResult::Ok PacketHandlerResult::Ok
} }
protocol::message_type::VL2_MULTICAST_LIKE => {
let mut subscriptions = RMaybeWLockGuard::new_read(&self.multicast_subscriptions);
let auth = self.recently_authorized.read().unwrap();
let time_ticks = ms_monotonic();
while payload.len() >= (8 + 6 + 4) {
let network_id = NetworkId::from_bytes_fixed(payload.read_bytes_fixed(&mut cursor).unwrap());
if let Some(network_id) = network_id {
let mac = MAC::from_bytes_fixed(payload.read_bytes_fixed(&mut cursor).unwrap());
if let Some(mac) = mac {
if auth
.get(&(network_id, source.identity.fingerprint))
.map_or(false, |t| *t > time_ticks)
{
let sub_key = (network_id, MulticastGroup { mac, adi: payload.read_u32(&mut cursor).unwrap() });
if let Some(sub) = subscriptions.read().get(&sub_key) {
let _ = sub.lock().unwrap().insert(source.identity.address, time_ticks);
} else {
let _ = subscriptions
.write(&self.multicast_subscriptions)
.entry(sub_key)
.or_insert_with(|| Mutex::new(HashMap::new()))
.lock()
.unwrap()
.insert(source.identity.address, time_ticks);
}
}
}
}
}
PacketHandlerResult::Ok
}
protocol::message_type::VL2_MULTICAST_GATHER => {
if let Some(service) = self.service.read().unwrap().upgrade() {
if let Some(network_id) = payload
.read_bytes_fixed(&mut cursor)
.map_or(None, |network_id| NetworkId::from_bytes_fixed(network_id))
{
let time_ticks = ms_monotonic();
if self
.recently_authorized
.read()
.unwrap()
.get(&(network_id, source.identity.fingerprint))
.map_or(false, |t| *t > time_ticks)
{
cursor += 1; // skip flags, currently unused
if let Some(mac) = payload.read_bytes_fixed(&mut cursor).map_or(None, |mac| MAC::from_bytes_fixed(mac)) {
let mut gathered = Vec::new();
let adi = payload.read_u32(&mut cursor).unwrap_or(0);
let subscriptions = self.multicast_subscriptions.read().unwrap();
if let Some(sub) = subscriptions.get(&(network_id, MulticastGroup { mac, adi })) {
let sub = sub.lock().unwrap();
for a in sub.keys() {
gathered.push(*a);
}
}
while !gathered.is_empty() {
source.send(
service.as_ref(),
service.node(),
None,
time_ticks,
|packet| -> Result<(), OutOfBoundsError> {
let ok_header = packet.append_struct_get_mut::<protocol::OkHeader>()?;
ok_header.verb = protocol::message_type::VL1_OK;
ok_header.in_re_verb = protocol::message_type::VL2_MULTICAST_GATHER;
ok_header.in_re_message_id = message_id.to_be_bytes();
packet.append_bytes_fixed(&network_id.to_bytes())?;
packet.append_bytes_fixed(&mac.to_bytes())?;
packet.append_u32(adi)?;
packet.append_u32(gathered.len() as u32)?;
let in_this_packet = gathered
.len()
.clamp(1, (packet.capacity() - packet.len()) / protocol::ADDRESS_SIZE)
.min(u16::MAX as usize);
packet.append_u16(in_this_packet as u16)?;
for _ in 0..in_this_packet {
packet.append_bytes_fixed(&gathered.pop().unwrap().to_bytes())?;
}
Ok(())
},
);
}
}
}
}
}
PacketHandlerResult::Ok
}
_ => PacketHandlerResult::NotHandled, _ => PacketHandlerResult::NotHandled,
} }
} }
@ -497,11 +650,17 @@ impl InnerProtocol for Controller {
) -> PacketHandlerResult { ) -> PacketHandlerResult {
PacketHandlerResult::NotHandled PacketHandlerResult::NotHandled
} }
}
fn should_respond_to(&self, _: &Identity) -> bool { impl VL1AuthProvider for Controller {
// Controllers respond to anyone. #[inline(always)]
fn should_respond_to(&self, id: &Identity) -> bool {
true true
} }
fn has_trust_relationship(&self, id: &Identity) -> bool {
false
}
} }
impl Drop for Controller { impl Drop for Controller {

View file

@ -82,26 +82,40 @@ impl ToString for AuthorizationResult {
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RequestLogItem { pub struct RequestLogItem {
#[serde(rename = "nwid")] #[serde(rename = "nw")]
pub network_id: NetworkId, pub network_id: NetworkId,
#[serde(rename = "nid")] #[serde(rename = "n")]
pub node_id: Address, pub node_id: Address,
#[serde(rename = "nf")] #[serde(rename = "nf")]
pub node_fingerprint: Blob<48>, pub node_fingerprint: Blob<48>,
#[serde(rename = "cid")] #[serde(rename = "c")]
pub controller_node_id: Address, pub controller_node_id: Address,
#[serde(skip_serializing_if = "Vec::is_empty")] #[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(default)] #[serde(default)]
#[serde(rename = "md")] #[serde(rename = "md")]
pub metadata: Vec<u8>, pub metadata: Vec<u8>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
#[serde(rename = "pv")]
pub peer_version: Option<(u8, u8, u16)>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
#[serde(rename = "ppv")]
pub peer_protocol_version: Option<u8>,
#[serde(rename = "ts")] #[serde(rename = "ts")]
pub timestamp: i64, pub timestamp: i64,
#[serde(rename = "s")] #[serde(rename = "s")]
pub source_remote_endpoint: Endpoint, pub source_remote_endpoint: Endpoint,
#[serde(rename = "sh")] #[serde(rename = "sh")]
pub source_hops: u8, pub source_hops: u8,
#[serde(rename = "r")] #[serde(rename = "r")]
pub result: AuthorizationResult, pub result: AuthorizationResult,
#[serde(rename = "nc")] #[serde(rename = "nc")]
pub config: Option<NetworkConfig>, pub config: Option<NetworkConfig>,
} }

View file

@ -73,7 +73,8 @@ pub type PacketBufferPool = Pool<PacketBuffer, PacketBufferFactory>;
/// 64-bit message ID (obtained after AEAD decryption). /// 64-bit message ID (obtained after AEAD decryption).
pub type MessageId = u64; pub type MessageId = u64;
pub mod verbs { /// ZeroTier VL1 and VL2 wire protocol message types.
pub mod message_type {
pub const VL1_NOP: u8 = 0x00; pub const VL1_NOP: u8 = 0x00;
pub const VL1_HELLO: u8 = 0x01; pub const VL1_HELLO: u8 = 0x01;
pub const VL1_ERROR: u8 = 0x02; pub const VL1_ERROR: u8 = 0x02;
@ -84,10 +85,10 @@ pub mod verbs {
pub const VL1_PUSH_DIRECT_PATHS: u8 = 0x10; pub const VL1_PUSH_DIRECT_PATHS: u8 = 0x10;
pub const VL1_USER_MESSAGE: u8 = 0x14; pub const VL1_USER_MESSAGE: u8 = 0x14;
pub const VL2_VERB_MULTICAST_LIKE: u8 = 0x09; pub const VL2_MULTICAST_LIKE: u8 = 0x09;
pub const VL2_VERB_NETWORK_CONFIG_REQUEST: u8 = 0x0b; pub const VL2_NETWORK_CONFIG_REQUEST: u8 = 0x0b;
pub const VL2_VERB_NETWORK_CONFIG: u8 = 0x0c; pub const VL2_NETWORK_CONFIG: u8 = 0x0c;
pub const VL2_VERB_MULTICAST_GATHER: u8 = 0x0d; pub const VL2_MULTICAST_GATHER: u8 = 0x0d;
pub fn name(verb: u8) -> &'static str { pub fn name(verb: u8) -> &'static str {
match verb { match verb {
@ -100,10 +101,10 @@ pub mod verbs {
VL1_ECHO => "VL1_ECHO", VL1_ECHO => "VL1_ECHO",
VL1_PUSH_DIRECT_PATHS => "VL1_PUSH_DIRECT_PATHS", VL1_PUSH_DIRECT_PATHS => "VL1_PUSH_DIRECT_PATHS",
VL1_USER_MESSAGE => "VL1_USER_MESSAGE", VL1_USER_MESSAGE => "VL1_USER_MESSAGE",
VL2_VERB_MULTICAST_LIKE => "VL2_VERB_MULTICAST_LIKE", VL2_MULTICAST_LIKE => "VL2_MULTICAST_LIKE",
VL2_VERB_NETWORK_CONFIG_REQUEST => "VL2_VERB_NETWORK_CONFIG_REQUEST", VL2_NETWORK_CONFIG_REQUEST => "VL2_NETWORK_CONFIG_REQUEST",
VL2_VERB_NETWORK_CONFIG => "VL2_VERB_NETWORK_CONFIG", VL2_NETWORK_CONFIG => "VL2_NETWORK_CONFIG",
VL2_VERB_MULTICAST_GATHER => "VL2_VERB_MULTICAST_GATHER", VL2_MULTICAST_GATHER => "VL2_MULTICAST_GATHER",
_ => "???", _ => "???",
} }
} }
@ -588,9 +589,8 @@ pub(crate) const PEER_EXPIRATION_TIME: i64 = (PEER_HELLO_INTERVAL_MAX * 2) + 100
/// Proof of work difficulty (threshold) for identity generation. /// Proof of work difficulty (threshold) for identity generation.
pub(crate) const IDENTITY_POW_THRESHOLD: u8 = 17; pub(crate) const IDENTITY_POW_THRESHOLD: u8 = 17;
/// Maximum number of key/value pairs in a single Tag credential. // Multicast LIKE expire time in milliseconds.
/// (This is for V2 only. In V1 tag credentials can have only one pair.) pub const VL2_DEFAULT_MULTICAST_LIKE_EXPIRE: i64 = 600000;
pub(crate) const MAX_TAG_KEY_VALUE_PAIRS: usize = 128;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {

View file

@ -19,7 +19,7 @@ pub use event::Event;
pub use identity::Identity; pub use identity::Identity;
pub use inetaddress::InetAddress; pub use inetaddress::InetAddress;
pub use mac::MAC; pub use mac::MAC;
pub use node::{DummyInnerProtocol, DummyPathFilter, HostSystem, InnerProtocol, Node, NodeStorage, PacketHandlerResult, PathFilter}; pub use node::{DummyInnerProtocol, HostSystem, InnerProtocol, Node, NodeStorage, PacketHandlerResult, VL1AuthProvider};
pub use path::Path; pub use path::Path;
pub use peer::Peer; pub use peer::Peer;
pub use rootset::{Root, RootSet}; pub use rootset::{Root, RootSet};

View file

@ -27,17 +27,42 @@ use zerotier_utils::marshalable::Marshalable;
use zerotier_utils::ringbuffer::RingBuffer; use zerotier_utils::ringbuffer::RingBuffer;
use zerotier_utils::thing::Thing; use zerotier_utils::thing::Thing;
/// Trait implemented by external code to handle events and provide an interface to the system or application. /// Trait providing VL1 authentication functions to determine which nodes we should talk to.
/// ///
/// These methods are basically callbacks that the core calls to request or transmit things. They are called /// This is included in HostSystem but is provided as a separate trait to make it easy for
/// during calls to things like wire_recieve() and do_background_tasks(). /// implementers of HostSystem to break this out and allow a user to specify it.
pub trait HostSystem: Sync + Send + 'static { pub trait VL1AuthProvider: Sync + Send {
/// Check if this node should respond to messages from a given peer at all.
///
/// If this returns false, the node simply drops messages on the floor and refuses
/// to init V2 sessions.
fn should_respond_to(&self, id: &Identity) -> bool;
/// Check if this node has any trust relationship with the provided identity.
///
/// This should return true if there is any special trust relationship such as mutual
/// membership in a network or for controllers the peer's membership in any network
/// they control.
fn has_trust_relationship(&self, id: &Identity) -> bool;
}
/// Trait to be implemented by outside code to provide object storage to VL1
///
/// This is included in HostSystem but is provided as a separate trait to make it easy for
/// implementers of HostSystem to break this out and allow a user to specify it.
pub trait NodeStorage: Sync + Send {
/// Load this node's identity from the data store.
fn load_node_identity(&self) -> Option<Identity>;
/// Save this node's identity to the data store.
fn save_node_identity(&self, id: &Identity);
}
/// Trait implemented by external code to handle events and provide an interface to the system or application.
pub trait HostSystem: VL1AuthProvider + NodeStorage + 'static {
/// Type for implementation of NodeStorage. /// Type for implementation of NodeStorage.
type Storage: NodeStorage + ?Sized; type Storage: NodeStorage + ?Sized;
/// Path filter implementation for this host.
type PathFilter: PathFilter + ?Sized;
/// Type for local system sockets. /// Type for local system sockets.
type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static; type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static;
@ -50,9 +75,6 @@ pub trait HostSystem: Sync + Send + 'static {
/// Get a reference to the local storage implementation at this host. /// Get a reference to the local storage implementation at this host.
fn storage(&self) -> &Self::Storage; fn storage(&self) -> &Self::Storage;
/// Get the path filter implementation for this host.
fn path_filter(&self) -> &Self::PathFilter;
/// Get a pooled packet buffer for internal use. /// Get a pooled packet buffer for internal use.
fn get_buffer(&self) -> PooledPacketBuffer; fn get_buffer(&self) -> PooledPacketBuffer;
@ -84,26 +106,6 @@ pub trait HostSystem: Sync + Send + 'static {
packet_ttl: u8, packet_ttl: u8,
); );
/// Called to get the current time in milliseconds from the system monotonically increasing clock.
/// This needs to be accurate to about 250 milliseconds resolution or better.
fn time_ticks(&self) -> i64;
/// Called to get the current time in milliseconds since epoch from the real-time clock.
/// This needs to be accurate to about one second resolution or better.
fn time_clock(&self) -> i64;
}
/// Trait to be implemented by outside code to provide object storage to VL1
pub trait NodeStorage: Sync + Send {
/// Load this node's identity from the data store.
fn load_node_identity(&self) -> Option<Identity>;
/// Save this node's identity to the data store.
fn save_node_identity(&self, id: &Identity);
}
/// Trait to be implemented to provide path hints and a filter to approve physical paths.
pub trait PathFilter: Sync + Send {
/// Called to check and see if a physical address should be used for ZeroTier traffic to a node. /// Called to check and see if a physical address should be used for ZeroTier traffic to a node.
/// ///
/// The default implementation always returns true. /// The default implementation always returns true.
@ -134,6 +136,14 @@ pub trait PathFilter: Sync + Send {
> { > {
None None
} }
/// Called to get the current time in milliseconds from the system monotonically increasing clock.
/// This needs to be accurate to about 250 milliseconds resolution or better.
fn time_ticks(&self) -> i64;
/// Called to get the current time in milliseconds since epoch from the real-time clock.
/// This needs to be accurate to about one second resolution or better.
fn time_clock(&self) -> i64;
} }
/// Result of a packet handler. /// Result of a packet handler.
@ -199,9 +209,6 @@ pub trait InnerProtocol: Sync + Send {
payload: &PacketBuffer, payload: &PacketBuffer,
cursor: usize, cursor: usize,
) -> PacketHandlerResult; ) -> PacketHandlerResult;
/// Check if this node should respond to messages from a given peer.
fn should_respond_to(&self, id: &Identity) -> bool;
} }
/// How often to check the root cluster definitions against the root list and update. /// How often to check the root cluster definitions against the root list and update.
@ -949,7 +956,7 @@ impl Node {
while !addresses.is_empty() { while !addresses.is_empty() {
if !root if !root
.send(host_system, self, None, time_ticks, |packet| -> Result<(), Infallible> { .send(host_system, self, None, time_ticks, |packet| -> Result<(), Infallible> {
assert!(packet.append_u8(verbs::VL1_WHOIS).is_ok()); assert!(packet.append_u8(message_type::VL1_WHOIS).is_ok());
while !addresses.is_empty() && (packet.len() + ADDRESS_SIZE) <= UDP_DEFAULT_MTU { while !addresses.is_empty() && (packet.len() + ADDRESS_SIZE) <= UDP_DEFAULT_MTU {
assert!(packet.append_bytes_fixed(&addresses[0].to_bytes()).is_ok()); assert!(packet.append_bytes_fixed(&addresses[0].to_bytes()).is_ok());
addresses = &addresses[1..]; addresses = &addresses[1..];
@ -978,7 +985,7 @@ impl Node {
let mut whois_queue = self.whois_queue.lock().unwrap(); let mut whois_queue = self.whois_queue.lock().unwrap();
if let Some(qi) = whois_queue.get_mut(&received_identity.address) { if let Some(qi) = whois_queue.get_mut(&received_identity.address) {
let address = received_identity.address; let address = received_identity.address;
if inner.should_respond_to(&received_identity) { if host_system.should_respond_to(&received_identity) {
let mut peers = self.peers.write().unwrap(); let mut peers = self.peers.write().unwrap();
if let Some(peer) = peers.get(&address).cloned().or_else(|| { if let Some(peer) = peers.get(&address).cloned().or_else(|| {
Peer::new(&self.identity, received_identity, time_ticks) Peer::new(&self.identity, received_identity, time_ticks)
@ -1154,15 +1161,16 @@ impl InnerProtocol for DummyInnerProtocol {
) -> PacketHandlerResult { ) -> PacketHandlerResult {
PacketHandlerResult::NotHandled PacketHandlerResult::NotHandled
} }
}
impl VL1AuthProvider for DummyInnerProtocol {
#[inline(always)]
fn should_respond_to(&self, id: &Identity) -> bool {
true
}
#[inline(always)] #[inline(always)]
fn should_respond_to(&self, _id: &Identity) -> bool { fn has_trust_relationship(&self, id: &Identity) -> bool {
true true
} }
} }
/// Dummy no-op path filter for debugging and testing.
#[derive(Default)]
pub struct DummyPathFilter;
impl PathFilter for DummyPathFilter {}

View file

@ -424,7 +424,7 @@ impl Peer {
f.0.dest = self.identity.address.to_bytes(); f.0.dest = self.identity.address.to_bytes();
f.0.src = node.identity.address.to_bytes(); f.0.src = node.identity.address.to_bytes();
f.0.flags_cipher_hops = v1::CIPHER_NOCRYPT_POLY1305; f.0.flags_cipher_hops = v1::CIPHER_NOCRYPT_POLY1305;
f.1.verb = verbs::VL1_HELLO | v1::VERB_FLAG_EXTENDED_AUTHENTICATION; f.1.verb = message_type::VL1_HELLO | v1::VERB_FLAG_EXTENDED_AUTHENTICATION;
f.1.version_proto = PROTOCOL_VERSION; f.1.version_proto = PROTOCOL_VERSION;
f.1.version_major = VERSION_MAJOR; f.1.version_major = VERSION_MAJOR;
f.1.version_minor = VERSION_MINOR; f.1.version_minor = VERSION_MINOR;
@ -541,14 +541,16 @@ impl Peer {
host_system, host_system,
"[vl1] #{:0>16x} decrypted and authenticated, verb: {} ({:0>2x})", "[vl1] #{:0>16x} decrypted and authenticated, verb: {} ({:0>2x})",
u64::from_be_bytes(packet_header.id), u64::from_be_bytes(packet_header.id),
verbs::name(verb), message_type::name(verb),
verb as u32 verb as u32
); );
return match verb { return match verb {
verbs::VL1_NOP => PacketHandlerResult::Ok, message_type::VL1_NOP => PacketHandlerResult::Ok,
verbs::VL1_HELLO => self.handle_incoming_hello(host_system, inner, node, time_ticks, message_id, source_path, &payload), message_type::VL1_HELLO => {
verbs::VL1_ERROR => self.handle_incoming_error( self.handle_incoming_hello(host_system, inner, node, time_ticks, message_id, source_path, &payload)
}
message_type::VL1_ERROR => self.handle_incoming_error(
host_system, host_system,
inner, inner,
node, node,
@ -558,7 +560,7 @@ impl Peer {
message_id, message_id,
&payload, &payload,
), ),
verbs::VL1_OK => self.handle_incoming_ok( message_type::VL1_OK => self.handle_incoming_ok(
host_system, host_system,
inner, inner,
node, node,
@ -569,15 +571,17 @@ impl Peer {
path_is_known, path_is_known,
&payload, &payload,
), ),
verbs::VL1_WHOIS => self.handle_incoming_whois(host_system, inner, node, time_ticks, message_id, &payload), message_type::VL1_WHOIS => self.handle_incoming_whois(host_system, inner, node, time_ticks, message_id, &payload),
verbs::VL1_RENDEZVOUS => { message_type::VL1_RENDEZVOUS => {
self.handle_incoming_rendezvous(host_system, node, time_ticks, message_id, source_path, &payload) self.handle_incoming_rendezvous(host_system, node, time_ticks, message_id, source_path, &payload)
} }
verbs::VL1_ECHO => self.handle_incoming_echo(host_system, inner, node, time_ticks, message_id, &payload), message_type::VL1_ECHO => self.handle_incoming_echo(host_system, inner, node, time_ticks, message_id, &payload),
verbs::VL1_PUSH_DIRECT_PATHS => { message_type::VL1_PUSH_DIRECT_PATHS => {
self.handle_incoming_push_direct_paths(host_system, node, time_ticks, source_path, &payload) self.handle_incoming_push_direct_paths(host_system, node, time_ticks, source_path, &payload)
} }
verbs::VL1_USER_MESSAGE => self.handle_incoming_user_message(host_system, node, time_ticks, source_path, &payload), message_type::VL1_USER_MESSAGE => {
self.handle_incoming_user_message(host_system, node, time_ticks, source_path, &payload)
}
_ => inner.handle_packet( _ => inner.handle_packet(
host_system, host_system,
node, node,
@ -606,7 +610,7 @@ impl Peer {
source_path: &Arc<Path>, source_path: &Arc<Path>,
payload: &PacketBuffer, payload: &PacketBuffer,
) -> PacketHandlerResult { ) -> PacketHandlerResult {
if !(inner.should_respond_to(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { if !(host_system.should_respond_to(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) {
debug_event!( debug_event!(
host_system, host_system,
"[vl1] dropping HELLO from {} due to lack of trust relationship", "[vl1] dropping HELLO from {} due to lack of trust relationship",
@ -637,8 +641,8 @@ impl Peer {
|packet| -> Result<(), Infallible> { |packet| -> Result<(), Infallible> {
let f: &mut (OkHeader, v1::message_component_structs::OkHelloFixedHeaderFields) = let f: &mut (OkHeader, v1::message_component_structs::OkHelloFixedHeaderFields) =
packet.append_struct_get_mut().unwrap(); packet.append_struct_get_mut().unwrap();
f.0.verb = verbs::VL1_OK; f.0.verb = message_type::VL1_OK;
f.0.in_re_verb = verbs::VL1_HELLO; f.0.in_re_verb = message_type::VL1_HELLO;
f.0.in_re_message_id = message_id.to_ne_bytes(); f.0.in_re_message_id = message_id.to_ne_bytes();
f.1.timestamp_echo = hello_fixed_headers.timestamp; f.1.timestamp_echo = hello_fixed_headers.timestamp;
f.1.version_proto = PROTOCOL_VERSION; f.1.version_proto = PROTOCOL_VERSION;
@ -714,7 +718,7 @@ impl Peer {
// TODO: replay attack prevention filter // TODO: replay attack prevention filter
match ok_header.in_re_verb { match ok_header.in_re_verb {
verbs::VL1_HELLO => { message_type::VL1_HELLO => {
if let Ok(_ok_hello_fixed_header_fields) = if let Ok(_ok_hello_fixed_header_fields) =
payload.read_struct::<v1::message_component_structs::OkHelloFixedHeaderFields>(&mut cursor) payload.read_struct::<v1::message_component_structs::OkHelloFixedHeaderFields>(&mut cursor)
{ {
@ -750,7 +754,7 @@ impl Peer {
} }
} }
verbs::VL1_WHOIS => { message_type::VL1_WHOIS => {
debug_event!(host_system, "[vl1] OK(WHOIS)"); debug_event!(host_system, "[vl1] OK(WHOIS)");
if node.is_peer_root(self) { if node.is_peer_root(self) {
while cursor < payload.len() { while cursor < payload.len() {
@ -806,7 +810,7 @@ impl Peer {
message_id: MessageId, message_id: MessageId,
payload: &PacketBuffer, payload: &PacketBuffer,
) -> PacketHandlerResult { ) -> PacketHandlerResult {
if node.this_node_is_root() || inner.should_respond_to(&self.identity) { if node.this_node_is_root() || host_system.should_respond_to(&self.identity) {
let mut addresses = payload.as_bytes(); let mut addresses = payload.as_bytes();
while addresses.len() >= ADDRESS_SIZE { while addresses.len() >= ADDRESS_SIZE {
if !self if !self
@ -852,11 +856,11 @@ impl Peer {
message_id: MessageId, message_id: MessageId,
payload: &PacketBuffer, payload: &PacketBuffer,
) -> PacketHandlerResult { ) -> PacketHandlerResult {
if inner.should_respond_to(&self.identity) || node.is_peer_root(self) { if host_system.should_respond_to(&self.identity) || node.is_peer_root(self) {
self.send(host_system, node, None, time_ticks, |packet| { self.send(host_system, node, None, time_ticks, |packet| {
let mut f: &mut OkHeader = packet.append_struct_get_mut().unwrap(); let mut f: &mut OkHeader = packet.append_struct_get_mut().unwrap();
f.verb = verbs::VL1_OK; f.verb = message_type::VL1_OK;
f.in_re_verb = verbs::VL1_ECHO; f.in_re_verb = message_type::VL1_ECHO;
f.in_re_message_id = message_id.to_ne_bytes(); f.in_re_message_id = message_id.to_ne_bytes();
packet.append_bytes(payload.as_bytes()) packet.append_bytes(payload.as_bytes())
}); });
@ -935,7 +939,7 @@ fn v1_proto_try_aead_decrypt(
if cipher == v1::CIPHER_SALSA2012_POLY1305 { if cipher == v1::CIPHER_SALSA2012_POLY1305 {
salsa.crypt_in_place(payload.as_bytes_mut()); salsa.crypt_in_place(payload.as_bytes_mut());
Some(message_id) Some(message_id)
} else if (payload.u8_at(0).unwrap_or(0) & v1::VERB_MASK) == verbs::VL1_HELLO { } else if (payload.u8_at(0).unwrap_or(0) & v1::VERB_MASK) == message_type::VL1_HELLO {
Some(message_id) Some(message_id)
} else { } else {
// SECURITY: fail if there is no encryption and the message is not HELLO. No other types are allowed // SECURITY: fail if there is no encryption and the message is not HELLO. No other types are allowed

View file

@ -4,6 +4,7 @@ mod multicastgroup;
mod networkid; mod networkid;
mod switch; mod switch;
pub mod multicastauthority;
pub mod networkconfig; pub mod networkconfig;
pub mod rule; pub mod rule;
pub mod v1; pub mod v1;

View file

@ -0,0 +1,44 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use crate::protocol;
use crate::protocol::PacketBuffer;
use crate::vl1::{Address, HostSystem, Identity, PacketHandlerResult, Peer};
use crate::vl2::{MulticastGroup, NetworkId};
/// Handler implementations for VL2_MULTICAST_LIKE and VL2_MULTICAST_GATHER.
///
/// Both controllers and roots will want to handle these, with the latter supporting them for legacy
/// reasons only. Regular nodes may also want to handle them in the future. So, break this out to allow
/// easy code reuse. To integrate call the appropriate method when the appropriate message type is
/// received and pass in a function to check whether specific network/identity combinations should be
/// processed. The GATHER implementation will send reply packets to the source peer.
pub struct MulticastAuthority {
subscriptions: RwLock<HashMap<(NetworkId, MulticastGroup), Mutex<HashMap<Address, i64>>>>,
}
impl MulticastAuthority {
fn handle_vl2_multicast_like<HostSystemImpl: HostSystem + ?Sized, Authenticator: Fn(NetworkId, &Identity) -> bool>(
&self,
auth: Authenticator,
host_system: &HostSystemImpl,
source: &Arc<Peer>,
message_id: u64,
payload: &PacketBuffer,
mut cursor: usize,
) -> PacketHandlerResult {
PacketHandlerResult::Ok
}
fn handle_vl2_multicast_gather<HostSystemImpl: HostSystem + ?Sized, Authenticator: Fn(NetworkId, &Identity) -> bool>(
&self,
auth: Authenticator,
host_system: &HostSystemImpl,
source: &Arc<Peer>,
message_id: u64,
payload: &PacketBuffer,
mut cursor: usize,
) -> PacketHandlerResult {
PacketHandlerResult::Ok
}
}

View file

@ -1,8 +1,11 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use crate::vl1::MAC;
use std::cmp::Ordering;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::str::FromStr;
use crate::vl1::MAC;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
#[derive(Clone, Copy, PartialEq, Eq)] #[derive(Clone, Copy, PartialEq, Eq)]
pub struct MulticastGroup { pub struct MulticastGroup {
@ -24,23 +27,6 @@ impl From<MAC> for MulticastGroup {
} }
} }
impl Ord for MulticastGroup {
fn cmp(&self, other: &Self) -> Ordering {
let o = self.mac.cmp(&other.mac);
match o {
Ordering::Equal => self.adi.cmp(&other.adi),
_ => o,
}
}
}
impl PartialOrd for MulticastGroup {
#[inline(always)]
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Hash for MulticastGroup { impl Hash for MulticastGroup {
#[inline(always)] #[inline(always)]
fn hash<H: Hasher>(&self, state: &mut H) { fn hash<H: Hasher>(&self, state: &mut H) {
@ -48,3 +34,74 @@ impl Hash for MulticastGroup {
state.write_u32(self.adi); state.write_u32(self.adi);
} }
} }
impl Serialize for MulticastGroup {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
if serializer.is_human_readable() {
serializer.serialize_str(format!("{}/{}", self.mac.to_string(), self.adi.to_string()).as_str())
} else {
let mut tmp = [0u8; 10];
tmp[0..6].copy_from_slice(&self.mac.to_bytes());
tmp[6..10].copy_from_slice(&self.adi.to_be_bytes());
serializer.serialize_bytes(&tmp)
}
}
}
struct MulticastGroupVisitor;
impl<'de> serde::de::Visitor<'de> for MulticastGroupVisitor {
type Value = MulticastGroup;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a ZeroTier network ID")
}
fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
if v.len() == 10 {
Ok(MulticastGroup {
mac: MAC::from_bytes(&v[..6]).ok_or(E::custom("invalid MAC address"))?,
adi: u32::from_be_bytes((&v[6..10]).try_into().unwrap()),
})
} else {
Err(E::custom("object too large"))
}
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
let mut mac = None;
let mut adi: u32 = 0;
let mut x = 0;
for f in v.split('/') {
if x == 0 {
mac = Some(MAC::from_str(f).map_err(|_| E::custom("invalid MAC address"))?);
x = 1;
} else {
adi = u32::from_str(f).unwrap_or(0);
break;
}
}
Ok(MulticastGroup { mac: mac.ok_or(E::custom("invalid MAC address"))?, adi })
}
}
impl<'de> Deserialize<'de> for MulticastGroup {
fn deserialize<D>(deserializer: D) -> Result<MulticastGroup, D::Error>
where
D: Deserializer<'de>,
{
if deserializer.is_human_readable() {
deserializer.deserialize_str(MulticastGroupVisitor)
} else {
deserializer.deserialize_bytes(MulticastGroupVisitor)
}
}
}

View file

@ -53,6 +53,11 @@ pub struct NetworkConfig {
/// Suggested horizon limit for multicast (not a hard limit, but 0 disables multicast) /// Suggested horizon limit for multicast (not a hard limit, but 0 disables multicast)
pub multicast_limit: u32, pub multicast_limit: u32,
/// Multicast "like" expire time in milliseconds (default if omitted).
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub multicast_like_expire: Option<u32>,
/// ZeroTier-assigned L3 routes for this node. /// ZeroTier-assigned L3 routes for this node.
#[serde(skip_serializing_if = "HashSet::is_empty")] #[serde(skip_serializing_if = "HashSet::is_empty")]
#[serde(default)] #[serde(default)]
@ -107,6 +112,7 @@ impl NetworkConfig {
revision: 0, revision: 0,
mtu: 0, mtu: 0,
multicast_limit: 0, multicast_limit: 0,
multicast_like_expire: None,
routes: HashSet::new(), routes: HashSet::new(),
static_ips: HashSet::new(), static_ips: HashSet::new(),
rules: Vec::new(), rules: Vec::new(),

View file

@ -4,7 +4,7 @@ use std::sync::Arc;
use crate::protocol::PacketBuffer; use crate::protocol::PacketBuffer;
use crate::vl1::node::{HostSystem, InnerProtocol, Node, PacketHandlerResult}; use crate::vl1::node::{HostSystem, InnerProtocol, Node, PacketHandlerResult};
use crate::vl1::{Identity, Path, Peer}; use crate::vl1::{Path, Peer};
pub trait SwitchInterface: Sync + Send {} pub trait SwitchInterface: Sync + Send {}
@ -59,10 +59,6 @@ impl InnerProtocol for Switch {
) -> PacketHandlerResult { ) -> PacketHandlerResult {
PacketHandlerResult::NotHandled PacketHandlerResult::NotHandled
} }
fn should_respond_to(&self, id: &Identity) -> bool {
true
}
} }
impl Switch {} impl Switch {}

View file

@ -210,9 +210,13 @@ fn main() {
drop(global_args); // free unnecessary heap before starting service as we're done with CLI args drop(global_args); // free unnecessary heap before starting service as we're done with CLI args
if let Ok(_tokio_runtime) = zerotier_utils::tokio::runtime::Builder::new_multi_thread().enable_all().build() { if let Ok(_tokio_runtime) = zerotier_utils::tokio::runtime::Builder::new_multi_thread().enable_all().build() {
let test_inner = Arc::new(zerotier_network_hypervisor::vl1::DummyInnerProtocol::default()); let test_inner = Arc::new(zerotier_network_hypervisor::vl1::DummyInnerProtocol::default());
let test_path_filter = Arc::new(zerotier_network_hypervisor::vl1::DummyPathFilter::default());
let datadir = open_datadir(&flags); let datadir = open_datadir(&flags);
let svc = VL1Service::new(datadir, test_inner, test_path_filter, zerotier_vl1_service::VL1Settings::default()); let svc = VL1Service::new(
datadir,
test_inner.clone(),
test_inner,
zerotier_vl1_service::VL1Settings::default(),
);
if svc.is_ok() { if svc.is_ok() {
let svc = svc.unwrap(); let svc = svc.unwrap();
svc.node().init_default_roots(); svc.node().init_default_roots();

View file

@ -178,6 +178,11 @@ impl<T, const C: usize> ArrayVec<T, C> {
self.s self.s
} }
#[inline(always)]
pub fn capacity_remaining(&self) -> usize {
C - self.s
}
#[inline] #[inline]
pub fn pop(&mut self) -> Option<T> { pub fn pop(&mut self) -> Option<T> {
if self.s > 0 { if self.s > 0 {

View file

@ -17,6 +17,7 @@ pub mod memory;
pub mod pool; pub mod pool;
pub mod ringbuffer; pub mod ringbuffer;
pub mod ringbuffermap; pub mod ringbuffermap;
pub mod sync;
pub mod thing; pub mod thing;
pub mod varint; pub mod varint;

39
utils/src/sync.rs Normal file
View file

@ -0,0 +1,39 @@
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
/// Variant version of lock for RwLock with automatic conversion to a write lock as needed.
pub enum RMaybeWLockGuard<'a, T> {
R(Option<RwLockReadGuard<'a, T>>),
W(RwLockWriteGuard<'a, T>),
}
impl<'a, T> RMaybeWLockGuard<'a, T> {
#[inline(always)]
pub fn new_read(l: &'a RwLock<T>) -> Self {
Self::R(Some(l.read().unwrap()))
}
/// Get a readable reference to the object.
#[inline]
pub fn read(&self) -> &T {
match self {
Self::R(r) => &*(r.as_ref().unwrap()),
Self::W(w) => &*w,
}
}
/// Get a writable reference to the object, converting this to a write lock if needed.
#[inline]
pub fn write(&mut self, l: &'a RwLock<T>) -> &mut T {
match self {
Self::R(r) => {
let _ = r.take();
*self = Self::W(l.write().unwrap());
match self {
Self::W(w) => &mut *w,
_ => panic!(),
}
}
Self::W(w) => &mut *w,
}
}
}

View file

@ -28,13 +28,13 @@ const UPDATE_UDP_BINDINGS_EVERY_SECS: usize = 10;
/// a test harness or just the controller for a controller that runs stand-alone. /// a test harness or just the controller for a controller that runs stand-alone.
pub struct VL1Service< pub struct VL1Service<
NodeStorageImpl: NodeStorage + ?Sized + 'static, NodeStorageImpl: NodeStorage + ?Sized + 'static,
PathFilterImpl: PathFilter + ?Sized + 'static, VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static,
InnerProtocolImpl: InnerProtocol + ?Sized + 'static, InnerProtocolImpl: InnerProtocol + ?Sized + 'static,
> { > {
state: RwLock<VL1ServiceMutableState>, state: RwLock<VL1ServiceMutableState>,
storage: Arc<NodeStorageImpl>, storage: Arc<NodeStorageImpl>,
vl1_auth_provider: Arc<VL1AuthProviderImpl>,
inner: Arc<InnerProtocolImpl>, inner: Arc<InnerProtocolImpl>,
path_filter: Arc<PathFilterImpl>,
buffer_pool: Arc<PacketBufferPool>, buffer_pool: Arc<PacketBufferPool>,
node_container: Option<Node>, // never None, set in new() node_container: Option<Node>, // never None, set in new()
} }
@ -48,14 +48,14 @@ struct VL1ServiceMutableState {
impl< impl<
NodeStorageImpl: NodeStorage + ?Sized + 'static, NodeStorageImpl: NodeStorage + ?Sized + 'static,
PathFilterImpl: PathFilter + ?Sized + 'static, VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static,
InnerProtocolImpl: InnerProtocol + ?Sized + 'static, InnerProtocolImpl: InnerProtocol + ?Sized + 'static,
> VL1Service<NodeStorageImpl, PathFilterImpl, InnerProtocolImpl> > VL1Service<NodeStorageImpl, VL1AuthProviderImpl, InnerProtocolImpl>
{ {
pub fn new( pub fn new(
storage: Arc<NodeStorageImpl>, storage: Arc<NodeStorageImpl>,
vl1_auth_provider: Arc<VL1AuthProviderImpl>,
inner: Arc<InnerProtocolImpl>, inner: Arc<InnerProtocolImpl>,
path_filter: Arc<PathFilterImpl>,
settings: VL1Settings, settings: VL1Settings,
) -> Result<Arc<Self>, Box<dyn Error>> { ) -> Result<Arc<Self>, Box<dyn Error>> {
let mut service = Self { let mut service = Self {
@ -66,8 +66,8 @@ impl<
running: true, running: true,
}), }),
storage, storage,
vl1_auth_provider,
inner, inner,
path_filter,
buffer_pool: Arc::new(PacketBufferPool::new( buffer_pool: Arc::new(PacketBufferPool::new(
std::thread::available_parallelism().map_or(2, |c| c.get() + 2), std::thread::available_parallelism().map_or(2, |c| c.get() + 2),
PacketBufferFactory::new(), PacketBufferFactory::new(),
@ -191,9 +191,9 @@ impl<
impl< impl<
NodeStorageImpl: NodeStorage + ?Sized + 'static, NodeStorageImpl: NodeStorage + ?Sized + 'static,
PathFilterImpl: PathFilter + ?Sized + 'static, VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static,
InnerProtocolImpl: InnerProtocol + ?Sized + 'static, InnerProtocolImpl: InnerProtocol + ?Sized + 'static,
> UdpPacketHandler for VL1Service<NodeStorageImpl, PathFilterImpl, InnerProtocolImpl> > UdpPacketHandler for VL1Service<NodeStorageImpl, VL1AuthProviderImpl, InnerProtocolImpl>
{ {
#[inline(always)] #[inline(always)]
fn incoming_udp_packet( fn incoming_udp_packet(
@ -217,12 +217,11 @@ impl<
impl< impl<
NodeStorageImpl: NodeStorage + ?Sized + 'static, NodeStorageImpl: NodeStorage + ?Sized + 'static,
PathFilterImpl: PathFilter + ?Sized + 'static, VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static,
InnerProtocolImpl: InnerProtocol + ?Sized + 'static, InnerProtocolImpl: InnerProtocol + ?Sized + 'static,
> HostSystem for VL1Service<NodeStorageImpl, PathFilterImpl, InnerProtocolImpl> > HostSystem for VL1Service<NodeStorageImpl, VL1AuthProviderImpl, InnerProtocolImpl>
{ {
type Storage = NodeStorageImpl; type Storage = NodeStorageImpl;
type PathFilter = PathFilterImpl;
type LocalSocket = crate::LocalSocket; type LocalSocket = crate::LocalSocket;
type LocalInterface = crate::LocalInterface; type LocalInterface = crate::LocalInterface;
@ -243,11 +242,6 @@ impl<
self.storage.as_ref() self.storage.as_ref()
} }
#[inline(always)]
fn path_filter(&self) -> &Self::PathFilter {
self.path_filter.as_ref()
}
#[inline] #[inline]
fn get_buffer(&self) -> zerotier_network_hypervisor::protocol::PooledPacketBuffer { fn get_buffer(&self) -> zerotier_network_hypervisor::protocol::PooledPacketBuffer {
self.buffer_pool.get() self.buffer_pool.get()
@ -329,9 +323,43 @@ impl<
impl< impl<
NodeStorageImpl: NodeStorage + ?Sized + 'static, NodeStorageImpl: NodeStorage + ?Sized + 'static,
PathFilterImpl: PathFilter + ?Sized + 'static, VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static,
InnerProtocolImpl: InnerProtocol + ?Sized + 'static, InnerProtocolImpl: InnerProtocol + ?Sized + 'static,
> Drop for VL1Service<NodeStorageImpl, PathFilterImpl, InnerProtocolImpl> > NodeStorage for VL1Service<NodeStorageImpl, VL1AuthProviderImpl, InnerProtocolImpl>
{
#[inline(always)]
fn load_node_identity(&self) -> Option<Identity> {
self.storage.load_node_identity()
}
#[inline(always)]
fn save_node_identity(&self, id: &Identity) {
self.storage.save_node_identity(id)
}
}
impl<
NodeStorageImpl: NodeStorage + ?Sized + 'static,
VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static,
InnerProtocolImpl: InnerProtocol + ?Sized + 'static,
> VL1AuthProvider for VL1Service<NodeStorageImpl, VL1AuthProviderImpl, InnerProtocolImpl>
{
#[inline(always)]
fn should_respond_to(&self, id: &Identity) -> bool {
self.vl1_auth_provider.should_respond_to(id)
}
#[inline(always)]
fn has_trust_relationship(&self, id: &Identity) -> bool {
self.vl1_auth_provider.has_trust_relationship(id)
}
}
impl<
NodeStorageImpl: NodeStorage + ?Sized + 'static,
VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static,
InnerProtocolImpl: InnerProtocol + ?Sized + 'static,
> Drop for VL1Service<NodeStorageImpl, VL1AuthProviderImpl, InnerProtocolImpl>
{ {
fn drop(&mut self) { fn drop(&mut self) {
let mut state = self.state.write().unwrap(); let mut state = self.state.write().unwrap();