Mirror of https://github.com/zerotier/ZeroTierOne.git (synced 2025-07-26 04:02:50 +02:00)
Infect the core with async. I was resisting it a bit, but it makes things easier and will probably perform better than using hacks to avoid it. It is optional for UDP, but we will end up wanting it once we have TCP, HTTP, WebRTC, etc. transports.
parent 6940b97a92, commit 6ad30ba1f5
13 changed files with 501 additions and 377 deletions
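As a rough illustration of the pattern applied throughout the diff below: trait callbacks gain `async` via the async-trait crate and every call site gains `.await`. This is only a hedged sketch with simplified, hypothetical signatures (the real SystemInterface in vl1/node.rs has more methods and parameters, and the real Event type is an enum):

use async_trait::async_trait;

// Illustrative stand-in for the core's Event type; the real one is richer.
pub struct Event(pub String);

#[async_trait]
pub trait SystemInterface: Sync + Send + 'static {
    // Before this commit these were plain synchronous methods, e.g. `fn event(&self, event: Event);`.
    async fn event(&self, event: Event);
    async fn wire_send(&self, data: &[&[u8]], packet_ttl: u8) -> bool; // simplified signature
}

pub async fn notify<SI: SystemInterface>(si: &SI) {
    // Callers now await the callback instead of invoking it synchronously.
    si.event(Event("example".into())).await;
}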
@@ -51,6 +51,8 @@ impl Write for SHA512 {
}
}

unsafe impl Send for SHA512 {}

pub struct SHA384(Option<openssl::sha::Sha384>);

impl SHA384 {

@@ -93,6 +95,8 @@ impl Write for SHA384 {
}
}

unsafe impl Send for SHA384 {}

//#[link(name="crypto")]
extern "C" {
fn HMAC_CTX_new() -> *mut c_void;
@@ -26,3 +26,5 @@ impl Poly1305 {
self.0.finalize().into_bytes().into()
}
}

unsafe impl Send for Poly1305 {}
@@ -17,9 +17,9 @@ debug_events = []

[dependencies]
zerotier-core-crypto = { path = "../zerotier-core-crypto" }
async-trait = "^0"
base64 = "^0"
lz4_flex = { version = "^0", features = ["safe-encode", "safe-decode", "checked-decode"] }
dashmap = "^5"
parking_lot = "^0"
lazy_static = "^1"
serde = { version = "^1", features = ["derive"], default-features = false }
@@ -15,3 +15,5 @@ mod networkhypervisor;
pub use event::Event;
pub use networkhypervisor::{Interface, NetworkHypervisor};
pub use vl1::protocol::{PacketBuffer, PooledPacketBuffer};

pub use async_trait::async_trait;
@@ -2,6 +2,8 @@

use std::time::Duration;

use async_trait::async_trait;

use crate::error::InvalidParameterError;
use crate::util::buffer::Buffer;
use crate::util::marshalable::Marshalable;

@@ -10,6 +12,7 @@ use crate::vl1::protocol::PooledPacketBuffer;
use crate::vl1::*;
use crate::vl2::switch::*;

#[async_trait]
pub trait Interface: SystemInterface + SwitchInterface {}

pub struct NetworkHypervisor<I: Interface> {

@@ -18,10 +21,10 @@ pub struct NetworkHypervisor<I: Interface> {
}

impl<I: Interface> NetworkHypervisor<I> {
pub fn new(ii: &I, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result<Self, InvalidParameterError> {
pub async fn new(ii: &I, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result<Self, InvalidParameterError> {
Ok(NetworkHypervisor {
vl1: Node::new(ii, auto_generate_identity, auto_upgrade_identity)?,
vl2: Switch::new(),
vl1: Node::new(ii, auto_generate_identity, auto_upgrade_identity).await?,
vl2: Switch::new().await,
})
}

@@ -45,13 +48,14 @@ impl<I: Interface> NetworkHypervisor<I> {
/// This shouldn't be called concurrently by more than one loop. Doing so would be harmless
/// but would be a waste of compute cycles.
#[inline(always)]
pub fn do_background_tasks(&self, ii: &I) -> Duration {
self.vl1.do_background_tasks(ii)
pub async fn do_background_tasks(&self, ii: &I) -> Duration {
self.vl1.do_background_tasks(ii).await
}

/// Process a physical packet received over a network interface.
#[inline(always)]
pub fn handle_incoming_physical_packet(&self, ii: &I, source_endpoint: &Endpoint, source_local_socket: &I::LocalSocket, source_local_interface: &I::LocalInterface, data: PooledPacketBuffer) {
self.vl1.handle_incoming_physical_packet(ii, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data)
pub async fn handle_incoming_physical_packet(&self, ii: &I, source_endpoint: &Endpoint, source_local_socket: &I::LocalSocket, source_local_interface: &I::LocalInterface, data: PooledPacketBuffer) {
self.vl1.handle_incoming_physical_packet(ii, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data).await
}

/// Add or update a root set.
@@ -14,7 +14,7 @@ pub(crate) const ZEROES: [u8; 64] = [0_u8; 64];
#[allow(unused_macros)]
macro_rules! debug_event {
($si:expr, $fmt:expr $(, $($arg:tt)*)?) => {
$si.event(crate::Event::Debug(file!(), line!(), format!($fmt, $($($arg)*)?)));
$si.event(crate::Event::Debug(file!(), line!(), format!($fmt, $($($arg)*)?))).await;
}
}
@@ -1,18 +1,18 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;
use async_trait::async_trait;
use parking_lot::{Mutex, RwLock};

use crate::error::InvalidParameterError;
use crate::util::debug_event;
use crate::util::gate::IntervalGate;
use crate::vl1::path::Path;
use crate::vl1::path::{Path, PathServiceResult};
use crate::vl1::peer::Peer;
use crate::vl1::protocol::*;
use crate::vl1::whoisqueue::{QueuedPacket, WhoisQueue};

@@ -23,6 +23,7 @@ use crate::Event;
///
/// These methods are basically callbacks that the core calls to request or transmit things. They are called
/// during calls to things like wire_recieve() and do_background_tasks().
#[async_trait]
pub trait SystemInterface: Sync + Send + 'static {
/// Type for local system sockets.
type LocalSocket: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString;

@@ -31,10 +32,10 @@ pub trait SystemInterface: Sync + Send + 'static {
type LocalInterface: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString;

/// An event occurred.
fn event(&self, event: Event);
async fn event(&self, event: Event);

/// A USER_MESSAGE packet was received.
fn user_message(&self, source: &Identity, message_type: u64, message: &[u8]);
async fn user_message(&self, source: &Identity, message_type: u64, message: &[u8]);

/// Check a local socket for validity.
///

@@ -43,10 +44,10 @@ pub trait SystemInterface: Sync + Send + 'static {
fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool;

/// Load this node's identity from the data store.
fn load_node_identity(&self) -> Option<Identity>;
async fn load_node_identity(&self) -> Option<Identity>;

/// Save this node's identity.
fn save_node_identity(&self, id: &Identity);
async fn save_node_identity(&self, id: &Identity);

/// Called to send a packet over the physical network (virtual -> physical).
///

@@ -61,13 +62,13 @@ pub trait SystemInterface: Sync + Send + 'static {
/// For endpoint types that support a packet TTL, the implementation may set the TTL
/// if the 'ttl' parameter is not zero. If the parameter is zero or TTL setting is not
/// supported, the default TTL should be used.
fn wire_send(&self, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>, data: &[&[u8]], packet_ttl: u8) -> bool;
async fn wire_send(&self, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>, data: &[&[u8]], packet_ttl: u8) -> bool;

/// Called to check and see if a physical address should be used for ZeroTier traffic to a node.
fn check_path(&self, id: &Identity, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>) -> bool;
async fn check_path(&self, id: &Identity, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>) -> bool;

/// Called to look up any statically defined or memorized paths to known nodes.
fn get_path_hints(&self, id: &Identity) -> Option<Vec<(Endpoint, Option<Self::LocalSocket>, Option<Self::LocalInterface>)>>;
async fn get_path_hints(&self, id: &Identity) -> Option<Vec<(Endpoint, Option<Self::LocalSocket>, Option<Self::LocalInterface>)>>;

/// Called to get the current time in milliseconds from the system monotonically increasing clock.
/// This needs to be accurate to about 250 milliseconds resolution or better.

@@ -82,19 +83,31 @@ pub trait SystemInterface: Sync + Send + 'static {
///
/// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but
/// it could also be implemented for testing or "off label" use of VL1 to carry different protocols.
#[async_trait]
pub trait InnerProtocolInterface: Sync + Send + 'static {
/// Handle a packet, returning true if it was handled by the next layer.
///
/// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error().
/// The return values of these must follow the same semantic of returning true if the message
/// was handled.
fn handle_packet<SI: SystemInterface>(&self, peer: &Peer<SI>, source_path: &Path<SI>, forward_secrecy: bool, extended_authentication: bool, verb: u8, payload: &PacketBuffer) -> bool;
async fn handle_packet<SI: SystemInterface>(&self, source: &Peer<SI>, source_path: &Path<SI>, forward_secrecy: bool, extended_authentication: bool, verb: u8, payload: &PacketBuffer) -> bool;

/// Handle errors, returning true if the error was recognized.
fn handle_error<SI: SystemInterface>(&self, peer: &Peer<SI>, source_path: &Path<SI>, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, error_code: u8, payload: &PacketBuffer, cursor: &mut usize) -> bool;
async fn handle_error<SI: SystemInterface>(
&self,
source: &Peer<SI>,
source_path: &Path<SI>,
forward_secrecy: bool,
extended_authentication: bool,
in_re_verb: u8,
in_re_message_id: u64,
error_code: u8,
payload: &PacketBuffer,
cursor: &mut usize,
) -> bool;

/// Handle an OK, returing true if the OK was recognized.
fn handle_ok<SI: SystemInterface>(&self, peer: &Peer<SI>, source_path: &Path<SI>, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, payload: &PacketBuffer, cursor: &mut usize) -> bool;
async fn handle_ok<SI: SystemInterface>(&self, source: &Peer<SI>, source_path: &Path<SI>, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, payload: &PacketBuffer, cursor: &mut usize) -> bool;

/// Check if this remote peer has a trust relationship with this node.
///
@@ -108,11 +121,11 @@ const ROOT_SYNC_INTERVAL_MS: i64 = 1000;

#[derive(Default)]
struct BackgroundTaskIntervals {
whois: IntervalGate<{ crate::vl1::whoisqueue::SERVICE_INTERVAL_MS }>,
paths: IntervalGate<{ crate::vl1::path::SERVICE_INTERVAL_MS }>,
peers: IntervalGate<{ crate::vl1::peer::SERVICE_INTERVAL_MS }>,
root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>,
root_hello: IntervalGate<{ ROOT_HELLO_INTERVAL }>,
peers: IntervalGate<{ crate::vl1::peer::SERVICE_INTERVAL_MS }>,
paths: IntervalGate<{ crate::vl1::path::SERVICE_INTERVAL_MS }>,
whois: IntervalGate<{ crate::vl1::whoisqueue::SERVICE_INTERVAL_MS }>,
}

struct RootInfo<SI: SystemInterface> {

@@ -121,6 +134,57 @@ struct RootInfo<SI: SystemInterface> {
sets_modified: bool,
}

enum PathKey<'a, SI: SystemInterface> {
Copied(Endpoint, SI::LocalSocket),
Ref(&'a Endpoint, &'a SI::LocalSocket),
}

impl<'a, SI: SystemInterface> Hash for PathKey<'a, SI> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
match self {
Self::Copied(ep, ls) => {
ep.hash(state);
ls.hash(state);
}
Self::Ref(ep, ls) => {
(*ep).hash(state);
(*ls).hash(state);
}
}
}
}

impl<'a, SI: SystemInterface> PartialEq for PathKey<'_, SI> {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::Copied(ep1, ls1), Self::Copied(ep2, ls2)) => ep1.eq(ep2) && ls1.eq(ls2),
(Self::Copied(ep1, ls1), Self::Ref(ep2, ls2)) => ep1.eq(*ep2) && ls1.eq(*ls2),
(Self::Ref(ep1, ls1), Self::Copied(ep2, ls2)) => (*ep1).eq(ep2) && (*ls1).eq(ls2),
(Self::Ref(ep1, ls1), Self::Ref(ep2, ls2)) => (*ep1).eq(*ep2) && (*ls1).eq(*ls2),
}
}
}

impl<'a, SI: SystemInterface> Eq for PathKey<'_, SI> {}

impl<'a, SI: SystemInterface> PathKey<'a, SI> {
#[inline(always)]
fn local_socket(&self) -> &SI::LocalSocket {
match self {
Self::Copied(_, ls) => ls,
Self::Ref(_, ls) => *ls,
}
}

#[inline(always)]
fn to_copied(&self) -> PathKey<'static, SI> {
match self {
Self::Copied(ep, ls) => PathKey::<'static, SI>::Copied(ep.clone(), ls.clone()),
Self::Ref(ep, ls) => PathKey::<'static, SI>::Copied((*ep).clone(), (*ls).clone()),
}
}
}

/// A VL1 global P2P network node.
pub struct Node<SI: SystemInterface> {
/// A random ID generated to identify this particular running instance.
@@ -133,13 +197,13 @@ pub struct Node<SI: SystemInterface> {
intervals: Mutex<BackgroundTaskIntervals>,

/// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use.
paths: DashMap<Endpoint, parking_lot::RwLock<HashMap<SI::LocalSocket, Arc<Path<SI>>>>>,
paths: parking_lot::RwLock<HashMap<PathKey<'static, SI>, Arc<Path<SI>>>>,

/// Peers with which we are currently communicating.
peers: DashMap<Address, Arc<Peer<SI>>>,
peers: parking_lot::RwLock<HashMap<Address, Arc<Peer<SI>>>>,

/// This node's trusted roots, sorted in ascending order of quality/preference, and cluster definitions.
roots: Mutex<RootInfo<SI>>,
roots: RwLock<RootInfo<SI>>,

/// Current best root.
best_root: RwLock<Option<Arc<Peer<SI>>>>,

@@ -152,17 +216,16 @@ pub struct Node<SI: SystemInterface> {
}

impl<SI: SystemInterface> Node<SI> {
/// Create a new Node.
pub fn new(si: &SI, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result<Self, InvalidParameterError> {
pub async fn new(si: &SI, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result<Self, InvalidParameterError> {
let mut id = {
let id = si.load_node_identity();
let id = si.load_node_identity().await;
if id.is_none() {
if !auto_generate_identity {
return Err(InvalidParameterError("no identity found and auto-generate not enabled"));
} else {
let id = Identity::generate();
si.event(Event::IdentityAutoGenerated(id.clone()));
si.save_node_identity(&id);
si.event(Event::IdentityAutoGenerated(id.clone())).await;
si.save_node_identity(&id).await;
id
}
} else {

@@ -173,8 +236,8 @@ impl<SI: SystemInterface> Node<SI> {
if auto_upgrade_identity {
let old = id.clone();
if id.upgrade()? {
si.save_node_identity(&id);
si.event(Event::IdentityAutoUpgraded(old, id.clone()));
si.save_node_identity(&id).await;
si.event(Event::IdentityAutoUpgraded(old, id.clone())).await;
}
}

@@ -182,9 +245,9 @@ impl<SI: SystemInterface> Node<SI> {
instance_id: zerotier_core_crypto::random::get_bytes_secure(),
identity: id,
intervals: Mutex::new(BackgroundTaskIntervals::default()),
paths: DashMap::new(),
peers: DashMap::new(),
roots: Mutex::new(RootInfo {
paths: parking_lot::RwLock::new(HashMap::new()),
peers: parking_lot::RwLock::new(HashMap::new()),
roots: RwLock::new(RootInfo {
roots: HashMap::new(),
sets: HashMap::new(),
sets_modified: false,
@@ -201,139 +264,177 @@ impl<SI: SystemInterface> Node<SI> {
}

pub fn peer(&self, a: Address) -> Option<Arc<Peer<SI>>> {
self.peers.get(&a).map(|peer| peer.value().clone())
self.peers.read().get(&a).cloned()
}

pub fn do_background_tasks(&self, si: &SI) -> Duration {
let mut intervals = self.intervals.lock();
pub async fn do_background_tasks(&self, si: &SI) -> Duration {
let tt = si.time_ticks();
let (root_sync, root_hello, peer_check, path_check, whois_check) = {
let mut intervals = self.intervals.lock();
(intervals.root_sync.gate(tt), intervals.root_hello.gate(tt), intervals.peers.gate(tt), intervals.paths.gate(tt), intervals.whois.gate(tt))
};

assert!(ROOT_SYNC_INTERVAL_MS <= (ROOT_HELLO_INTERVAL / 2));
if intervals.root_sync.gate(tt) {
match &mut (*self.roots.lock()) {
RootInfo { roots, sets, sets_modified } => {
// Update internal data structures if the root set configuration has changed.
if *sets_modified {
*sets_modified = false;
debug_event!(si, "root sets modified, synchronizing internal data structures");
if root_sync {
if {
let mut roots = self.roots.write();
if roots.sets_modified {
roots.sets_modified = false;
true
} else {
false
}
} {
let (mut old_root_identities, address_collisions, new_roots, bad_identities) = {
let roots = self.roots.read();

let mut old_root_identities: Vec<Identity> = roots.drain().map(|r| r.0.identity.clone()).collect();
let mut new_root_identities = Vec::new();
let old_root_identities: Vec<Identity> = roots.roots.iter().map(|(p, _)| p.identity.clone()).collect();
let mut new_roots = HashMap::new();
let mut bad_identities = Vec::new();

let mut colliding_root_addresses = Vec::new(); // see security note below
for (_, rc) in sets.iter() {
// This is a sanity check to make sure we don't have root sets that contain roots with the same address
// but a different identity. If we do, the offending address is blacklisted. This would indicate something
// weird and possibly nasty happening with whomever is making your root set definitions.
let mut address_collisions = Vec::new();
{
let mut address_collision_check = HashMap::with_capacity(roots.sets.len() * 8);
for (_, rc) in roots.sets.iter() {
for m in rc.members.iter() {
if m.endpoints.is_some() && !colliding_root_addresses.contains(&m.identity.address) {
/*
* SECURITY NOTE: it should be impossible to get an address/identity collision here unless
* the user adds a maliciously crafted root set with an identity that collides another. Under
* normal circumstances the root backplane combined with the address PoW should rule this
* out. However since we trust roots as identity lookup authorities it's important to take
* extra care to check for this case. If it's detected, all roots with the offending
* address are ignored/disabled.
*
* The apparently over-thought functional chain here on peers.entry() is to make access to
* the peer map atomic since we use a "lock-free" data structure here (DashMap).
*/

let _ = self
.peers
.entry(m.identity.address)
.or_try_insert_with(|| Peer::<SI>::new(&self.identity, m.identity.clone(), si.time_clock()).map_or(Err(crate::error::UnexpectedError), |new_root| Ok(Arc::new(new_root))))
.and_then(|root_peer_entry| {
let rp = root_peer_entry.value();
if rp.identity.eq(&m.identity) {
Ok(root_peer_entry)
} else {
colliding_root_addresses.push(m.identity.address);
si.event(Event::SecurityWarning(format!(
"address/identity collision between root {} (from root cluster definition '{}') and known peer {}, ignoring this root!",
m.identity.address.to_string(),
rc.name,
rp.identity.to_string()
)));
Err(crate::error::UnexpectedError)
}
})
.map(|r| {
new_root_identities.push(r.value().identity.clone());
roots.insert(r.value().clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect());
});
if self.peers.read().get(&m.identity.address).map_or(false, |p| !p.identity.eq(&m.identity)) || address_collision_check.insert(m.identity.address, &m.identity).map_or(false, |old_id| !old_id.eq(&m.identity)) {
address_collisions.push(m.identity.address);
}
}
}

old_root_identities.sort_unstable();
new_root_identities.sort_unstable();
if !old_root_identities.eq(&new_root_identities) {
si.event(Event::UpdatedRoots(old_root_identities, new_root_identities));
}
}

// Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint
// they have, which is a behavior that differs from normal peers. This allows roots to
// e.g. see our IPv4 and our IPv6 address which can be important for us to learn our
// external addresses from them.
if intervals.root_hello.gate(tt) {
for (root, endpoints) in roots.iter() {
for ep in endpoints.iter() {
debug_event!(si, "sending HELLO to root {} (root interval: {})", root.identity.address.to_string(), ROOT_HELLO_INTERVAL);
root.send_hello(si, self, Some(ep));
for (_, rc) in roots.sets.iter() {
for m in rc.members.iter() {
if m.endpoints.is_some() && !address_collisions.contains(&m.identity.address) {
let peers = self.peers.upgradable_read();
if let Some(peer) = peers.get(&m.identity.address) {
new_roots.insert(peer.clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect());
} else {
if let Some(peer) = Peer::<SI>::new(&self.identity, m.identity.clone(), si.time_clock()) {
new_roots.insert(parking_lot::RwLockUpgradableReadGuard::upgrade(peers).entry(m.identity.address).or_insert_with(|| Arc::new(peer)).clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect());
} else {
bad_identities.push(m.identity.clone());
}
}
}
}
}

// The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison
// this is a proxy for latency and also causes roots that fail to reply to drop out quickly.
let mut latest_hello_reply = 0;
let mut best: Option<&Arc<Peer<SI>>> = None;
for (r, _) in roots.iter() {
let t = r.last_hello_reply_time_ticks.load(Ordering::Relaxed);
if t >= latest_hello_reply {
latest_hello_reply = t;
let _ = best.insert(r);
(old_root_identities, address_collisions, new_roots, bad_identities)
};

for c in address_collisions.iter() {
si.event(Event::SecurityWarning(format!("address/identity collision in root sets! address {} collides across root sets or with an existing peer and is being ignored as a root!", c.to_string()))).await;
}
for i in bad_identities.iter() {
si.event(Event::SecurityWarning(format!("bad identity detected for address {} in at least one root set, ignoring (error creating peer object)", i.address.to_string()))).await;
}

let mut new_root_identities: Vec<Identity> = new_roots.iter().map(|(p, _)| p.identity.clone()).collect();

old_root_identities.sort_unstable();
new_root_identities.sort_unstable();
if !old_root_identities.eq(&new_root_identities) {
let mut best: Option<Arc<Peer<SI>>> = None;

{
let mut roots = self.roots.write();
roots.roots = new_roots;

// The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison
// this is a proxy for latency and also causes roots that fail to reply to drop out quickly.
let mut latest_hello_reply = 0;
for (r, _) in roots.roots.iter() {
let t = r.last_hello_reply_time_ticks.load(Ordering::Relaxed);
if t >= latest_hello_reply {
latest_hello_reply = t;
let _ = best.insert(r.clone());
}
}
}
debug_event!(si, "new best root: {}", best.clone().map_or("none".into(), |p| p.identity.address.to_string()));
*(self.best_root.write()) = best.cloned();

*(self.best_root.write()) = best;

//debug_event!(si, "new best root: {}", best.as_ref().map_or("none".into(), |p| p.identity.address.to_string()));
//si.event(Event::UpdatedRoots(old_root_identities, new_root_identities)).await;
}
}
}

if intervals.peers.gate(tt) {
// Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint
// they have, which is a behavior that differs from normal peers. This allows roots to
// e.g. see our IPv4 and our IPv6 address which can be important for us to learn our
// external addresses from them.
if root_hello {
let roots = {
let roots = self.roots.read();
let mut roots_copy = Vec::with_capacity(roots.roots.len());
for (root, endpoints) in roots.roots.iter() {
roots_copy.push((root.clone(), endpoints.clone()));
}
roots_copy
};
for (root, endpoints) in roots.iter() {
for ep in endpoints.iter() {
debug_event!(si, "sending HELLO to root {} (root interval: {})", root.identity.address.to_string(), ROOT_HELLO_INTERVAL);
root.send_hello(si, self, Some(ep)).await;
}
}
}

if peer_check {
// Service all peers, removing any whose service() method returns false AND that are not
// roots. Roots on the other hand remain in the peer list as long as they are roots.
self.peers.retain(|_, peer| if peer.service(si, self, tt) { true } else { !self.roots.lock().roots.contains_key(peer) });
}

if intervals.paths.gate(tt) {
// Service all paths, removing expired or invalid ones. This is done in two passes to
// avoid introducing latency into a flow.
self.paths.retain(|_, pbs| {
let mut expired_paths = Vec::new();
for (ls, path) in pbs.read().iter() {
if !si.local_socket_is_valid(ls) || !path.service(si, self, tt) {
expired_paths.push(Arc::as_ptr(path));
let mut dead_peers = Vec::new();
{
let roots = self.roots.read();
for (a, peer) in self.peers.read().iter() {
if !peer.service(si, self, tt) && !roots.roots.contains_key(peer) {
dead_peers.push(*a);
}
}
if expired_paths.is_empty() {
true
} else {
let mut pbs_w = pbs.write();
pbs_w.retain(|_, path| !expired_paths.contains(&Arc::as_ptr(path)));
!pbs_w.is_empty()
}
})
}
for dp in dead_peers.iter() {
self.peers.write().remove(dp);
}
}

if intervals.whois.gate(tt) {
if path_check {
// Service all paths, removing expired or invalid ones. This is done in two passes to
// avoid introducing latency into a flow.
let mut dead_paths = Vec::new();
let mut need_keepalive = Vec::new();
for (k, path) in self.paths.read().iter() {
if si.local_socket_is_valid(k.local_socket()) {
match path.service(tt) {
PathServiceResult::Ok => {}
PathServiceResult::Dead => dead_paths.push(k.to_copied()),
PathServiceResult::NeedsKeepalive => need_keepalive.push(path.clone()),
}
} else {
dead_paths.push(k.to_copied());
}
}
for dp in dead_paths.iter() {
self.paths.write().remove(dp);
}
let z = [&crate::util::ZEROES[..1]];
for ka in need_keepalive.iter() {
si.wire_send(&ka.endpoint, Some(&ka.local_socket), Some(&ka.local_interface), &z, 0).await;
}
}

if whois_check {
self.whois.service(si, self, tt);
}

Duration::from_millis((ROOT_SYNC_INTERVAL_MS.min(crate::vl1::whoisqueue::SERVICE_INTERVAL_MS).min(crate::vl1::path::SERVICE_INTERVAL_MS).min(crate::vl1::peer::SERVICE_INTERVAL_MS) as u64) / 2)
}

pub fn handle_incoming_physical_packet<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, source_endpoint: &Endpoint, source_local_socket: &SI::LocalSocket, source_local_interface: &SI::LocalInterface, mut data: PooledPacketBuffer) {
pub async fn handle_incoming_physical_packet<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, source_endpoint: &Endpoint, source_local_socket: &SI::LocalSocket, source_local_interface: &SI::LocalInterface, mut data: PooledPacketBuffer) {
debug_event!(
si,
"<< #{} ->{} from {} length {} via socket {}@{}",
@@ -359,12 +460,10 @@ impl<SI: SystemInterface> Node<SI> {
if let Some(frag0) = assembled_packet.frags[0].as_ref() {
debug_event!(si, "-- #{:0>16x} packet fully assembled!", u64::from_be_bytes(fragment_header.id));

let packet_header = frag0.struct_at::<PacketHeader>(0);
if packet_header.is_ok() {
let packet_header = packet_header.unwrap();
if let Ok(packet_header) = frag0.struct_at::<PacketHeader>(0) {
if let Some(source) = Address::from_bytes(&packet_header.src) {
if let Some(peer) = self.peer(source) {
peer.receive(self, si, ph, time_ticks, &path, &packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)]);
peer.receive(self, si, ph, time_ticks, &path, &packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)]).await;
} else {
self.whois.query(self, si, source, Some(QueuedPacket::Fragmented(assembled_packet)));
}

@@ -378,7 +477,7 @@ impl<SI: SystemInterface> Node<SI> {

if let Some(source) = Address::from_bytes(&packet_header.src) {
if let Some(peer) = self.peer(source) {
peer.receive(self, si, ph, time_ticks, &path, &packet_header, data.as_ref(), &[]);
peer.receive(self, si, ph, time_ticks, &path, &packet_header, data.as_ref(), &[]).await;
} else {
self.whois.query(self, si, source, Some(QueuedPacket::Unfragmented(data)));
}

@@ -406,7 +505,7 @@ impl<SI: SystemInterface> Node<SI> {

if let Some(peer) = self.peer(dest) {
// TODO: SHOULD we forward? Need a way to check.
peer.forward(si, time_ticks, data.as_ref());
peer.forward(si, time_ticks, data.as_ref()).await;
debug_event!(si, "-- #{:0>16x} forwarded successfully", u64::from_be_bytes(fragment_header.id));
}
}

@@ -419,11 +518,11 @@ impl<SI: SystemInterface> Node<SI> {
}

pub fn is_peer_root(&self, peer: &Peer<SI>) -> bool {
self.roots.lock().roots.contains_key(peer)
self.roots.read().roots.contains_key(peer)
}

pub fn add_update_root_set(&self, rs: RootSet) -> bool {
let mut roots = self.roots.lock();
let mut roots = self.roots.write();
if let Some(entry) = roots.sets.get_mut(&rs.name) {
if rs.should_replace(entry) {
*entry = rs;

@@ -439,30 +538,18 @@ impl<SI: SystemInterface> Node<SI> {
}

pub fn has_roots_defined(&self) -> bool {
self.roots.lock().sets.iter().any(|rs| !rs.1.members.is_empty())
self.roots.read().sets.iter().any(|rs| !rs.1.members.is_empty())
}

pub fn root_sets(&self) -> Vec<RootSet> {
self.roots.lock().sets.values().cloned().collect()
self.roots.read().sets.values().cloned().collect()
}

pub fn canonical_path(&self, ep: &Endpoint, local_socket: &SI::LocalSocket, local_interface: &SI::LocalInterface, time_ticks: i64) -> Arc<Path<SI>> {
// It's faster to do a read only lookup first since most of the time this will succeed. The second
// version below this only gets invoked if it's a new path.
if let Some(path) = self.paths.get(ep) {
if let Some(path) = path.value().read().get(local_socket) {
return path.clone();
}
if let Some(path) = self.paths.read().get(&PathKey::Ref(ep, local_socket)) {
path.clone()
} else {
self.paths.write().entry(PathKey::Copied(ep.clone(), local_socket.clone())).or_insert_with(|| Arc::new(Path::new(ep.clone(), local_socket.clone(), local_interface.clone(), time_ticks))).clone()
}

return self
.paths
.entry(ep.clone())
.or_insert_with(|| parking_lot::RwLock::new(HashMap::with_capacity(4)))
.value_mut()
.write()
.entry(local_socket.clone())
.or_insert_with(|| Arc::new(Path::new(ep.clone(), local_socket.clone(), local_interface.clone(), time_ticks)))
.clone();
}
}
@@ -2,11 +2,11 @@

use std::collections::HashMap;
use std::hash::{BuildHasher, Hasher};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};

use lazy_static::lazy_static;
use parking_lot::Mutex;

use crate::util::*;
use crate::vl1::endpoint::Endpoint;
use crate::vl1::fragmentedpacket::FragmentedPacket;
use crate::vl1::node::*;

@@ -14,6 +14,16 @@ use crate::vl1::protocol::*;

pub(crate) const SERVICE_INTERVAL_MS: i64 = PATH_KEEPALIVE_INTERVAL;

pub(crate) enum PathServiceResult {
Ok,
Dead,
NeedsKeepalive,
}

lazy_static! {
static ref INSTANCE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
}

/// A remote endpoint paired with a local socket and a local interface.
/// These are maintained in Node and canonicalized so that all unique paths have
/// one and only one unique path object. That enables statistics to be tracked

@@ -22,6 +32,7 @@ pub struct Path<SI: SystemInterface> {
pub endpoint: Endpoint,
pub local_socket: SI::LocalSocket,
pub local_interface: SI::LocalInterface,
pub(crate) internal_instance_id: usize, // arbitrary local ID that should be globally unique to a given path object instance
last_send_time_ticks: AtomicI64,
last_receive_time_ticks: AtomicI64,
create_time_ticks: i64,

@@ -34,6 +45,7 @@ impl<SI: SystemInterface> Path<SI> {
endpoint,
local_socket,
local_interface,
internal_instance_id: INSTANCE_ID_COUNTER.fetch_add(1, Ordering::SeqCst),
last_send_time_ticks: AtomicI64::new(0),
last_receive_time_ticks: AtomicI64::new(0),
create_time_ticks: time_ticks,

@@ -78,18 +90,19 @@ impl<SI: SystemInterface> Path<SI> {
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
}

pub(crate) fn service(&self, si: &SI, _: &Node<SI>, time_ticks: i64) -> bool {
pub(crate) fn service(&self, time_ticks: i64) -> PathServiceResult {
self.fragmented_packets.lock().retain(|_, frag| (time_ticks - frag.ts_ticks) < packet_constants::FRAGMENT_EXPIRATION);
if (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed)) < PATH_EXPIRATION_TIME {
if (time_ticks - self.last_send_time_ticks.load(Ordering::Relaxed)) >= PATH_KEEPALIVE_INTERVAL {
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
si.wire_send(&self.endpoint, Some(&self.local_socket), Some(&self.local_interface), &[&ZEROES[..1]], 0);
PathServiceResult::NeedsKeepalive
} else {
PathServiceResult::Ok
}
true
} else if (time_ticks - self.create_time_ticks) < PATH_EXPIRATION_TIME {
true
PathServiceResult::Ok
} else {
false
PathServiceResult::Dead
}
}
}
@@ -15,6 +15,7 @@ use zerotier_core_crypto::salsa::Salsa;
use zerotier_core_crypto::secret::Secret;

use crate::util::byte_array_range;
use crate::util::debug_event;
use crate::util::marshalable::Marshalable;
use crate::vl1::node::*;
use crate::vl1::protocol::*;

@@ -26,6 +27,7 @@ pub(crate) const SERVICE_INTERVAL_MS: i64 = security_constants::EPHEMERAL_SECRET

struct PeerPath<SI: SystemInterface> {
path: Weak<Path<SI>>,
path_internal_instance_id: usize,
last_receive_time_ticks: i64,
}

@@ -205,7 +207,7 @@ impl<SI: SystemInterface> Peer<SI> {
///
/// If the packet comes in multiple fragments, the fragments slice should contain all
/// those fragments after the main packet header and first chunk.
pub(crate) fn receive<VI: InnerProtocolInterface>(&self, node: &Node<SI>, si: &SI, vi: &VI, time_ticks: i64, source_path: &Arc<Path<SI>>, header: &PacketHeader, frag0: &PacketBuffer, fragments: &[Option<PooledPacketBuffer>]) {
pub(crate) async fn receive<PH: InnerProtocolInterface>(&self, node: &Node<SI>, si: &SI, ph: &PH, time_ticks: i64, source_path: &Arc<Path<SI>>, header: &PacketHeader, frag0: &PacketBuffer, fragments: &[Option<PooledPacketBuffer>]) {
if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(packet_constants::VERB_INDEX) {
let mut payload = unsafe { PacketBuffer::new_without_memzero() };

@@ -268,9 +270,8 @@ impl<SI: SystemInterface> Peer<SI> {
}
}

let source_path_ptr = Arc::as_ptr(source_path);
for p in self.paths.lock().iter_mut() {
if Weak::as_ptr(&p.path) == source_path_ptr {
if p.path_internal_instance_id == source_path.internal_instance_id {
p.last_receive_time_ticks = time_ticks;
break;
}

@@ -281,17 +282,17 @@ impl<SI: SystemInterface> Peer<SI> {
// because the most performance critical path is the handling of the ???_FRAME
// verbs, which are in VL2.
verb &= packet_constants::VERB_MASK; // mask off flags
if !vi.handle_packet(self, source_path, forward_secrecy, extended_authentication, verb, &payload) {
if !ph.handle_packet(self, &source_path, forward_secrecy, extended_authentication, verb, &payload).await {
match verb {
//VERB_VL1_NOP => {}
verbs::VL1_HELLO => self.receive_hello(si, node, time_ticks, source_path, &payload),
verbs::VL1_ERROR => self.receive_error(si, vi, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload),
verbs::VL1_OK => self.receive_ok(si, vi, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload),
verbs::VL1_WHOIS => self.receive_whois(si, node, time_ticks, source_path, &payload),
verbs::VL1_RENDEZVOUS => self.receive_rendezvous(si, node, time_ticks, source_path, &payload),
verbs::VL1_ECHO => self.receive_echo(si, node, time_ticks, source_path, &payload),
verbs::VL1_PUSH_DIRECT_PATHS => self.receive_push_direct_paths(si, node, time_ticks, source_path, &payload),
verbs::VL1_USER_MESSAGE => self.receive_user_message(si, node, time_ticks, source_path, &payload),
verbs::VL1_HELLO => self.receive_hello(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_ERROR => self.receive_error(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await,
verbs::VL1_OK => self.receive_ok(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await,
verbs::VL1_WHOIS => self.receive_whois(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_RENDEZVOUS => self.receive_rendezvous(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_ECHO => self.receive_echo(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_PUSH_DIRECT_PATHS => self.receive_push_direct_paths(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_USER_MESSAGE => self.receive_user_message(si, node, time_ticks, source_path, &payload).await,
_ => {}
}
}

@@ -299,13 +300,13 @@ impl<SI: SystemInterface> Peer<SI> {
}
}

fn send_to_endpoint(&self, si: &SI, endpoint: &Endpoint, local_socket: Option<&SI::LocalSocket>, local_interface: Option<&SI::LocalInterface>, packet: &PacketBuffer) -> bool {
async fn send_to_endpoint(&self, si: &SI, endpoint: &Endpoint, local_socket: Option<&SI::LocalSocket>, local_interface: Option<&SI::LocalInterface>, packet: &PacketBuffer) -> bool {
match endpoint {
Endpoint::Ip(_) | Endpoint::IpUdp(_) | Endpoint::Ethernet(_) | Endpoint::Bluetooth(_) | Endpoint::WifiDirect(_) => {
let packet_size = packet.len();
if packet_size > UDP_DEFAULT_MTU {
let bytes = packet.as_bytes();
if !si.wire_send(endpoint, local_socket, local_interface, &[&bytes[0..UDP_DEFAULT_MTU]], 0) {
if !si.wire_send(endpoint, local_socket, local_interface, &[&bytes[0..UDP_DEFAULT_MTU]], 0).await {
return false;
}

@@ -327,7 +328,7 @@ impl<SI: SystemInterface> Peer<SI> {
loop {
header.total_and_fragment_no += 1;
let next_pos = pos + chunk_size;
if !si.wire_send(endpoint, local_socket, local_interface, &[header.as_bytes(), &bytes[pos..next_pos]], 0) {
if !si.wire_send(endpoint, local_socket, local_interface, &[header.as_bytes(), &bytes[pos..next_pos]], 0).await {
return false;
}
pos = next_pos;

@@ -338,11 +339,11 @@ impl<SI: SystemInterface> Peer<SI> {
}
}
} else {
return si.wire_send(endpoint, local_socket, local_interface, &[packet.as_bytes()], 0);
return si.wire_send(endpoint, local_socket, local_interface, &[packet.as_bytes()], 0).await;
}
}
_ => {
return si.wire_send(endpoint, local_socket, local_interface, &[packet.as_bytes()], 0);
return si.wire_send(endpoint, local_socket, local_interface, &[packet.as_bytes()], 0).await;
}
}
}

@@ -351,9 +352,9 @@ impl<SI: SystemInterface> Peer<SI> {
///
/// This will go directly if there is an active path, or otherwise indirectly
/// via a root or some other route.
pub(crate) fn send(&self, si: &SI, node: &Node<SI>, time_ticks: i64, packet: &PacketBuffer) -> bool {
pub(crate) async fn send(&self, si: &SI, node: &Node<SI>, time_ticks: i64, packet: &PacketBuffer) -> bool {
if let Some(path) = self.path(node) {
if self.send_to_endpoint(si, &path.endpoint, Some(&path.local_socket), Some(&path.local_interface), packet) {
if self.send_to_endpoint(si, &path.endpoint, Some(&path.local_socket), Some(&path.local_interface), packet).await {
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
return true;
}

@@ -368,9 +369,9 @@ impl<SI: SystemInterface> Peer<SI> {
///
/// This doesn't fragment large packets since fragments are forwarded individually.
/// Intermediates don't need to adjust fragmentation.
pub(crate) fn forward(&self, si: &SI, time_ticks: i64, packet: &PacketBuffer) -> bool {
pub(crate) async fn forward(&self, si: &SI, time_ticks: i64, packet: &PacketBuffer) -> bool {
if let Some(path) = self.direct_path() {
if si.wire_send(&path.endpoint, Some(&path.local_socket), Some(&path.local_interface), &[packet.as_bytes()], 0) {
if si.wire_send(&path.endpoint, Some(&path.local_socket), Some(&path.local_interface), &[packet.as_bytes()], 0).await {
self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed);
return true;
}
@@ -385,110 +386,109 @@ impl<SI: SystemInterface> Peer<SI> {
///
/// Unlike other messages HELLO is sent partially in the clear and always with the long-lived
/// static identity key.
pub(crate) fn send_hello(&self, si: &SI, node: &Node<SI>, explicit_endpoint: Option<&Endpoint>) -> bool {
pub(crate) async fn send_hello(&self, si: &SI, node: &Node<SI>, explicit_endpoint: Option<&Endpoint>) -> bool {
let mut path = None;
let destination = explicit_endpoint.map_or_else(
|| {
self.path(node).map_or(None, |p| {
let _ = path.insert(p.clone());
Some(p.endpoint.clone())
})
},
|endpoint| Some(endpoint.clone()),
);
if destination.is_none() {
return false;
}
let destination = destination.unwrap();
let destination = if let Some(explicit_endpoint) = explicit_endpoint {
explicit_endpoint.clone()
} else {
if let Some(p) = self.path(node) {
let _ = path.insert(p);
path.as_ref().unwrap().endpoint.clone()
} else {
return false;
}
};

let mut packet = PacketBuffer::new();
let time_ticks = si.time_ticks();
let message_id = self.next_message_id();

let mut packet = PacketBuffer::new();
{
let packet_header: &mut PacketHeader = packet.append_struct_get_mut().unwrap();
packet_header.id = message_id.to_ne_bytes(); // packet ID and message ID are the same when Poly1305 MAC is used
packet_header.dest = self.identity.address.to_bytes();
packet_header.src = node.identity.address.to_bytes();
packet_header.flags_cipher_hops = security_constants::CIPHER_NOCRYPT_POLY1305;
let message_id = self.next_message_id();

{
let packet_header: &mut PacketHeader = packet.append_struct_get_mut().unwrap();
packet_header.id = message_id.to_ne_bytes(); // packet ID and message ID are the same when Poly1305 MAC is used
packet_header.dest = self.identity.address.to_bytes();
packet_header.src = node.identity.address.to_bytes();
packet_header.flags_cipher_hops = security_constants::CIPHER_NOCRYPT_POLY1305;
}

{
let hello_fixed_headers: &mut message_component_structs::HelloFixedHeaderFields = packet.append_struct_get_mut().unwrap();
hello_fixed_headers.verb = verbs::VL1_HELLO | packet_constants::VERB_FLAG_EXTENDED_AUTHENTICATION;
hello_fixed_headers.version_proto = PROTOCOL_VERSION;
hello_fixed_headers.version_major = VERSION_MAJOR;
hello_fixed_headers.version_minor = VERSION_MINOR;
hello_fixed_headers.version_revision = (VERSION_REVISION as u16).to_be_bytes();
hello_fixed_headers.timestamp = (time_ticks as u64).to_be_bytes();
}

// Full identity of the node establishing the session.
assert!(self.identity.marshal_with_options(&mut packet, Identity::ALGORITHM_ALL, false).is_ok());

// 8 reserved bytes, must be zero for compatibility with old nodes.
assert!(packet.append_padding(0, 8).is_ok());

// Generate a 12-byte nonce for the private section of HELLO.
let mut nonce = get_bytes_secure::<12>();

// LEGACY: create a 16-bit encrypted field that specifies zero "moons." Current nodes ignore this
// and treat it as part of the random AES-CTR nonce, but old versions need it to parse the packet
// correctly.
let mut salsa_iv = message_id.to_ne_bytes();
salsa_iv[7] &= 0xf8;
Salsa::<12>::new(&self.identity_symmetric_key.key.0[0..32], &salsa_iv).crypt(&[0_u8, 0_u8], &mut nonce[8..10]);

// Append 12-byte AES-CTR nonce.
assert!(packet.append_bytes_fixed(&nonce).is_ok());

// Add session meta-data, which is encrypted using plain AES-CTR. No authentication (AEAD) is needed
// because the whole packet is authenticated. Data in the session is not technically secret in a
// cryptographic sense but we encrypt it for privacy and as a defense in depth.
let mut fields = Dictionary::new();
fields.set_bytes(session_metadata::INSTANCE_ID, node.instance_id.to_vec());
fields.set_u64(session_metadata::CLOCK, si.time_clock() as u64);
fields.set_bytes(session_metadata::SENT_TO, destination.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec());
let fields = fields.to_bytes();
assert!(fields.len() <= 0xffff); // sanity check, should be impossible
assert!(packet.append_u16(fields.len() as u16).is_ok()); // prefix with unencrypted size
let private_section_start = packet.len();
assert!(packet.append_bytes(fields.as_slice()).is_ok());
let mut aes = AesCtr::new(&self.identity_symmetric_key.hello_private_section_key.as_bytes()[0..32]);
aes.init(&nonce);
aes.crypt_in_place(&mut packet.as_mut()[private_section_start..]);

// Seal packet with HMAC-SHA512 extended authentication.
let mut hmac = HMACSHA512::new(self.identity_symmetric_key.packet_hmac_key.as_bytes());
hmac.update(&message_id.to_ne_bytes());
hmac.update(&packet.as_bytes()[packet_constants::HEADER_SIZE..]);
assert!(packet.append_bytes_fixed(&hmac.finish()).is_ok());
drop(hmac);

// Set poly1305 in header, which is the only authentication for old nodes.
let (_, mut poly) = salsa_poly_create(&self.identity_symmetric_key, packet.struct_at::<PacketHeader>(0).unwrap(), packet.len());
poly.update(packet.as_bytes_starting_at(packet_constants::HEADER_SIZE).unwrap());
packet.as_mut()[packet_constants::MAC_FIELD_INDEX..packet_constants::MAC_FIELD_INDEX + 8].copy_from_slice(&poly.finish()[0..8]);

self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);

debug_event!(si, "HELLO -> {} @ {} ({} bytes)", self.identity.address.to_string(), destination.to_string(), packet.len());
}

{
let hello_fixed_headers: &mut message_component_structs::HelloFixedHeaderFields = packet.append_struct_get_mut().unwrap();
hello_fixed_headers.verb = verbs::VL1_HELLO | packet_constants::VERB_FLAG_EXTENDED_AUTHENTICATION;
hello_fixed_headers.version_proto = PROTOCOL_VERSION;
hello_fixed_headers.version_major = VERSION_MAJOR;
hello_fixed_headers.version_minor = VERSION_MINOR;
hello_fixed_headers.version_revision = (VERSION_REVISION as u16).to_be_bytes();
hello_fixed_headers.timestamp = (time_ticks as u64).to_be_bytes();
if let Some(p) = path {
if self.send_to_endpoint(si, &destination, Some(&p.local_socket), Some(&p.local_interface), &packet).await {
p.log_send_anything(time_ticks);
true
} else {
false
}
} else {
self.send_to_endpoint(si, &destination, None, None, &packet).await
}

// Full identity of the node establishing the session.
assert!(self.identity.marshal_with_options(&mut packet, Identity::ALGORITHM_ALL, false).is_ok());

// 8 reserved bytes, must be zero for compatibility with old nodes.
assert!(packet.append_padding(0, 8).is_ok());

// Generate a 12-byte nonce for the private section of HELLO.
let mut nonce = get_bytes_secure::<12>();

// LEGACY: create a 16-bit encrypted field that specifies zero "moons." Current nodes ignore this
// and treat it as part of the random AES-CTR nonce, but old versions need it to parse the packet
// correctly.
let mut salsa_iv = message_id.to_ne_bytes();
salsa_iv[7] &= 0xf8;
Salsa::<12>::new(&self.identity_symmetric_key.key.0[0..32], &salsa_iv).crypt(&[0_u8, 0_u8], &mut nonce[8..10]);

// Append 12-byte AES-CTR nonce.
assert!(packet.append_bytes_fixed(&nonce).is_ok());

// Add session meta-data, which is encrypted using plain AES-CTR. No authentication (AEAD) is needed
// because the whole packet is authenticated. Data in the session is not technically secret in a
// cryptographic sense but we encrypt it for privacy and as a defense in depth.
let mut fields = Dictionary::new();
fields.set_bytes(session_metadata::INSTANCE_ID, node.instance_id.to_vec());
fields.set_u64(session_metadata::CLOCK, si.time_clock() as u64);
fields.set_bytes(session_metadata::SENT_TO, destination.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec());
let fields = fields.to_bytes();
assert!(fields.len() <= 0xffff); // sanity check, should be impossible
assert!(packet.append_u16(fields.len() as u16).is_ok()); // prefix with unencrypted size
let private_section_start = packet.len();
assert!(packet.append_bytes(fields.as_slice()).is_ok());
let mut aes = AesCtr::new(&self.identity_symmetric_key.hello_private_section_key.as_bytes()[0..32]);
aes.init(&nonce);
aes.crypt_in_place(&mut packet.as_mut()[private_section_start..]);
drop(aes);
drop(fields);

// Seal packet with HMAC-SHA512 extended authentication.
let mut hmac = HMACSHA512::new(self.identity_symmetric_key.packet_hmac_key.as_bytes());
hmac.update(&message_id.to_ne_bytes());
hmac.update(&packet.as_bytes()[packet_constants::HEADER_SIZE..]);
assert!(packet.append_bytes_fixed(&hmac.finish()).is_ok());

// Set poly1305 in header, which is the only authentication for old nodes.
let (_, mut poly) = salsa_poly_create(&self.identity_symmetric_key, packet.struct_at::<PacketHeader>(0).unwrap(), packet.len());
poly.update(packet.as_bytes_starting_at(packet_constants::HEADER_SIZE).unwrap());
packet.as_mut()[packet_constants::MAC_FIELD_INDEX..packet_constants::MAC_FIELD_INDEX + 8].copy_from_slice(&poly.finish()[0..8]);

self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);

path.map_or_else(
|| self.send_to_endpoint(si, &destination, None, None, &packet),
|p| {
if self.send_to_endpoint(si, &destination, Some(&p.local_socket), Some(&p.local_interface), &packet) {
p.log_send_anything(time_ticks);
true
} else {
false
}
},
)
}

fn receive_hello(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
async fn receive_hello(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}

fn receive_error<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, forward_secrecy: bool, extended_authentication: bool, payload: &PacketBuffer) {
async fn receive_error<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, forward_secrecy: bool, extended_authentication: bool, payload: &PacketBuffer) {
let mut cursor: usize = 1;
if let Ok(error_header) = payload.read_struct::<message_component_structs::ErrorHeader>(&mut cursor) {
let in_re_message_id = u64::from_ne_bytes(error_header.in_re_message_id);
@ -496,14 +496,14 @@ impl<SI: SystemInterface> Peer<SI> {
|
|||
if current_packet_id_counter.wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX {
|
||||
match error_header.in_re_verb {
|
||||
_ => {
|
||||
ph.handle_error(self, source_path, forward_secrecy, extended_authentication, error_header.in_re_verb, in_re_message_id, error_header.error_code, payload, &mut cursor);
|
||||
ph.handle_error(self, &source_path, forward_secrecy, extended_authentication, error_header.in_re_verb, in_re_message_id, error_header.error_code, payload, &mut cursor).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn receive_ok<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, forward_secrecy: bool, extended_authentication: bool, payload: &PacketBuffer) {
|
||||
async fn receive_ok<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, forward_secrecy: bool, extended_authentication: bool, payload: &PacketBuffer) {
|
||||
let mut cursor: usize = 1;
|
||||
if let Ok(ok_header) = payload.read_struct::<message_component_structs::OkHeader>(&mut cursor) {
|
||||
let in_re_message_id = u64::from_ne_bytes(ok_header.in_re_message_id);
|
||||
|
@ -515,22 +515,22 @@ impl<SI: SystemInterface> Peer<SI> {
|
|||
}
|
||||
verbs::VL1_WHOIS => {}
|
||||
_ => {
|
||||
ph.handle_ok(self, source_path, forward_secrecy, extended_authentication, ok_header.in_re_verb, in_re_message_id, payload, &mut cursor);
|
||||
ph.handle_ok(self, &source_path, forward_secrecy, extended_authentication, ok_header.in_re_verb, in_re_message_id, payload, &mut cursor).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn receive_whois(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
|
||||
async fn receive_whois(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
|
||||
|
||||
fn receive_rendezvous(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
|
||||
async fn receive_rendezvous(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
|
||||
|
||||
fn receive_echo(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
|
||||
async fn receive_echo(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
|
||||
|
||||
fn receive_push_direct_paths(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
|
||||
async fn receive_push_direct_paths(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
|
||||
|
||||
fn receive_user_message(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
|
||||
async fn receive_user_message(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
|
||||
|
||||
/// Get current best path or None if there are no direct paths to this peer.
|
||||
pub fn direct_path(&self) -> Option<Arc<Path<SI>>> {
|
||||
|
|
|
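
The receive_* methods above change from plain fns to async fns, and the calls into the inner protocol handler (ph.handle_error / ph.handle_ok) now end in .await. A minimal sketch of that call-site pattern, using the async-trait crate and hypothetical names rather than the real Peer/InnerProtocolInterface types:

use async_trait::async_trait;

// Hypothetical stand-in for InnerProtocolInterface; only the shape matters here.
#[async_trait]
pub trait Handler: Sync + Send {
    async fn handle_error(&self, in_re_verb: u8, in_re_message_id: u64, error_code: u8) -> bool;
}

// Because handle_error() is async, any function that calls it must itself be
// async and must .await the call, which is exactly what receive_error() and
// receive_ok() do in the hunks above.
pub async fn dispatch_error<H: Handler>(handler: &H, verb: u8, message_id: u64, code: u8) {
    let _handled = handler.handle_error(verb, message_id, code).await;
}
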
@@ -1,5 +1,7 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.

use async_trait::async_trait;

use crate::vl1::node::{InnerProtocolInterface, SystemInterface};
use crate::vl1::protocol::*;
use crate::vl1::{Identity, Path, Peer};

@@ -8,16 +10,28 @@ pub trait SwitchInterface: Sync + Send {}

pub struct Switch {}

#[async_trait]
impl InnerProtocolInterface for Switch {
fn handle_packet<SI: SystemInterface>(&self, peer: &Peer<SI>, source_path: &Path<SI>, forward_secrecy: bool, extended_authentication: bool, verb: u8, payload: &PacketBuffer) -> bool {
async fn handle_packet<SI: SystemInterface>(&self, peer: &Peer<SI>, source_path: &Path<SI>, forward_secrecy: bool, extended_authentication: bool, verb: u8, payload: &PacketBuffer) -> bool {
false
}

fn handle_error<SI: SystemInterface>(&self, peer: &Peer<SI>, source_path: &Path<SI>, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, error_code: u8, payload: &PacketBuffer, cursor: &mut usize) -> bool {
async fn handle_error<SI: SystemInterface>(
&self,
peer: &Peer<SI>,
source_path: &Path<SI>,
forward_secrecy: bool,
extended_authentication: bool,
in_re_verb: u8,
in_re_message_id: u64,
error_code: u8,
payload: &PacketBuffer,
cursor: &mut usize,
) -> bool {
false
}

fn handle_ok<SI: SystemInterface>(&self, peer: &Peer<SI>, source_path: &Path<SI>, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, payload: &PacketBuffer, cursor: &mut usize) -> bool {
async fn handle_ok<SI: SystemInterface>(&self, peer: &Peer<SI>, source_path: &Path<SI>, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, payload: &PacketBuffer, cursor: &mut usize) -> bool {
false
}

@@ -27,7 +41,7 @@ impl InnerProtocolInterface for Switch {
}

impl Switch {
pub fn new() -> Self {
pub async fn new() -> Self {
Self {}
}
}
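
Rust did not support async fn in traits natively at the time of this commit, which is why the #[async_trait] attribute above appears on the impl. A minimal sketch of that shape, with hypothetical names in place of InnerProtocolInterface and Switch:

use async_trait::async_trait;

// Hypothetical names; the real pair is InnerProtocolInterface and Switch.
#[async_trait]
trait Inner: Sync + Send {
    async fn handle_packet(&self, verb: u8, payload: &[u8]) -> bool;
}

struct NullSwitch;

// The attribute rewrites each async fn into a method returning a boxed future,
// so it must be present on the impl as well as on the trait definition.
#[async_trait]
impl Inner for NullSwitch {
    async fn handle_packet(&self, _verb: u8, _payload: &[u8]) -> bool {
        false
    }
}
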
zerotier-system-service/Cargo.lock (generated, 33 lines changed)

@@ -18,6 +18,17 @@ dependencies = [
"memchr",
]

[[package]]
name = "async-trait"
version = "0.1.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96cf8829f67d2eab0b2dfa42c5d0ef737e0724e4a82b01b3e292456202b19716"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "atomic-polyfill"
version = "0.1.8"

@@ -173,18 +184,6 @@ dependencies = [
"zeroize",
]

[[package]]
name = "dashmap"
version = "5.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f"
dependencies = [
"cfg-if",
"hashbrown 0.12.1",
"lock_api",
"parking_lot_core",
]

[[package]]
name = "digest"
version = "0.9.0"

@@ -278,12 +277,6 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"

[[package]]
name = "hashbrown"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db0d4cf898abf0081f964436dc980e96670a0f36863e4b83aaacdb65c9d7ccc3"

[[package]]
name = "heapless"
version = "0.7.13"

@@ -313,7 +306,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6012d540c5baa3589337a98ce73408de9b5a25ec9fc2c6fd6be8f0d39e0ca5a"
dependencies = [
"autocfg",
"hashbrown 0.11.2",
"hashbrown",
]

[[package]]

@@ -1048,8 +1041,8 @@ dependencies = [
name = "zerotier-network-hypervisor"
version = "2.0.0"
dependencies = [
"async-trait",
"base64",
"dashmap",
"lazy_static",
"libc",
"lz4_flex",
@@ -141,8 +141,14 @@ async fn async_main(flags: Flags, global_args: Box<ArgMatches>) -> i32 {
Some(("leave", cmd_args)) => todo!(),
Some(("service", _)) => {
drop(global_args); // free unnecessary heap before starting service as we're done with CLI args
assert!(service::Service::new(tokio::runtime::Handle::current(), &flags.base_path, true).await.is_ok());
exitcode::OK
let svc = service::Service::new(tokio::runtime::Handle::current(), &flags.base_path, true).await;
if svc.is_ok() {
let _ = tokio::signal::ctrl_c().await;
exitcode::OK
} else {
println!("FATAL: error launching service: {}", svc.err().unwrap().to_string());
exitcode::ERR_IOERR
}
}
Some(("identity", cmd_args)) => todo!(),
Some(("rootset", cmd_args)) => cli::rootset::cmd(flags, cmd_args).await,
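
The service arm now keeps the process alive until Ctrl+C instead of asserting on the constructor and returning immediately, and a launch failure is reported rather than panicking. A rough sketch of that run-until-interrupt pattern, with a hypothetical start_service() standing in for service::Service::new():

// Hypothetical start_service(); the returned value just has to stay alive for
// the service's background tasks to keep running.
async fn start_service() -> Result<(), std::io::Error> {
    Ok(())
}

async fn run_service() -> i32 {
    match start_service().await {
        Ok(_service) => {
            // Park here until SIGINT; needs tokio's "signal" feature.
            let _ = tokio::signal::ctrl_c().await;
            0
        }
        Err(e) => {
            eprintln!("FATAL: error launching service: {}", e);
            1
        }
    }
}
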
@@ -6,6 +6,7 @@ use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use zerotier_network_hypervisor::async_trait;
use zerotier_network_hypervisor::vl1::*;
use zerotier_network_hypervisor::vl2::*;
use zerotier_network_hypervisor::*;

@@ -52,6 +53,10 @@ impl Drop for Service {
}

impl Service {
/// Start ZeroTier service.
///
/// This launches a number of background tasks in the async runtime that will run as long as this object exists.
/// When this is dropped these tasks are killed.
pub async fn new<P: AsRef<Path>>(rt: tokio::runtime::Handle, base_path: P, auto_upgrade_identity: bool) -> Result<Self, Box<dyn Error>> {
let mut si = ServiceImpl {
rt,

@@ -61,7 +66,7 @@ impl Service {
num_listeners_per_socket: std::thread::available_parallelism().unwrap().get(),
_core: None,
};
let _ = si._core.insert(NetworkHypervisor::new(&si, true, auto_upgrade_identity)?);
let _ = si._core.insert(NetworkHypervisor::new(&si, true, auto_upgrade_identity).await?);
let si = Arc::new(si);

si._core.as_ref().unwrap().add_update_default_root_set_if_none();

@@ -88,7 +93,7 @@ impl ServiceImpl {
if bp.sockets.is_empty() {
return Some(errors);
}
drop(udp_sockets); // release lock
drop(udp_sockets);

for ns in new_sockets.iter() {
/*

@@ -98,19 +103,18 @@ impl ServiceImpl {
* This makes sure that when one packet is in processing the async runtime is immediately able to
* cue up another receiver for this socket.
*/
let mut socket_associated_tasks = ns.socket_associated_tasks.lock();
for _ in 0..self.num_listeners_per_socket {
let self2 = self.clone();
let socket = ns.socket.clone();
let interface = ns.interface.clone();
let local_socket = LocalSocket(Arc::downgrade(ns), self.local_socket_unique_id_counter.fetch_add(1, Ordering::SeqCst));
socket_associated_tasks.push(self.rt.spawn(async move {
ns.socket_associated_tasks.lock().push(self.rt.spawn(async move {
let core = self2.core();
loop {
let mut buf = core.get_packet_buffer();
if let Ok((bytes, source)) = socket.recv_from(unsafe { buf.entire_buffer_mut() }).await {
unsafe { buf.set_size_unchecked(bytes) };
core.handle_incoming_physical_packet(&self2, &Endpoint::IpUdp(InetAddress::from(source)), &local_socket, &interface, buf);
core.handle_incoming_physical_packet(&self2, &Endpoint::IpUdp(InetAddress::from(source)), &local_socket, &interface, buf).await;
} else {
break;
}
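
The comment above explains why several receiver tasks are spawned per socket: while one task is busy handing a datagram to the core, another is already parked in recv_from(), so reads and processing overlap. tokio's UdpSocket::recv_from() takes &self, which is what lets one Arc<UdpSocket> be shared by all of the listeners. A minimal sketch of the pattern, with a hypothetical handle_packet() in place of handle_incoming_physical_packet():

use std::sync::Arc;
use tokio::net::UdpSocket;

// Hypothetical placeholder for the core's packet handler.
async fn handle_packet(_from: std::net::SocketAddr, _data: &[u8]) {}

fn spawn_listeners(socket: Arc<UdpSocket>, listeners_per_socket: usize) -> Vec<tokio::task::JoinHandle<()>> {
    (0..listeners_per_socket)
        .map(|_| {
            let socket = socket.clone();
            tokio::spawn(async move {
                let mut buf = [0u8; 2048];
                loop {
                    // recv_from() takes &self, so every task can await the same socket;
                    // whichever task wakes first takes the datagram, the rest keep waiting.
                    match socket.recv_from(&mut buf).await {
                        Ok((len, from)) => handle_packet(from, &buf[..len]).await,
                        Err(_) => break, // socket closed or fatal error: stop this listener
                    }
                }
            })
        })
        .collect()
}
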
@@ -142,38 +146,39 @@ impl ServiceImpl {
async fn core_background_service_task_main(self: Arc<Self>) {
tokio::time::sleep(Duration::from_secs(1)).await;
loop {
tokio::time::sleep(self.core().do_background_tasks(&self)).await;
tokio::time::sleep(self.core().do_background_tasks(&self).await).await;
}
}
}

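do_background_tasks() is itself async now and reports how long to wait before the next pass, so the loop above just sleeps for whatever Duration it returns. A small sketch of that adaptive-interval loop, with a hypothetical do_background_tasks() standing in for the hypervisor's method:

use std::time::Duration;

// Hypothetical stand-in mirroring NetworkHypervisor::do_background_tasks():
// do the periodic work, then say how long the caller should wait before the next pass.
async fn do_background_tasks() -> Duration {
    Duration::from_secs(1)
}

async fn background_loop() {
    loop {
        let next_interval = do_background_tasks().await;
        tokio::time::sleep(next_interval).await;
    }
}
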
#[async_trait]
impl SystemInterface for ServiceImpl {
type LocalSocket = crate::service::LocalSocket;
type LocalInterface = crate::localinterface::LocalInterface;

fn event(&self, event: Event) {
async fn event(&self, event: Event) {
println!("{}", event.to_string());
match event {
_ => {}
}
}

fn user_message(&self, _source: &Identity, _message_type: u64, _message: &[u8]) {}
async fn user_message(&self, _source: &Identity, _message_type: u64, _message: &[u8]) {}

#[inline(always)]
fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool {
socket.0.strong_count() > 0
}

fn load_node_identity(&self) -> Option<Identity> {
self.rt.block_on(async { self.data.load_identity().await.map_or(None, |i| Some(i)) })
async fn load_node_identity(&self) -> Option<Identity> {
self.data.load_identity().await.map_or(None, |i| Some(i))
}

fn save_node_identity(&self, id: &Identity) {
self.rt.block_on(async { assert!(self.data.save_identity(id).await.is_ok()) });
async fn save_node_identity(&self, id: &Identity) {
assert!(self.data.save_identity(id).await.is_ok())
}

fn wire_send(&self, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>, data: &[&[u8]], packet_ttl: u8) -> bool {
async fn wire_send(&self, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>, data: &[&[u8]], packet_ttl: u8) -> bool {
match endpoint {
Endpoint::IpUdp(address) => {
// This is the fast path -- the socket is known to the core so just send it.

@@ -187,64 +192,58 @@ impl SystemInterface for ServiceImpl {

// Otherwise we try to send from one socket on every interface or from the specified interface.
// This path only happens when the core is trying new endpoints. The fast path is for most packets.
return self.rt.block_on(async {
let sockets = self.udp_sockets.read().await;
if !sockets.is_empty() {
if let Some(specific_interface) = local_interface {
for (_, p) in sockets.iter() {
for s in p.sockets.iter() {
if s.interface.eq(specific_interface) {
if s.send_async(&self.rt, address, data, packet_ttl).await {
return true;
}
let sockets = self.udp_sockets.read().await;
if !sockets.is_empty() {
if let Some(specific_interface) = local_interface {
for (_, p) in sockets.iter() {
for s in p.sockets.iter() {
if s.interface.eq(specific_interface) {
if s.send_async(&self.rt, address, data, packet_ttl).await {
return true;
}
}
}
} else {
let bound_ports: Vec<&u16> = sockets.keys().collect();
let mut sent_on_interfaces = HashSet::with_capacity(4);
let rn = random::xorshift64_random() as usize;
for i in 0..bound_ports.len() {
let p = sockets.get(*bound_ports.get(rn.wrapping_add(i) % bound_ports.len()).unwrap()).unwrap();
for s in p.sockets.iter() {
if !sent_on_interfaces.contains(&s.interface) {
if s.send_async(&self.rt, address, data, packet_ttl).await {
sent_on_interfaces.insert(s.interface.clone());
}
}
}
}
return !sent_on_interfaces.is_empty();
}
} else {
let bound_ports: Vec<&u16> = sockets.keys().collect();
let mut sent_on_interfaces = HashSet::with_capacity(4);
let rn = random::xorshift64_random() as usize;
for i in 0..bound_ports.len() {
let p = sockets.get(*bound_ports.get(rn.wrapping_add(i) % bound_ports.len()).unwrap()).unwrap();
for s in p.sockets.iter() {
if !sent_on_interfaces.contains(&s.interface) {
if s.send_async(&self.rt, address, data, packet_ttl).await {
sent_on_interfaces.insert(s.interface.clone());
}
}
}
}
return !sent_on_interfaces.is_empty();
}
return false;
});
}
return false;
}
_ => {}
}
return false;
}

fn check_path(&self, _id: &Identity, endpoint: &Endpoint, _local_socket: Option<&Self::LocalSocket>, _local_interface: Option<&Self::LocalInterface>) -> bool {
self.rt.block_on(async {
let config = self.data.config().await;
if let Some(pps) = config.physical.get(endpoint) {
!pps.blacklist
} else {
true
}
})
async fn check_path(&self, _id: &Identity, endpoint: &Endpoint, _local_socket: Option<&Self::LocalSocket>, _local_interface: Option<&Self::LocalInterface>) -> bool {
let config = self.data.config().await;
if let Some(pps) = config.physical.get(endpoint) {
!pps.blacklist
} else {
true
}
}

fn get_path_hints(&self, id: &Identity) -> Option<Vec<(Endpoint, Option<Self::LocalSocket>, Option<Self::LocalInterface>)>> {
self.rt.block_on(async {
let config = self.data.config().await;
if let Some(vns) = config.virtual_.get(&id.address) {
Some(vns.try_.iter().map(|ep| (ep.clone(), None, None)).collect())
} else {
None
}
})
async fn get_path_hints(&self, id: &Identity) -> Option<Vec<(Endpoint, Option<Self::LocalSocket>, Option<Self::LocalInterface>)>> {
let config = self.data.config().await;
if let Some(vns) = config.virtual_.get(&id.address) {
Some(vns.try_.iter().map(|ep| (ep.clone(), None, None)).collect())
} else {
None
}
}

#[inline(always)]
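
check_path() and get_path_hints() used to reach async code from synchronous trait methods through rt.block_on(); with SystemInterface itself async, that bridge disappears and the body is a plain .await. A before/after sketch of the change, using a hypothetical load_config() rather than the real config accessor:

use tokio::runtime::Handle;

// Hypothetical async config accessor.
async fn load_config() -> bool {
    true
}

// Old shape: a synchronous trait method has to re-enter the runtime to reach async
// code, and block_on() cannot be used from inside the runtime's own async context.
fn check_blocking(rt: &Handle) -> bool {
    rt.block_on(async { load_config().await })
}

// New shape: once the trait method itself is async, the same logic is a direct .await.
async fn check_async() -> bool {
    load_config().await
}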