Work in progress: massive reorg to separate concerns a bit more and allow stand-alone controllers etc.

Adam Ierymenko 2022-09-15 13:57:21 -04:00
parent 854ca07e87
commit b4edad6bfb
GPG key ID: C8877CF2D7A5D7F3
38 changed files with 1321 additions and 1473 deletions


@ -1,11 +1,12 @@
[workspace]
members = [
"crypto",
"network-hypervisor",
"controller",
"system-service",
"utils",
"crypto",
"network-hypervisor",
"controller",
"service",
"vl1-service",
"utils",
]
[profile.release]


@ -9,9 +9,4 @@ pub mod util;
pub mod vl1;
pub mod vl2;
mod event;
mod networkhypervisor;
pub use event::Event;
pub use networkhypervisor::{Interface, NetworkHypervisor};
pub use vl1::protocol::{PacketBuffer, PooledPacketBuffer};


@ -1,101 +0,0 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use std::time::Duration;
use async_trait::async_trait;
use crate::error::InvalidParameterError;
use crate::util::buffer::Buffer;
use crate::util::marshalable::Marshalable;
use crate::vl1::node::*;
use crate::vl1::protocol::PooledPacketBuffer;
use crate::vl1::*;
use crate::vl2::switch::*;
#[async_trait]
pub trait Interface: SystemInterface + SwitchInterface {}
pub struct NetworkHypervisor<I: Interface> {
vl1: Node<I>,
vl2: Switch,
}
impl<I: Interface> NetworkHypervisor<I> {
pub async fn new(ii: &I, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result<Self, InvalidParameterError> {
Ok(NetworkHypervisor {
vl1: Node::new(ii, auto_generate_identity, auto_upgrade_identity).await?,
vl2: Switch::new().await,
})
}
#[inline(always)]
pub fn get_packet_buffer(&self) -> PooledPacketBuffer {
self.vl1.get_packet_buffer()
}
#[inline(always)]
pub fn address(&self) -> Address {
self.vl1.identity.address
}
#[inline(always)]
pub fn identity(&self) -> &Identity {
&self.vl1.identity
}
/// Run background tasks and return desired delay until next call in milliseconds.
///
/// This shouldn't be called concurrently by more than one loop. Doing so would be harmless
/// but would be a waste of compute cycles.
#[inline(always)]
pub async fn do_background_tasks(&self, ii: &I) -> Duration {
self.vl1.do_background_tasks(ii).await
}
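// A minimal sketch (not part of this commit) of the driving loop a host would run
// against the contract described above; the removed system-service code further down
// in this diff uses the same pattern. Assumes a tokio runtime; `host` is any value
// implementing Interface.
async fn background_loop<I: Interface>(hypervisor: &NetworkHypervisor<I>, host: &I) {
    loop {
        // do_background_tasks() returns the desired delay until the next call.
        let next_delay = hypervisor.do_background_tasks(host).await;
        tokio::time::sleep(next_delay).await;
    }
}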
/// Process a physical packet received over a network interface.
#[inline(always)]
pub async fn handle_incoming_physical_packet(
&self,
ii: &I,
source_endpoint: &Endpoint,
source_local_socket: &I::LocalSocket,
source_local_interface: &I::LocalInterface,
data: PooledPacketBuffer,
) {
self.vl1
.handle_incoming_physical_packet(ii, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data)
.await
}
/// Add or update a root set.
///
/// If no root set exists by this name, a new root set is added. If one already
/// exists it's checked against the new one and updated if the new set is valid
/// and should supersede it.
///
/// Changes will take effect within a few seconds when root sets are next
/// examined and synchronized with peer and root list state.
///
/// This returns true if the new root set was accepted and false otherwise.
#[inline(always)]
pub fn add_update_root_set(&self, rs: RootSet) -> bool {
self.vl1.add_update_root_set(rs)
}
/// Add or update the compiled-in default ZeroTier RootSet.
///
/// This is equivalent to unmarshaling default-rootset/root.zerotier.com.bin and then
/// calling add_update_root_set().
pub fn add_update_default_root_set(&self) -> bool {
let mut buf: Buffer<4096> = Buffer::new();
//buf.set_to(include_bytes!("../default-rootset/root.zerotier.com.bin"));
buf.set_to(include_bytes!("../default-rootset/test-root.bin"));
let mut cursor = 0;
self.add_update_root_set(RootSet::unmarshal(&buf, &mut cursor).unwrap())
}
/// Call add_update_default_root_set if there are no roots defined, otherwise do nothing and return false.
pub fn add_update_default_root_set_if_none(&self) {
assert!(self.add_update_default_root_set());
}
}


@ -15,7 +15,7 @@ pub mod testutil;
#[allow(unused_macros)]
macro_rules! debug_event {
($si:expr, $fmt:expr $(, $($arg:tt)*)?) => {
$si.event(crate::Event::Debug(file!(), line!(), format!($fmt, $($($arg)*)?)));
$si.event(crate::vl1::Event::Debug(file!(), line!(), format!($fmt, $($($arg)*)?)));
}
}


@ -2,8 +2,8 @@
mod address;
mod endpoint;
mod event;
mod fragmentedpacket;
mod identity;
mod mac;
mod path;
mod peer;
@ -15,14 +15,16 @@ pub(crate) mod node;
#[allow(unused)]
pub(crate) mod protocol;
pub mod identity;
pub mod inetaddress;
pub use address::Address;
pub use endpoint::Endpoint;
pub use identity::*;
pub use event::Event;
pub use identity::Identity;
pub use inetaddress::InetAddress;
pub use mac::MAC;
pub use node::SystemInterface;
pub use node::{HostSystem, InnerProtocol, Node, PathFilter, Storage};
pub use path::Path;
pub use peer::Peer;
pub use rootset::{Root, RootSet};


@ -14,12 +14,15 @@ use crate::error::InvalidParameterError;
use crate::util::debug_event;
use crate::util::gate::IntervalGate;
use crate::util::marshalable::Marshalable;
use crate::vl1::address::Address;
use crate::vl1::endpoint::Endpoint;
use crate::vl1::event::Event;
use crate::vl1::identity::Identity;
use crate::vl1::path::{Path, PathServiceResult};
use crate::vl1::peer::Peer;
use crate::vl1::protocol::*;
use crate::vl1::rootset::RootSet;
use crate::vl1::whoisqueue::{QueuedPacket, WhoisQueue};
use crate::vl1::{Address, Endpoint, Identity, RootSet};
use crate::Event;
use zerotier_crypto::random;
use zerotier_utils::hex;
@ -29,7 +32,7 @@ use zerotier_utils::hex;
/// These methods are basically callbacks that the core calls to request or transmit things. They are called
during calls to things like wire_receive() and do_background_tasks().
#[async_trait]
pub trait SystemInterface: Sync + Send + 'static {
pub trait HostSystem: Sync + Send + 'static {
/// Type for local system sockets.
type LocalSocket: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString;
@ -51,12 +54,6 @@ pub trait SystemInterface: Sync + Send + 'static {
/// unbound, etc.
fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool;
/// Load this node's identity from the data store.
async fn load_node_identity(&self) -> Option<Identity>;
/// Save this node's identity to the data store.
async fn save_node_identity(&self, id: &Identity);
/// Called to send a packet over the physical network (virtual -> physical).
///
/// This may return false if the send definitely failed. Otherwise it should return true
@ -75,22 +72,10 @@ pub trait SystemInterface: Sync + Send + 'static {
endpoint: &Endpoint,
local_socket: Option<&Self::LocalSocket>,
local_interface: Option<&Self::LocalInterface>,
data: &[&[u8]],
data: &[u8],
packet_ttl: u8,
) -> bool;
/// Called to check and see if a physical address should be used for ZeroTier traffic to a node.
async fn check_path(
&self,
id: &Identity,
endpoint: &Endpoint,
local_socket: Option<&Self::LocalSocket>,
local_interface: Option<&Self::LocalInterface>,
) -> bool;
/// Called to look up any statically defined or memorized paths to known nodes.
async fn get_path_hints(&self, id: &Identity) -> Option<Vec<(Endpoint, Option<Self::LocalSocket>, Option<Self::LocalInterface>)>>;
/// Called to get the current time in milliseconds from the system monotonically increasing clock.
/// This needs to be accurate to about 250 milliseconds resolution or better.
fn time_ticks(&self) -> i64;
@ -100,23 +85,63 @@ pub trait SystemInterface: Sync + Send + 'static {
fn time_clock(&self) -> i64;
}
/// Trait to be implemented by outside code to provide object storage to VL1
#[async_trait]
pub trait Storage: Sync + Send + 'static {
/// Load this node's identity from the data store.
async fn load_node_identity(&self) -> Option<Identity>;
/// Save this node's identity to the data store.
async fn save_node_identity(&self, id: &Identity);
}
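// A minimal sketch (not part of this commit) of the new Storage trait as seen from a
// host crate: an in-memory implementation suitable for tests. A real host would
// persist the identity to disk. Storage and Identity are re-exported from vl1.
use async_trait::async_trait;
use parking_lot::Mutex;
use zerotier_network_hypervisor::vl1::{Identity, Storage};

#[derive(Default)]
struct MemoryStorage {
    identity: Mutex<Option<Identity>>,
}

#[async_trait]
impl Storage for MemoryStorage {
    async fn load_node_identity(&self) -> Option<Identity> {
        // Return a clone of whatever identity has been saved, if any.
        self.identity.lock().clone()
    }

    async fn save_node_identity(&self, id: &Identity) {
        // Keep only the most recently saved identity.
        *self.identity.lock() = Some(id.clone());
    }
}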
/// Trait to be implemented to provide path hints and a filter to approve physical paths
#[async_trait]
pub trait PathFilter<HostSystemImpl: HostSystem>: Sync + Send + 'static {
/// Called to check and see if a physical address should be used for ZeroTier traffic to a node.
async fn check_path(
&self,
id: &Identity,
endpoint: &Endpoint,
local_socket: Option<&HostSystemImpl::LocalSocket>,
local_interface: Option<&HostSystemImpl::LocalInterface>,
) -> bool;
/// Called to look up any statically defined or memorized paths to known nodes.
async fn get_path_hints(
&self,
id: &Identity,
) -> Option<
Vec<(
Endpoint,
Option<HostSystemImpl::LocalSocket>,
Option<HostSystemImpl::LocalInterface>,
)>,
>;
}
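// A minimal sketch (not part of this commit) of a permissive PathFilter as seen from a
// host crate: it approves every physical path and supplies no hints, and is generic
// over any HostSystem so it can satisfy a PathFilter<Self> bound on a service type.
use async_trait::async_trait;
use zerotier_network_hypervisor::vl1::{Endpoint, HostSystem, Identity, PathFilter};

struct AllowAllPaths;

#[async_trait]
impl<HostSystemImpl: HostSystem> PathFilter<HostSystemImpl> for AllowAllPaths {
    async fn check_path(
        &self,
        _id: &Identity,
        _endpoint: &Endpoint,
        _local_socket: Option<&HostSystemImpl::LocalSocket>,
        _local_interface: Option<&HostSystemImpl::LocalInterface>,
    ) -> bool {
        // Approve every candidate physical path.
        true
    }

    async fn get_path_hints(
        &self,
        _id: &Identity,
    ) -> Option<Vec<(Endpoint, Option<HostSystemImpl::LocalSocket>, Option<HostSystemImpl::LocalInterface>)>> {
        // No statically configured or memorized paths.
        None
    }
}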
/// Interface between VL1 and higher/inner protocol layers.
///
/// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but
/// it could also be implemented for testing or "off label" use of VL1 to carry different protocols.
#[async_trait]
pub trait InnerProtocolInterface: Sync + Send + 'static {
pub trait InnerProtocol: Sync + Send + 'static {
/// Handle a packet, returning true if it was handled by the next layer.
///
/// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error().
async fn handle_packet<SI: SystemInterface>(&self, source: &Peer<SI>, source_path: &Path<SI>, verb: u8, payload: &PacketBuffer)
-> bool;
async fn handle_packet<HostSystemImpl: HostSystem>(
&self,
source: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
verb: u8,
payload: &PacketBuffer,
) -> bool;
/// Handle errors, returning true if the error was recognized.
async fn handle_error<SI: SystemInterface>(
async fn handle_error<HostSystemImpl: HostSystem>(
&self,
source: &Peer<SI>,
source_path: &Path<SI>,
source: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
in_re_verb: u8,
in_re_message_id: u64,
error_code: u8,
@ -125,10 +150,10 @@ pub trait InnerProtocolInterface: Sync + Send + 'static {
) -> bool;
/// Handle an OK, returning true if the OK was recognized.
async fn handle_ok<SI: SystemInterface>(
async fn handle_ok<HostSystemImpl: HostSystem>(
&self,
source: &Peer<SI>,
source_path: &Path<SI>,
source: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
in_re_verb: u8,
in_re_message_id: u64,
payload: &PacketBuffer,
@ -152,20 +177,20 @@ struct BackgroundTaskIntervals {
whois_service: IntervalGate<{ crate::vl1::whoisqueue::SERVICE_INTERVAL_MS }>,
}
struct RootInfo<SI: SystemInterface> {
struct RootInfo<HostSystemImpl: HostSystem> {
sets: HashMap<String, RootSet>,
roots: HashMap<Arc<Peer<SI>>, Vec<Endpoint>>,
roots: HashMap<Arc<Peer<HostSystemImpl>>, Vec<Endpoint>>,
this_root_sets: Option<Vec<u8>>,
sets_modified: bool,
online: bool,
}
enum PathKey<'a, SI: SystemInterface> {
Copied(Endpoint, SI::LocalSocket),
Ref(&'a Endpoint, &'a SI::LocalSocket),
enum PathKey<'a, HostSystemImpl: HostSystem> {
Copied(Endpoint, HostSystemImpl::LocalSocket),
Ref(&'a Endpoint, &'a HostSystemImpl::LocalSocket),
}
impl<'a, SI: SystemInterface> Hash for PathKey<'a, SI> {
impl<'a, HostSystemImpl: HostSystem> Hash for PathKey<'a, HostSystemImpl> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
match self {
Self::Copied(ep, ls) => {
@ -180,7 +205,7 @@ impl<'a, SI: SystemInterface> Hash for PathKey<'a, SI> {
}
}
impl<'a, SI: SystemInterface> PartialEq for PathKey<'_, SI> {
impl<'a, HostSystemImpl: HostSystem> PartialEq for PathKey<'_, HostSystemImpl> {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::Copied(ep1, ls1), Self::Copied(ep2, ls2)) => ep1.eq(ep2) && ls1.eq(ls2),
@ -191,11 +216,11 @@ impl<'a, SI: SystemInterface> PartialEq for PathKey<'_, SI> {
}
}
impl<'a, SI: SystemInterface> Eq for PathKey<'_, SI> {}
impl<'a, HostSystemImpl: HostSystem> Eq for PathKey<'_, HostSystemImpl> {}
impl<'a, SI: SystemInterface> PathKey<'a, SI> {
impl<'a, HostSystemImpl: HostSystem> PathKey<'a, HostSystemImpl> {
#[inline(always)]
fn local_socket(&self) -> &SI::LocalSocket {
fn local_socket(&self) -> &HostSystemImpl::LocalSocket {
match self {
Self::Copied(_, ls) => ls,
Self::Ref(_, ls) => *ls,
@ -203,16 +228,16 @@ impl<'a, SI: SystemInterface> PathKey<'a, SI> {
}
#[inline(always)]
fn to_copied(&self) -> PathKey<'static, SI> {
fn to_copied(&self) -> PathKey<'static, HostSystemImpl> {
match self {
Self::Copied(ep, ls) => PathKey::<'static, SI>::Copied(ep.clone(), ls.clone()),
Self::Ref(ep, ls) => PathKey::<'static, SI>::Copied((*ep).clone(), (*ls).clone()),
Self::Copied(ep, ls) => PathKey::<'static, HostSystemImpl>::Copied(ep.clone(), ls.clone()),
Self::Ref(ep, ls) => PathKey::<'static, HostSystemImpl>::Copied((*ep).clone(), (*ls).clone()),
}
}
}
/// A VL1 global P2P network node.
pub struct Node<SI: SystemInterface> {
pub struct Node<HostSystemImpl: HostSystem> {
/// A random ID generated to identify this particular running instance.
pub instance_id: [u8; 16],
@ -223,16 +248,16 @@ pub struct Node<SI: SystemInterface> {
intervals: Mutex<BackgroundTaskIntervals>,
/// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use.
paths: parking_lot::RwLock<HashMap<PathKey<'static, SI>, Arc<Path<SI>>>>,
paths: parking_lot::RwLock<HashMap<PathKey<'static, HostSystemImpl>, Arc<Path<HostSystemImpl>>>>,
/// Peers with which we are currently communicating.
peers: parking_lot::RwLock<HashMap<Address, Arc<Peer<SI>>>>,
peers: parking_lot::RwLock<HashMap<Address, Arc<Peer<HostSystemImpl>>>>,
/// This node's trusted roots, sorted in ascending order of quality/preference, and cluster definitions.
roots: RwLock<RootInfo<SI>>,
roots: RwLock<RootInfo<HostSystemImpl>>,
/// Current best root.
best_root: RwLock<Option<Arc<Peer<SI>>>>,
best_root: RwLock<Option<Arc<Peer<HostSystemImpl>>>>,
/// Identity lookup queue, also holds packets waiting on a lookup.
whois: WhoisQueue,
@ -241,17 +266,22 @@ pub struct Node<SI: SystemInterface> {
buffer_pool: PacketBufferPool,
}
impl<SI: SystemInterface> Node<SI> {
pub async fn new(si: &SI, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result<Self, InvalidParameterError> {
impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
pub async fn new<StorageImpl: Storage>(
host_system: &HostSystemImpl,
storage: &StorageImpl,
auto_generate_identity: bool,
auto_upgrade_identity: bool,
) -> Result<Self, InvalidParameterError> {
let mut id = {
let id = si.load_node_identity().await;
let id = storage.load_node_identity().await;
if id.is_none() {
if !auto_generate_identity {
return Err(InvalidParameterError("no identity found and auto-generate not enabled"));
} else {
let id = Identity::generate();
si.event(Event::IdentityAutoGenerated(id.clone()));
si.save_node_identity(&id).await;
host_system.event(Event::IdentityAutoGenerated(id.clone()));
storage.save_node_identity(&id).await;
id
}
} else {
@ -262,12 +292,12 @@ impl<SI: SystemInterface> Node<SI> {
if auto_upgrade_identity {
let old = id.clone();
if id.upgrade()? {
si.save_node_identity(&id).await;
si.event(Event::IdentityAutoUpgraded(old, id.clone()));
storage.save_node_identity(&id).await;
host_system.event(Event::IdentityAutoUpgraded(old, id.clone()));
}
}
debug_event!(si, "[vl1] loaded identity {}", id.to_string());
debug_event!(host_system, "[vl1] loaded identity {}", id.to_string());
Ok(Self {
instance_id: random::get_bytes_secure(),
@ -293,7 +323,7 @@ impl<SI: SystemInterface> Node<SI> {
self.buffer_pool.get()
}
pub fn peer(&self, a: Address) -> Option<Arc<Peer<SI>>> {
pub fn peer(&self, a: Address) -> Option<Arc<Peer<HostSystemImpl>>> {
self.peers.read().get(&a).cloned()
}
@ -301,7 +331,7 @@ impl<SI: SystemInterface> Node<SI> {
self.roots.read().online
}
fn update_best_root(&self, si: &SI, time_ticks: i64) {
fn update_best_root(&self, host_system: &HostSystemImpl, time_ticks: i64) {
let roots = self.roots.read();
// The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison
@ -321,7 +351,7 @@ impl<SI: SystemInterface> Node<SI> {
if let Some(best_root) = best_root.as_mut() {
if !Arc::ptr_eq(best_root, best) {
debug_event!(
si,
host_system,
"[vl1] new best root: {} (replaced {})",
best.identity.address.to_string(),
best_root.identity.address.to_string()
@ -329,12 +359,20 @@ impl<SI: SystemInterface> Node<SI> {
*best_root = best.clone();
}
} else {
debug_event!(si, "[vl1] new best root: {} (was empty)", best.identity.address.to_string());
debug_event!(
host_system,
"[vl1] new best root: {} (was empty)",
best.identity.address.to_string()
);
let _ = best_root.insert(best.clone());
}
} else {
if let Some(old_best) = self.best_root.write().take() {
debug_event!(si, "[vl1] new best root: NONE (replaced {})", old_best.identity.address.to_string());
debug_event!(
host_system,
"[vl1] new best root: NONE (replaced {})",
old_best.identity.address.to_string()
);
}
}
@ -343,17 +381,17 @@ impl<SI: SystemInterface> Node<SI> {
if !roots.online {
drop(roots);
self.roots.write().online = true;
si.event(Event::Online(true));
host_system.event(Event::Online(true));
}
} else if roots.online {
drop(roots);
self.roots.write().online = false;
si.event(Event::Online(false));
host_system.event(Event::Online(false));
}
}
pub async fn do_background_tasks(&self, si: &SI) -> Duration {
let tt = si.time_ticks();
pub async fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration {
let tt = host_system.time_ticks();
let (root_sync, root_hello, mut root_spam_hello, peer_service, path_service, whois_service) = {
let mut intervals = self.intervals.lock();
(
@ -372,7 +410,7 @@ impl<SI: SystemInterface> Node<SI> {
}
debug_event!(
si,
host_system,
"[vl1] do_background_tasks:{}{}{}{}{}{} ----",
if root_sync {
" root_sync"
@ -416,7 +454,7 @@ impl<SI: SystemInterface> Node<SI> {
false
}
} {
debug_event!(si, "[vl1] root sets modified, synchronizing internal data structures");
debug_event!(host_system, "[vl1] root sets modified, synchronizing internal data structures");
let (mut old_root_identities, address_collisions, new_roots, bad_identities, my_root_sets) = {
let roots = self.roots.read();
@ -456,7 +494,7 @@ impl<SI: SystemInterface> Node<SI> {
if m.endpoints.is_some() && !address_collisions.contains(&m.identity.address) && !m.identity.eq(&self.identity)
{
debug_event!(
si,
host_system,
"[vl1] examining root {} with {} endpoints",
m.identity.address.to_string(),
m.endpoints.as_ref().map_or(0, |e| e.len())
@ -465,7 +503,7 @@ impl<SI: SystemInterface> Node<SI> {
if let Some(peer) = peers.get(&m.identity.address) {
new_roots.insert(peer.clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect());
} else {
if let Some(peer) = Peer::<SI>::new(&self.identity, m.identity.clone(), tt) {
if let Some(peer) = Peer::<HostSystemImpl>::new(&self.identity, m.identity.clone(), tt) {
new_roots.insert(
parking_lot::RwLockUpgradableReadGuard::upgrade(peers)
.entry(m.identity.address)
@ -485,13 +523,13 @@ impl<SI: SystemInterface> Node<SI> {
};
for c in address_collisions.iter() {
si.event(Event::SecurityWarning(format!(
host_system.event(Event::SecurityWarning(format!(
"address/identity collision in root sets! address {} collides across root sets or with an existing peer and is being ignored as a root!",
c.to_string()
)));
}
for i in bad_identities.iter() {
si.event(Event::SecurityWarning(format!(
host_system.event(Event::SecurityWarning(format!(
"bad identity detected for address {} in at least one root set, ignoring (error creating peer object)",
i.address.to_string()
)));
@ -505,11 +543,11 @@ impl<SI: SystemInterface> Node<SI> {
let mut roots = self.roots.write();
roots.roots = new_roots;
roots.this_root_sets = my_root_sets;
si.event(Event::UpdatedRoots(old_root_identities, new_root_identities));
host_system.event(Event::UpdatedRoots(old_root_identities, new_root_identities));
}
}
self.update_best_root(si, tt);
self.update_best_root(host_system, tt);
}
// Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint
@ -528,12 +566,12 @@ impl<SI: SystemInterface> Node<SI> {
for (root, endpoints) in roots.iter() {
for ep in endpoints.iter() {
debug_event!(
si,
host_system,
"sending HELLO to root {} (root interval: {})",
root.identity.address.to_string(),
ROOT_HELLO_INTERVAL
);
root.send_hello(si, self, Some(ep)).await;
root.send_hello(host_system, self, Some(ep)).await;
}
}
}
@ -545,7 +583,7 @@ impl<SI: SystemInterface> Node<SI> {
{
let roots = self.roots.read();
for (a, peer) in self.peers.read().iter() {
if !peer.service(si, self, tt) && !roots.roots.contains_key(peer) {
if !peer.service(host_system, self, tt) && !roots.roots.contains_key(peer) {
dead_peers.push(*a);
}
}
@ -561,7 +599,7 @@ impl<SI: SystemInterface> Node<SI> {
let mut dead_paths = Vec::new();
let mut need_keepalive = Vec::new();
for (k, path) in self.paths.read().iter() {
if si.local_socket_is_valid(k.local_socket()) {
if host_system.local_socket_is_valid(k.local_socket()) {
match path.service(tt) {
PathServiceResult::Ok => {}
PathServiceResult::Dead => dead_paths.push(k.to_copied()),
@ -575,32 +613,32 @@ impl<SI: SystemInterface> Node<SI> {
self.paths.write().remove(dp);
}
let ka = [tt as u8]; // send different bytes every time for keepalive in case some things filter zero packets
let ka2 = [&ka[..1]];
for ka in need_keepalive.iter() {
si.wire_send(&ka.endpoint, Some(&ka.local_socket), Some(&ka.local_interface), &ka2, 0)
for p in need_keepalive.iter() {
host_system
.wire_send(&p.endpoint, Some(&p.local_socket), Some(&p.local_interface), &ka[..1], 0)
.await;
}
}
if whois_service {
self.whois.service(si, self, tt);
self.whois.service(host_system, self, tt);
}
debug_event!(si, "[vl1] do_background_tasks DONE ----");
debug_event!(host_system, "[vl1] do_background_tasks DONE ----");
Duration::from_millis(1000)
}
pub async fn handle_incoming_physical_packet<PH: InnerProtocolInterface>(
pub async fn handle_incoming_physical_packet<InnerProtocolImpl: InnerProtocol>(
&self,
si: &SI,
ph: &PH,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
source_endpoint: &Endpoint,
source_local_socket: &SI::LocalSocket,
source_local_interface: &SI::LocalInterface,
source_local_socket: &HostSystemImpl::LocalSocket,
source_local_interface: &HostSystemImpl::LocalInterface,
mut data: PooledPacketBuffer,
) {
debug_event!(
si,
host_system,
"[vl1] {} -> #{} {}->{} length {} (on socket {}@{})",
source_endpoint.to_string(),
data.bytes_fixed_at::<8>(0)
@ -614,7 +652,7 @@ impl<SI: SystemInterface> Node<SI> {
if let Ok(fragment_header) = data.struct_mut_at::<v1::FragmentHeader>(0) {
if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) {
let time_ticks = si.time_ticks();
let time_ticks = host_system.time_ticks();
if dest == self.identity.address {
let path = self.canonical_path(source_endpoint, source_local_socket, source_local_interface, time_ticks);
path.log_receive_anything(time_ticks);
@ -623,7 +661,7 @@ impl<SI: SystemInterface> Node<SI> {
#[cfg(debug_assertions)]
let fragment_header_id = u64::from_be_bytes(fragment_header.id);
debug_event!(
si,
host_system,
"[vl1] #{:0>16x} fragment {} of {} received",
u64::from_be_bytes(fragment_header.id),
fragment_header.fragment_no(),
@ -639,15 +677,15 @@ impl<SI: SystemInterface> Node<SI> {
) {
if let Some(frag0) = assembled_packet.frags[0].as_ref() {
#[cfg(debug_assertions)]
debug_event!(si, "[vl1] #{:0>16x} packet fully assembled!", fragment_header_id);
debug_event!(host_system, "[vl1] #{:0>16x} packet fully assembled!", fragment_header_id);
if let Ok(packet_header) = frag0.struct_at::<v1::PacketHeader>(0) {
if let Some(source) = Address::from_bytes(&packet_header.src) {
if let Some(peer) = self.peer(source) {
peer.receive(
self,
si,
ph,
host_system,
inner,
time_ticks,
&path,
packet_header,
@ -656,7 +694,8 @@ impl<SI: SystemInterface> Node<SI> {
)
.await;
} else {
self.whois.query(self, si, source, Some(QueuedPacket::Fragmented(assembled_packet)));
self.whois
.query(self, host_system, source, Some(QueuedPacket::Fragmented(assembled_packet)));
}
}
}
@ -665,14 +704,14 @@ impl<SI: SystemInterface> Node<SI> {
} else {
#[cfg(debug_assertions)]
if let Ok(packet_header) = data.struct_at::<v1::PacketHeader>(0) {
debug_event!(si, "[vl1] #{:0>16x} is unfragmented", u64::from_be_bytes(packet_header.id));
debug_event!(host_system, "[vl1] #{:0>16x} is unfragmented", u64::from_be_bytes(packet_header.id));
if let Some(source) = Address::from_bytes(&packet_header.src) {
if let Some(peer) = self.peer(source) {
peer.receive(self, si, ph, time_ticks, &path, packet_header, data.as_ref(), &[])
peer.receive(self, host_system, inner, time_ticks, &path, packet_header, data.as_ref(), &[])
.await;
} else {
self.whois.query(self, si, source, Some(QueuedPacket::Unfragmented(data)));
self.whois.query(self, host_system, source, Some(QueuedPacket::Unfragmented(data)));
}
}
}
@ -686,7 +725,7 @@ impl<SI: SystemInterface> Node<SI> {
{
debug_packet_id = u64::from_be_bytes(fragment_header.id);
debug_event!(
si,
host_system,
"[vl1] #{:0>16x} forwarding packet fragment to {}",
debug_packet_id,
dest.to_string()
@ -694,7 +733,7 @@ impl<SI: SystemInterface> Node<SI> {
}
if fragment_header.increment_hops() > v1::FORWARD_MAX_HOPS {
#[cfg(debug_assertions)]
debug_event!(si, "[vl1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id);
debug_event!(host_system, "[vl1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id);
return;
}
} else {
@ -702,12 +741,17 @@ impl<SI: SystemInterface> Node<SI> {
#[cfg(debug_assertions)]
{
debug_packet_id = u64::from_be_bytes(packet_header.id);
debug_event!(si, "[vl1] #{:0>16x} forwarding packet to {}", debug_packet_id, dest.to_string());
debug_event!(
host_system,
"[vl1] #{:0>16x} forwarding packet to {}",
debug_packet_id,
dest.to_string()
);
}
if packet_header.increment_hops() > v1::FORWARD_MAX_HOPS {
#[cfg(debug_assertions)]
debug_event!(
si,
host_system,
"[vl1] #{:0>16x} discarded: max hops exceeded!",
u64::from_be_bytes(packet_header.id)
);
@ -720,9 +764,9 @@ impl<SI: SystemInterface> Node<SI> {
if let Some(peer) = self.peer(dest) {
// TODO: SHOULD we forward? Need a way to check.
peer.forward(si, time_ticks, data.as_ref()).await;
peer.forward(host_system, time_ticks, data.as_ref()).await;
#[cfg(debug_assertions)]
debug_event!(si, "[vl1] #{:0>16x} forwarded successfully", debug_packet_id);
debug_event!(host_system, "[vl1] #{:0>16x} forwarded successfully", debug_packet_id);
}
}
}
@ -730,16 +774,21 @@ impl<SI: SystemInterface> Node<SI> {
}
/// Get the current "best" root from among this node's trusted roots.
pub fn best_root(&self) -> Option<Arc<Peer<SI>>> {
pub fn best_root(&self) -> Option<Arc<Peer<HostSystemImpl>>> {
self.best_root.read().clone()
}
/// Check whether this peer is a root according to any root set trusted by this node.
pub fn is_peer_root(&self, peer: &Peer<SI>) -> bool {
pub fn is_peer_root(&self, peer: &Peer<HostSystemImpl>) -> bool {
self.roots.read().roots.keys().any(|p| p.identity.eq(&peer.identity))
}
/// Called when a remote node sends us a root set update.
/// Called when a remote node sends us a root set update, applying the update if it is valid and applicable.
///
/// This will only replace an existing root set with a newer one. It won't add a new root set, which must be
/// done by an authorized user or administrator not just by a root.
///
/// SECURITY NOTE: this DOES NOT validate certificates in the supplied root set! Caller must do that first!
pub(crate) fn remote_update_root_set(&self, received_from: &Identity, rs: RootSet) {
let mut roots = self.roots.write();
if let Some(entry) = roots.sets.get_mut(&rs.name) {
@ -776,11 +825,6 @@ impl<SI: SystemInterface> Node<SI> {
self.roots.read().sets.values().cloned().collect()
}
/// Get the root set(s) to which this node belongs if it is a root.
pub(crate) fn this_root_sets_as_bytes(&self) -> Option<Vec<u8>> {
self.roots.read().this_root_sets.clone()
}
/// Returns true if this node is a member of a root set (that it knows about).
pub fn this_node_is_root(&self) -> bool {
self.roots.read().this_root_sets.is_some()
@ -790,10 +834,10 @@ impl<SI: SystemInterface> Node<SI> {
pub(crate) fn canonical_path(
&self,
ep: &Endpoint,
local_socket: &SI::LocalSocket,
local_interface: &SI::LocalInterface,
local_socket: &HostSystemImpl::LocalSocket,
local_interface: &HostSystemImpl::LocalInterface,
time_ticks: i64,
) -> Arc<Path<SI>> {
) -> Arc<Path<HostSystemImpl>> {
if let Some(path) = self.paths.read().get(&PathKey::Ref(ep, local_socket)) {
path.clone()
} else {


@ -25,18 +25,23 @@ pub(crate) enum PathServiceResult {
/// These are maintained in Node and canonicalized so that all unique paths have
/// one and only one unique path object. That enables statistics to be tracked
/// for them and uniform application of things like keepalives.
pub struct Path<SI: SystemInterface> {
pub struct Path<HostSystemImpl: HostSystem> {
pub endpoint: Endpoint,
pub local_socket: SI::LocalSocket,
pub local_interface: SI::LocalInterface,
pub local_socket: HostSystemImpl::LocalSocket,
pub local_interface: HostSystemImpl::LocalInterface,
last_send_time_ticks: AtomicI64,
last_receive_time_ticks: AtomicI64,
create_time_ticks: i64,
fragmented_packets: Mutex<HashMap<PacketId, FragmentedPacket, PacketIdHasher>>,
}
impl<SI: SystemInterface> Path<SI> {
pub fn new(endpoint: Endpoint, local_socket: SI::LocalSocket, local_interface: SI::LocalInterface, time_ticks: i64) -> Self {
impl<HostSystemImpl: HostSystem> Path<HostSystemImpl> {
pub fn new(
endpoint: Endpoint,
local_socket: HostSystemImpl::LocalSocket,
local_interface: HostSystemImpl::LocalInterface,
time_ticks: i64,
) -> Self {
Self {
endpoint,
local_socket,

File diff suppressed because it is too large.


@ -6,7 +6,7 @@ use parking_lot::Mutex;
use crate::util::gate::IntervalGate;
use crate::vl1::fragmentedpacket::FragmentedPacket;
use crate::vl1::node::{Node, SystemInterface};
use crate::vl1::node::{HostSystem, Node};
use crate::vl1::protocol::{PooledPacketBuffer, WHOIS_MAX_WAITING_PACKETS, WHOIS_RETRY_INTERVAL, WHOIS_RETRY_MAX};
use crate::vl1::Address;
@ -31,7 +31,7 @@ impl WhoisQueue {
}
/// Launch or renew a WHOIS query and enqueue a packet to be processed when (if) it is received.
pub fn query<SI: SystemInterface>(&self, node: &Node<SI>, si: &SI, target: Address, packet: Option<QueuedPacket>) {
pub fn query<SI: HostSystem>(&self, node: &Node<SI>, si: &SI, target: Address, packet: Option<QueuedPacket>) {
let mut q = self.0.lock();
let qi = q.entry(target).or_insert_with(|| WhoisQueueItem {
@ -60,11 +60,11 @@ impl WhoisQueue {
}
#[allow(unused)]
fn send_whois<SI: SystemInterface>(&self, node: &Node<SI>, si: &SI, targets: &[Address]) {
fn send_whois<SI: HostSystem>(&self, node: &Node<SI>, si: &SI, targets: &[Address]) {
todo!()
}
pub(crate) fn service<SI: SystemInterface>(&self, si: &SI, node: &Node<SI>, time_ticks: i64) {
pub(crate) fn service<SI: HostSystem>(&self, si: &SI, node: &Node<SI>, time_ticks: i64) {
let mut targets: Vec<Address> = Vec::new();
self.0.lock().retain(|target, qi| {
if qi.retry_count < WHOIS_RETRY_MAX {


@ -2,7 +2,7 @@
use async_trait::async_trait;
use crate::vl1::node::{InnerProtocolInterface, SystemInterface};
use crate::vl1::node::{HostSystem, InnerProtocol};
use crate::vl1::protocol::*;
use crate::vl1::{Identity, Path, Peer};
@ -11,17 +11,23 @@ pub trait SwitchInterface: Sync + Send {}
pub struct Switch {}
#[async_trait]
impl InnerProtocolInterface for Switch {
impl InnerProtocol for Switch {
#[allow(unused)]
async fn handle_packet<SI: SystemInterface>(&self, peer: &Peer<SI>, source_path: &Path<SI>, verb: u8, payload: &PacketBuffer) -> bool {
async fn handle_packet<HostSystemImpl: HostSystem>(
&self,
peer: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
verb: u8,
payload: &PacketBuffer,
) -> bool {
false
}
#[allow(unused)]
async fn handle_error<SI: SystemInterface>(
async fn handle_error<HostSystemImpl: HostSystem>(
&self,
peer: &Peer<SI>,
source_path: &Path<SI>,
peer: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
in_re_verb: u8,
in_re_message_id: u64,
error_code: u8,
@ -32,10 +38,10 @@ impl InnerProtocolInterface for Switch {
}
#[allow(unused)]
async fn handle_ok<SI: SystemInterface>(
async fn handle_ok<HostSystemImpl: HostSystem>(
&self,
peer: &Peer<SI>,
source_path: &Path<SI>,
peer: &Peer<HostSystemImpl>,
source_path: &Path<HostSystemImpl>,
in_re_verb: u8,
in_re_message_id: u64,
payload: &PacketBuffer,
@ -50,8 +56,4 @@ impl InnerProtocolInterface for Switch {
}
}
impl Switch {
pub async fn new() -> Self {
Self {}
}
}
impl Switch {}


@ -1,5 +1,5 @@
[package]
name = "zerotier-system-service"
name = "zerotier-service"
version = "0.1.0"
authors = ["ZeroTier, Inc. <contact@zerotier.com>", "Adam Ierymenko <adam.ierymenko@zerotier.com>"]
edition = "2021"
@ -13,8 +13,8 @@ path = "src/main.rs"
zerotier-network-hypervisor = { path = "../network-hypervisor" }
zerotier-crypto = { path = "../crypto" }
zerotier-utils = { path = "../utils" }
zerotier-vl1-service = { path = "../vl1-service" }
async-trait = "^0"
num-traits = "^0"
tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false }
serde = { version = "^1", features = ["derive"], default-features = false }
serde_json = { version = "^1", features = ["std"], default-features = false }


@ -1,312 +0,0 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::path::Path;
use std::sync::Arc;
use async_trait::async_trait;
use zerotier_network_hypervisor::vl1::*;
use zerotier_network_hypervisor::vl2::*;
use zerotier_network_hypervisor::*;
use zerotier_crypto::random;
use zerotier_utils::{ms_monotonic, ms_since_epoch};
use tokio::time::Duration;
use crate::datadir::DataDir;
use crate::localinterface::LocalInterface;
use crate::localsocket::LocalSocket;
use crate::udp::*;
/// Interval between scans of system network interfaces to update port bindings.
const UDP_UPDATE_BINDINGS_INTERVAL_MS: Duration = Duration::from_millis(5000);
/// ZeroTier system service, which presents virtual networks as VPN connections on Windows/macOS/Linux/BSD/etc.
pub struct Service {
udp_binding_task: tokio::task::JoinHandle<()>,
core_background_service_task: tokio::task::JoinHandle<()>,
internal: Arc<ServiceImpl>,
}
struct ServiceImpl {
pub rt: tokio::runtime::Handle,
pub data: DataDir,
pub udp_sockets_by_port: tokio::sync::RwLock<HashMap<u16, BoundUdpPort>>,
pub num_listeners_per_socket: usize,
_core: Option<NetworkHypervisor<Self>>,
}
impl Drop for Service {
fn drop(&mut self) {
// Kill all background tasks associated with this service.
self.udp_binding_task.abort();
self.core_background_service_task.abort();
// Drop all bound sockets since these can hold circular Arc<> references to 'internal'.
// This shouldn't have to loop much if at all to acquire the lock, but it might if something
// is still completing somewhere in an aborting task.
loop {
if let Ok(mut udp_sockets) = self.internal.udp_sockets_by_port.try_write() {
udp_sockets.clear();
break;
}
std::thread::sleep(Duration::from_millis(5));
}
}
}
impl Service {
/// Start ZeroTier service.
///
/// This launches a number of background tasks in the async runtime that will run as long as this object exists.
/// When this is dropped these tasks are killed.
pub async fn new<P: AsRef<Path>>(
rt: tokio::runtime::Handle,
base_path: P,
auto_upgrade_identity: bool,
) -> Result<Self, Box<dyn Error>> {
let mut si = ServiceImpl {
rt,
data: DataDir::open(base_path).await.map_err(|e| Box::new(e))?,
udp_sockets_by_port: tokio::sync::RwLock::new(HashMap::with_capacity(4)),
num_listeners_per_socket: std::thread::available_parallelism().unwrap().get(),
_core: None,
};
let _ = si._core.insert(NetworkHypervisor::new(&si, true, auto_upgrade_identity).await?);
let si = Arc::new(si);
si._core.as_ref().unwrap().add_update_default_root_set_if_none();
Ok(Self {
udp_binding_task: si.rt.spawn(si.clone().udp_binding_task_main()),
core_background_service_task: si.rt.spawn(si.clone().core_background_service_task_main()),
internal: si,
})
}
}
impl ServiceImpl {
#[inline(always)]
fn core(&self) -> &NetworkHypervisor<ServiceImpl> {
self._core.as_ref().unwrap()
}
/// Called in udp_binding_task_main() to service a particular UDP port.
async fn update_udp_bindings_for_port(
self: &Arc<Self>,
port: u16,
interface_prefix_blacklist: &Vec<String>,
cidr_blacklist: &Vec<InetAddress>,
) -> Option<Vec<(LocalInterface, InetAddress, std::io::Error)>> {
for ns in {
let mut udp_sockets_by_port = self.udp_sockets_by_port.write().await;
let bp = udp_sockets_by_port.entry(port).or_insert_with(|| BoundUdpPort::new(port));
let (errors, new_sockets) = bp.update_bindings(interface_prefix_blacklist, cidr_blacklist);
if bp.sockets.is_empty() {
return Some(errors);
}
new_sockets
}
.iter()
{
/*
* Start a task (not actual thread) for each CPU core.
*
* The async runtime is itself multithreaded but each packet takes a little bit of CPU to handle.
* This makes sure that while one packet is being processed the async runtime can immediately
* queue up another receiver for this socket.
*/
for _ in 0..self.num_listeners_per_socket {
let self2 = self.clone();
let socket = ns.socket.clone();
let interface = ns.interface.clone();
let local_socket = LocalSocket::new(ns);
ns.socket_associated_tasks.lock().push(self.rt.spawn(async move {
let core = self2.core();
loop {
let mut buf = core.get_packet_buffer();
if let Ok((bytes, source)) = socket.recv_from(unsafe { buf.entire_buffer_mut() }).await {
unsafe { buf.set_size_unchecked(bytes) };
core.handle_incoming_physical_packet(
&self2,
&Endpoint::IpUdp(InetAddress::from(source)),
&local_socket,
&interface,
buf,
)
.await;
} else {
break;
}
}
}));
}
}
return None;
}
/// Background task to update per-interface/per-port bindings if system interface configuration changes.
async fn udp_binding_task_main(self: Arc<Self>) {
loop {
let config = self.data.config().await;
if let Some(errors) = self
.update_udp_bindings_for_port(
config.settings.primary_port,
&config.settings.interface_prefix_blacklist,
&config.settings.cidr_blacklist,
)
.await
{
for e in errors.iter() {
println!("BIND ERROR: {} {} {}", e.0.to_string(), e.1.to_string(), e.2.to_string());
}
// TODO: report errors properly
}
tokio::time::sleep(UDP_UPDATE_BINDINGS_INTERVAL_MS).await;
}
}
/// Periodically calls do_background_tasks() in the ZeroTier core.
async fn core_background_service_task_main(self: Arc<Self>) {
tokio::time::sleep(Duration::from_secs(1)).await;
loop {
tokio::time::sleep(self.core().do_background_tasks(&self).await).await;
}
}
}
#[async_trait]
impl SystemInterface for ServiceImpl {
type LocalSocket = crate::service::LocalSocket;
type LocalInterface = crate::localinterface::LocalInterface;
fn event(&self, event: Event) {
println!("{}", event.to_string());
match event {
_ => {}
}
}
async fn user_message(&self, _source: &Identity, _message_type: u64, _message: &[u8]) {}
#[inline(always)]
fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool {
socket.0.strong_count() > 0
}
async fn load_node_identity(&self) -> Option<Identity> {
self.data.load_identity().await.map_or(None, |i| Some(i))
}
async fn save_node_identity(&self, id: &Identity) {
assert!(self.data.save_identity(id).await.is_ok())
}
async fn wire_send(
&self,
endpoint: &Endpoint,
local_socket: Option<&Self::LocalSocket>,
local_interface: Option<&Self::LocalInterface>,
data: &[&[u8]],
packet_ttl: u8,
) -> bool {
match endpoint {
Endpoint::IpUdp(address) => {
// This is the fast path -- the socket is known to the core so just send it.
if let Some(s) = local_socket {
if let Some(s) = s.0.upgrade() {
return s.send_sync_nonblock(address, data, packet_ttl);
} else {
return false;
}
}
let udp_sockets_by_port = self.udp_sockets_by_port.read().await;
if !udp_sockets_by_port.is_empty() {
if let Some(specific_interface) = local_interface {
// Send from a specific interface if that interface is specified.
for (_, p) in udp_sockets_by_port.iter() {
if !p.sockets.is_empty() {
let mut i = (random::next_u32_secure() as usize) % p.sockets.len();
for _ in 0..p.sockets.len() {
let s = p.sockets.get(i).unwrap();
if s.interface.eq(specific_interface) {
if s.send_sync_nonblock(address, data, packet_ttl) {
return true;
}
}
i = (i + 1) % p.sockets.len();
}
}
}
} else {
// Otherwise send from one socket on every interface.
let mut sent_on_interfaces = HashSet::with_capacity(4);
for p in udp_sockets_by_port.values() {
if !p.sockets.is_empty() {
let mut i = (random::next_u32_secure() as usize) % p.sockets.len();
for _ in 0..p.sockets.len() {
let s = p.sockets.get(i).unwrap();
if !sent_on_interfaces.contains(&s.interface) {
if s.send_sync_nonblock(address, data, packet_ttl) {
sent_on_interfaces.insert(s.interface.clone());
}
}
i = (i + 1) % p.sockets.len();
}
}
}
return !sent_on_interfaces.is_empty();
}
}
return false;
}
_ => {}
}
return false;
}
async fn check_path(
&self,
_id: &Identity,
endpoint: &Endpoint,
_local_socket: Option<&Self::LocalSocket>,
_local_interface: Option<&Self::LocalInterface>,
) -> bool {
let config = self.data.config().await;
if let Some(pps) = config.physical.get(endpoint) {
!pps.blacklist
} else {
true
}
}
async fn get_path_hints(&self, id: &Identity) -> Option<Vec<(Endpoint, Option<Self::LocalSocket>, Option<Self::LocalInterface>)>> {
let config = self.data.config().await;
if let Some(vns) = config.virtual_.get(&id.address) {
Some(vns.try_.iter().map(|ep| (ep.clone(), None, None)).collect())
} else {
None
}
}
#[inline(always)]
fn time_ticks(&self) -> i64 {
ms_monotonic()
}
#[inline(always)]
fn time_clock(&self) -> i64 {
ms_since_epoch()
}
}
impl SwitchInterface for ServiceImpl {}
impl Interface for ServiceImpl {}


@ -7,22 +7,11 @@ use std::sync::{Arc, Weak};
use parking_lot::Mutex;
/// Each pool requires a factory that creates and resets (for re-use) pooled objects.
pub trait PoolFactory<O: Send> {
pub trait PoolFactory<O> {
fn create(&self) -> O;
fn reset(&self, obj: &mut O);
}
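// A minimal sketch (not part of this commit): a trivial factory that pools byte
// vectors, together with the checkout/return pattern using Pool::new() and get()
// defined further down in this file.
struct ByteVecFactory;

impl PoolFactory<Vec<u8>> for ByteVecFactory {
    fn create(&self) -> Vec<u8> {
        Vec::with_capacity(2048)
    }

    fn reset(&self, obj: &mut Vec<u8>) {
        // Clear contents but keep capacity so the buffer can be reused cheaply.
        obj.clear()
    }
}

fn pool_demo() {
    let pool = Pool::new(64, ByteVecFactory);
    let mut buf = pool.get(); // Pooled<Vec<u8>, ByteVecFactory>, derefs to Vec<u8>
    buf.extend_from_slice(b"keepalive");
    drop(buf); // returned to the pool rather than freed
}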
#[repr(C)]
struct PoolEntry<O: Send, F: PoolFactory<O>> {
obj: O, // must be first
return_pool: Weak<PoolInner<O, F>>,
}
struct PoolInner<O: Send, F: PoolFactory<O>> {
factory: F,
pool: Mutex<Vec<NonNull<PoolEntry<O, F>>>>,
}
/// Container for pooled objects that have been checked out of the pool.
///
/// When this is dropped the object is returned to the pool or if the pool or is
@ -34,9 +23,15 @@ struct PoolInner<O: Send, F: PoolFactory<O>> {
/// Note that pooled objects are not clonable. If you want to share them use Rc<>
/// or Arc<>.
#[repr(transparent)]
pub struct Pooled<O: Send, F: PoolFactory<O>>(NonNull<PoolEntry<O, F>>);
pub struct Pooled<O, F: PoolFactory<O>>(NonNull<PoolEntry<O, F>>);
impl<O: Send, F: PoolFactory<O>> Pooled<O, F> {
#[repr(C)]
struct PoolEntry<O, F: PoolFactory<O>> {
obj: O, // must be first
return_pool: Weak<PoolInner<O, F>>,
}
impl<O, F: PoolFactory<O>> Pooled<O, F> {
/// Get a raw pointer to the object wrapped by this pooled object container.
/// The returned raw pointer MUST be restored into a Pooled instance with
/// from_raw() or memory will leak.
@ -67,10 +62,10 @@ impl<O: Send, F: PoolFactory<O>> Pooled<O, F> {
}
}
unsafe impl<O: Send, F: PoolFactory<O>> Send for Pooled<O, F> {}
unsafe impl<O: Send, F: PoolFactory<O>> Sync for Pooled<O, F> where O: Sync {}
unsafe impl<O, F: PoolFactory<O>> Send for Pooled<O, F> where O: Send {}
unsafe impl<O, F: PoolFactory<O>> Sync for Pooled<O, F> where O: Sync {}
impl<O: Send, F: PoolFactory<O>> Deref for Pooled<O, F> {
impl<O, F: PoolFactory<O>> Deref for Pooled<O, F> {
type Target = O;
#[inline(always)]
@ -79,29 +74,29 @@ impl<O: Send, F: PoolFactory<O>> Deref for Pooled<O, F> {
}
}
impl<O: Send, F: PoolFactory<O>> DerefMut for Pooled<O, F> {
impl<O, F: PoolFactory<O>> DerefMut for Pooled<O, F> {
#[inline(always)]
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut self.0.as_mut().obj }
}
}
impl<O: Send, F: PoolFactory<O>> AsRef<O> for Pooled<O, F> {
impl<O, F: PoolFactory<O>> AsRef<O> for Pooled<O, F> {
#[inline(always)]
fn as_ref(&self) -> &O {
unsafe { &self.0.as_ref().obj }
}
}
impl<O: Send, F: PoolFactory<O>> AsMut<O> for Pooled<O, F> {
impl<O, F: PoolFactory<O>> AsMut<O> for Pooled<O, F> {
#[inline(always)]
fn as_mut(&mut self) -> &mut O {
unsafe { &mut self.0.as_mut().obj }
}
}
impl<O: Send, F: PoolFactory<O>> Drop for Pooled<O, F> {
#[inline(always)]
impl<O, F: PoolFactory<O>> Drop for Pooled<O, F> {
#[inline]
fn drop(&mut self) {
let internal = unsafe { self.0.as_mut() };
if let Some(p) = internal.return_pool.upgrade() {
@ -116,9 +111,14 @@ impl<O: Send, F: PoolFactory<O>> Drop for Pooled<O, F> {
/// An object pool for Reusable objects.
/// Checked out objects are held by a guard object that returns them when dropped if
/// the pool still exists or drops them if the pool has itself been dropped.
pub struct Pool<O: Send, F: PoolFactory<O>>(Arc<PoolInner<O, F>>);
pub struct Pool<O, F: PoolFactory<O>>(Arc<PoolInner<O, F>>);
impl<O: Send, F: PoolFactory<O>> Pool<O, F> {
struct PoolInner<O, F: PoolFactory<O>> {
factory: F,
pool: Mutex<Vec<NonNull<PoolEntry<O, F>>>>,
}
impl<O, F: PoolFactory<O>> Pool<O, F> {
pub fn new(initial_stack_capacity: usize, factory: F) -> Self {
Self(Arc::new(PoolInner::<O, F> {
factory,
@ -127,7 +127,7 @@ impl<O: Send, F: PoolFactory<O>> Pool<O, F> {
}
/// Get a pooled object, or allocate one if the pool is empty.
#[inline(always)]
#[inline]
pub fn get(&self) -> Pooled<O, F> {
if let Some(o) = self.0.pool.lock().pop() {
return Pooled::<O, F>(o);
@ -152,7 +152,7 @@ impl<O: Send, F: PoolFactory<O>> Pool<O, F> {
}
}
impl<O: Send, F: PoolFactory<O>> Drop for Pool<O, F> {
impl<O, F: PoolFactory<O>> Drop for Pool<O, F> {
#[inline(always)]
fn drop(&mut self) {
self.purge();

vl1-service/Cargo.toml (new file)

@ -0,0 +1,21 @@
[package]
name = "zerotier-vl1-service"
version = "0.1.0"
authors = ["ZeroTier, Inc. <contact@zerotier.com>", "Adam Ierymenko <adam.ierymenko@zerotier.com>"]
edition = "2021"
license = "MPL-2.0"
[dependencies]
zerotier-network-hypervisor = { path = "../network-hypervisor" }
zerotier-crypto = { path = "../crypto" }
zerotier-utils = { path = "../utils" }
async-trait = "^0"
num-traits = "^0"
tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false }
parking_lot = { version = "^0", features = [], default-features = false }
[target."cfg(windows)".dependencies]
winapi = { version = "^0", features = ["handleapi", "ws2ipdef", "ws2tcpip"] }
[target."cfg(not(windows))".dependencies]
libc = "^0"

vl1-service/rustfmt.toml (new symbolic link)

@ -0,0 +1 @@
../rustfmt.toml

vl1-service/src/lib.rs (new file)

@ -0,0 +1,11 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
mod localinterface;
mod localsocket;
mod vl1service;
pub mod sys;
pub use localinterface::LocalInterface;
pub use localsocket::LocalSocket;
pub use vl1service::*;


@ -4,7 +4,7 @@ use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use crate::udp::BoundUdpSocket;
use crate::sys::udp::BoundUdpSocket;
static LOCAL_SOCKET_UNIQUE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
@ -22,10 +22,9 @@ impl LocalSocket {
Self(Arc::downgrade(s), LOCAL_SOCKET_UNIQUE_ID_COUNTER.fetch_add(1, Ordering::SeqCst))
}
/// Returns true if the wrapped socket appears to be in use by the core.
#[inline(always)]
pub fn in_use(&self) -> bool {
self.0.weak_count() > 0
pub fn is_valid(&self) -> bool {
self.0.strong_count() > 0
}
#[inline(always)]


@ -89,15 +89,12 @@ pub fn for_each_address<F: FnMut(&InetAddress, &LocalInterface)>(mut f: F) {
#[cfg(test)]
mod tests {
use crate::localinterface::LocalInterface;
use zerotier_network_hypervisor::vl1::InetAddress;
use super::*;
#[test]
fn test_getifaddrs() {
println!("starting getifaddrs...");
crate::getifaddrs::for_each_address(|a: &InetAddress, interface: &LocalInterface| {
println!(" {} {}", interface.to_string(), a.to_string())
});
for_each_address(|a: &InetAddress, interface: &LocalInterface| println!(" {} {}", interface.to_string(), a.to_string()));
println!("done.")
}
}


@ -0,0 +1,5 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
pub mod getifaddrs;
pub mod ipv6;
pub mod udp;


@ -12,8 +12,6 @@ use std::sync::Arc;
#[cfg(unix)]
use std::os::unix::io::{FromRawFd, RawFd};
use crate::getifaddrs;
use crate::ipv6;
use crate::localinterface::LocalInterface;
#[allow(unused_imports)]
@ -21,6 +19,8 @@ use num_traits::AsPrimitive;
use zerotier_network_hypervisor::vl1::inetaddress::*;
use crate::sys::{getifaddrs, ipv6};
/// A local port to which one or more UDP sockets are bound.
///
/// To bind a port we must bind sockets to each interface/IP pair directly. Sockets must
@ -35,18 +35,9 @@ pub struct BoundUdpSocket {
pub address: InetAddress,
pub socket: Arc<tokio::net::UdpSocket>,
pub interface: LocalInterface,
pub socket_associated_tasks: parking_lot::Mutex<Vec<tokio::task::JoinHandle<()>>>,
fd: RawFd,
}
impl Drop for BoundUdpSocket {
fn drop(&mut self) {
for t in self.socket_associated_tasks.lock().drain(..) {
t.abort();
}
}
}
impl BoundUdpSocket {
#[cfg(unix)]
#[inline(always)]
@ -64,43 +55,21 @@ impl BoundUdpSocket {
}
#[cfg(any(target_os = "macos", target_os = "freebsd"))]
pub fn send_sync_nonblock(&self, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool {
pub fn send_sync_nonblock(&self, dest: &InetAddress, b: &[u8], packet_ttl: u8) -> bool {
let mut ok = false;
if dest.family() == self.address.family() {
if packet_ttl > 0 && dest.is_ipv4() {
self.set_ttl(packet_ttl);
}
unsafe {
if b.len() == 1 {
let bb = *b.get_unchecked(0);
ok = libc::sendto(
self.fd.as_(),
bb.as_ptr().cast(),
bb.len().as_(),
0,
(dest as *const InetAddress).cast(),
size_of::<InetAddress>().as_(),
) > 0;
} else {
let mut iov: [libc::iovec; 16] = MaybeUninit::uninit().assume_init();
assert!(b.len() <= iov.len());
for i in 0..b.len() {
let bb = *b.get_unchecked(i);
let ii = iov.get_unchecked_mut(i);
ii.iov_base = transmute(bb.as_ptr());
ii.iov_len = bb.len().as_();
}
let msghdr = libc::msghdr {
msg_name: transmute(dest as *const InetAddress),
msg_namelen: size_of::<InetAddress>().as_(),
msg_iov: iov.as_mut_ptr(),
msg_iovlen: b.len().as_(),
msg_control: null_mut(),
msg_controllen: 0,
msg_flags: 0,
};
ok = libc::sendmsg(self.fd.as_(), &msghdr, 0) > 0;
}
ok = libc::sendto(
self.fd.as_(),
b.as_ptr().cast(),
b.len().as_(),
0,
(dest as *const InetAddress).cast(),
size_of::<InetAddress>().as_(),
) > 0;
}
if packet_ttl > 0 && dest.is_ipv4() {
self.set_ttl(0xff);
@ -110,32 +79,15 @@ impl BoundUdpSocket {
}
#[cfg(not(any(target_os = "macos", target_os = "freebsd")))]
pub fn send_sync_nonblock(&self, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool {
pub fn send_sync_nonblock(&self, dest: &InetAddress, b: &[u8], packet_ttl: u8) -> bool {
let mut ok = false;
if dest.family() == self.address.family() {
let mut tmp: [u8; 16384] = unsafe { std::mem::MaybeUninit::uninit().assume_init() };
let data = if b.len() == 1 {
*unsafe { b.get_unchecked(0) }
} else {
let mut p = 0;
for bb in b.iter() {
let pp = p + bb.len();
if pp < 16384 {
tmp[p..pp].copy_from_slice(*bb);
p = pp;
} else {
return false;
}
}
&tmp[..p]
};
if packet_ttl > 0 && dest.is_ipv4() {
self.set_ttl(packet_ttl);
ok = self.socket.try_send_to(data, dest.try_into().unwrap()).is_ok();
ok = self.socket.try_send_to(b, dest.try_into().unwrap()).is_ok();
self.set_ttl(0xff);
} else {
ok = self.socket.try_send_to(data, dest.try_into().unwrap()).is_ok();
ok = self.socket.try_send_to(b, dest.try_into().unwrap()).is_ok();
}
}
ok
@ -202,7 +154,6 @@ impl BoundUdpPort {
let s = Arc::new(BoundUdpSocket {
address: addr_with_port,
socket: Arc::new(s.unwrap()),
socket_associated_tasks: parking_lot::Mutex::new(Vec::new()),
interface: interface.clone(),
fd,
});


@ -0,0 +1,191 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::sync::Arc;
use async_trait::async_trait;
use zerotier_crypto::random;
use zerotier_network_hypervisor::vl1::{Endpoint, Event, HostSystem, Identity, InnerProtocol, Node, PathFilter, Storage};
use zerotier_utils::{ms_monotonic, ms_since_epoch};
use crate::sys::udp::BoundUdpPort;
use tokio::task::JoinHandle;
use tokio::time::Duration;
/// VL1 service that connects to the physical network and hosts an inner protocol like ZeroTier VL2.
///
/// This is the "outward facing" half of a full ZeroTier stack on a normal system. It binds sockets,
/// talks to the physical network, manages the vl1 node, and presents a templated interface for
/// whatever inner protocol implementation is using it. This would typically be VL2 but could be
/// a test harness or just a controller that runs stand-alone.
pub struct VL1Service<StorageImpl: Storage, PathFilterImpl: PathFilter<Self>, InnerProtocolImpl: InnerProtocol> {
daemons: parking_lot::Mutex<Vec<JoinHandle<()>>>,
udp_sockets_by_port: tokio::sync::RwLock<HashMap<u16, BoundUdpPort>>,
storage: Arc<StorageImpl>,
inner: Arc<InnerProtocolImpl>,
path_filter: Arc<PathFilterImpl>,
node_container: Option<Node<Self>>,
}
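// A sketch (not part of this commit) of how a host might assemble the three pieces.
// MemoryStorage and AllowAllPaths are the illustrative implementations sketched in the
// node.rs notes above; Switch is VL2's no-op InnerProtocol (assuming it is exported to
// the host crate -- any InnerProtocol implementation works here).
use std::error::Error;
use std::sync::Arc;

async fn start_vl1_service() -> Result<(), Box<dyn Error>> {
    let service = VL1Service::new(
        Arc::new(MemoryStorage::default()),
        Arc::new(Switch {}),
        Arc::new(AllowAllPaths),
    )
    .await?;
    // The service spawns its UDP binding and background-task daemons internally and
    // aborts them again when it is dropped.
    let _node = service.node();
    Ok(())
}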
impl<StorageImpl: Storage, PathFilterImpl: PathFilter<Self>, InnerProtocolImpl: InnerProtocol>
VL1Service<StorageImpl, PathFilterImpl, InnerProtocolImpl>
{
pub async fn new(
storage: Arc<StorageImpl>,
inner: Arc<InnerProtocolImpl>,
path_filter: Arc<PathFilterImpl>,
) -> Result<Arc<Self>, Box<dyn Error>> {
let mut service = VL1Service {
daemons: parking_lot::Mutex::new(Vec::with_capacity(2)),
udp_sockets_by_port: tokio::sync::RwLock::new(HashMap::with_capacity(8)),
storage,
inner,
path_filter,
node_container: None,
};
service
.node_container
.replace(Node::new(&service, &*service.storage, true, false).await?);
let service = Arc::new(service);
let mut daemons = service.daemons.lock();
daemons.push(tokio::spawn(service.clone().udp_bind_daemon()));
daemons.push(tokio::spawn(service.clone().node_background_task_daemon()));
drop(daemons);
Ok(service)
}
#[inline(always)]
pub fn node(&self) -> &Node<Self> {
debug_assert!(self.node_container.is_some());
unsafe { self.node_container.as_ref().unwrap_unchecked() }
}
async fn udp_bind_daemon(self: Arc<Self>) {}
async fn node_background_task_daemon(self: Arc<Self>) {}
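// A sketch (not part of this commit) of the per-socket receive loop that
// udp_bind_daemon() is expected to spawn for each bound socket, adapted from the
// ServiceImpl code removed earlier in this diff. The `socket`, `local_socket`, and
// `interface` arguments are assumed to come from a BoundUdpSocket entry in
// udp_sockets_by_port.
#[allow(unused)]
async fn udp_receive_loop(
    self: Arc<Self>,
    socket: Arc<tokio::net::UdpSocket>,
    local_socket: crate::LocalSocket,
    interface: crate::LocalInterface,
) {
    loop {
        let mut buf = self.node().get_packet_buffer();
        if let Ok((bytes, source)) = socket.recv_from(unsafe { buf.entire_buffer_mut() }).await {
            unsafe { buf.set_size_unchecked(bytes) };
            self.node()
                .handle_incoming_physical_packet(
                    &*self,
                    &*self.inner,
                    &Endpoint::IpUdp(zerotier_network_hypervisor::vl1::InetAddress::from(source)),
                    &local_socket,
                    &interface,
                    buf,
                )
                .await;
        } else {
            break;
        }
    }
}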
}
#[async_trait]
impl<StorageImpl: Storage, PathFilterImpl: PathFilter<Self>, InnerProtocolImpl: InnerProtocol> HostSystem
for VL1Service<StorageImpl, PathFilterImpl, InnerProtocolImpl>
{
type LocalSocket = crate::LocalSocket;
type LocalInterface = crate::LocalInterface;
fn event(&self, event: Event) {
println!("{}", event.to_string());
match event {
_ => {}
}
}
async fn user_message(&self, _source: &Identity, _message_type: u64, _message: &[u8]) {}
#[inline(always)]
fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool {
socket.is_valid()
}
async fn wire_send(
&self,
endpoint: &Endpoint,
local_socket: Option<&Self::LocalSocket>,
local_interface: Option<&Self::LocalInterface>,
data: &[u8],
packet_ttl: u8,
) -> bool {
match endpoint {
Endpoint::IpUdp(address) => {
// This is the fast path -- the socket is known to the core so just send it.
if let Some(s) = local_socket {
if let Some(s) = s.0.upgrade() {
return s.send_sync_nonblock(address, data, packet_ttl);
} else {
return false;
}
}
let udp_sockets_by_port = self.udp_sockets_by_port.read().await;
if !udp_sockets_by_port.is_empty() {
if let Some(specific_interface) = local_interface {
// Send from a specific interface if that interface is specified.
for (_, p) in udp_sockets_by_port.iter() {
if !p.sockets.is_empty() {
let mut i = (random::next_u32_secure() as usize) % p.sockets.len();
for _ in 0..p.sockets.len() {
let s = p.sockets.get(i).unwrap();
if s.interface.eq(specific_interface) {
if s.send_sync_nonblock(address, data, packet_ttl) {
return true;
}
}
i = (i + 1) % p.sockets.len();
}
}
}
} else {
// Otherwise send from one socket on every interface.
let mut sent_on_interfaces = HashSet::with_capacity(4);
for p in udp_sockets_by_port.values() {
if !p.sockets.is_empty() {
let mut i = (random::next_u32_secure() as usize) % p.sockets.len();
for _ in 0..p.sockets.len() {
let s = p.sockets.get(i).unwrap();
if !sent_on_interfaces.contains(&s.interface) {
if s.send_sync_nonblock(address, data, packet_ttl) {
sent_on_interfaces.insert(s.interface.clone());
}
}
i = (i + 1) % p.sockets.len();
}
}
}
return !sent_on_interfaces.is_empty();
}
}
return false;
}
_ => {}
}
return false;
}
#[inline(always)]
fn time_ticks(&self) -> i64 {
ms_monotonic()
}
#[inline(always)]
fn time_clock(&self) -> i64 {
ms_since_epoch()
}
}
impl<StorageImpl: Storage, PathFilterImpl: PathFilter<Self>, InnerProtocolImpl: InnerProtocol> Drop
for VL1Service<StorageImpl, PathFilterImpl, InnerProtocolImpl>
{
fn drop(&mut self) {
for d in self.daemons.lock().drain(..) {
d.abort();
}
// Drop all bound sockets since these can hold circular Arc<> references back to this service.
// This shouldn't have to loop much if at all to acquire the lock, but it might if something
// is still completing somewhere in an aborting task.
loop {
if let Ok(mut udp_sockets) = self.udp_sockets_by_port.try_write() {
udp_sockets.clear();
break;
}
std::thread::sleep(Duration::from_millis(2));
}
}
}