Renaming...

This commit is contained in:
Adam Ierymenko 2022-12-21 20:00:15 -05:00
parent e3e4337d2f
commit 234e0ea0ee
13 changed files with 266 additions and 302 deletions

View file

@ -371,13 +371,10 @@ impl Controller {
// Make sure these agree. It should be impossible to end up with a member that's authorized and
// whose identity and identity fingerprint don't match.
if !secure_eq(&member
.identity
.as_ref()
.unwrap()
.fingerprint,
member.identity_fingerprint.as_ref().unwrap().as_bytes())
{
if !secure_eq(
&member.identity.as_ref().unwrap().fingerprint,
member.identity_fingerprint.as_ref().unwrap().as_bytes(),
) {
debug_assert!(false);
return Ok((AuthenticationResult::RejectedDueToError, None));
}
@ -501,7 +498,7 @@ impl Controller {
}
impl InnerProtocol for Controller {
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
fn handle_packet<HostSystemImpl: ApplicationLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
_: &Node,
@ -641,7 +638,7 @@ impl InnerProtocol for Controller {
}
}
impl VL1AuthProvider for Controller {
impl PeerFilter for Controller {
#[inline(always)]
fn should_respond_to(&self, _: &Verified<Identity>) -> bool {
// Controllers always have to establish sessions to process requests. We don't really know if

View file

@ -1,7 +1,7 @@
use async_trait::async_trait;
use zerotier_crypto::secure_eq;
use zerotier_network_hypervisor::vl1::{Address, InetAddress, NodeStorage};
use zerotier_network_hypervisor::vl1::{Address, InetAddress, NodeStorageProvider};
use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_utils::tokio::sync::broadcast::Receiver;
@ -22,7 +22,7 @@ pub enum Change {
}
#[async_trait]
pub trait Database: Sync + Send + NodeStorage + 'static {
pub trait Database: Sync + Send + NodeStorageProvider + 'static {
async fn list_networks(&self) -> Result<Vec<NetworkId>, Error>;
async fn get_network(&self, id: NetworkId) -> Result<Option<Network>, Error>;
async fn save_network(&self, obj: Network, generate_change_notification: bool) -> Result<(), Error>;

View file

@ -7,7 +7,7 @@ use async_trait::async_trait;
use notify::{RecursiveMode, Watcher};
use serde::de::DeserializeOwned;
use zerotier_network_hypervisor::vl1::{Address, Identity, NodeStorage, Verified};
use zerotier_network_hypervisor::vl1::{Address, Identity, NodeStorageProvider, Verified};
use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_utils::io::{fs_restrict_permissions, read_limit};
use zerotier_utils::reaper::Reaper;
@ -274,7 +274,7 @@ impl Drop for FileDatabase {
}
}
impl NodeStorage for FileDatabase {
impl NodeStorageProvider for FileDatabase {
fn load_node_identity(&self) -> Option<Verified<Identity>> {
let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 16384);
if id_data.is_err() {

View file

@ -12,7 +12,7 @@ use tokio_postgres::{Client, Statement};
use zerotier_crypto::secure_eq;
use zerotier_crypto::verified::Verified;
use zerotier_network_hypervisor::vl1::{Address, Identity, InetAddress, NodeStorage};
use zerotier_network_hypervisor::vl1::{Address, Identity, InetAddress, NodeStorageProvider};
use zerotier_network_hypervisor::vl2::networkconfig::IpRoute;
use zerotier_network_hypervisor::vl2::rule::Rule;
use zerotier_network_hypervisor::vl2::NetworkId;
@ -187,7 +187,7 @@ impl PostgresDatabase {
}
}
impl NodeStorage for PostgresDatabase {
impl NodeStorageProvider for PostgresDatabase {
fn load_node_identity(&self) -> Option<Verified<Identity>> {
Some(self.local_identity.clone())
}

View file

@ -18,7 +18,7 @@ pub use event::Event;
pub use identity::Identity;
pub use inetaddress::InetAddress;
pub use mac::MAC;
pub use node::{DummyInnerProtocol, HostSystem, InnerProtocol, Node, NodeStorage, PacketHandlerResult, VL1AuthProvider};
pub use node::{ApplicationLayer, DummyInnerLayer, InnerLayer, Node, NodeStorageProvider, PacketHandlerResult, PeerFilter};
pub use path::Path;
pub use peer::Peer;
pub use rootset::{Root, RootSet};

View file

@ -27,11 +27,11 @@ use zerotier_utils::marshalable::Marshalable;
use zerotier_utils::ringbuffer::RingBuffer;
use zerotier_utils::thing::Thing;
/// Trait providing VL1 authentication functions to determine which nodes we should talk to.
/// Trait providing functions to determine what peers we should talk to.
///
/// This is included in HostSystem but is provided as a separate trait to make it easy for
/// implementers of HostSystem to break this out and allow a user to specify it.
pub trait VL1AuthProvider: Sync + Send {
/// This is included in ApplicationLayer but is provided as a separate trait to make it easy for
/// implementers of ApplicationLayer to break this out and allow a user to specify it.
pub trait PeerFilter: Sync + Send {
/// Check if this node should respond to messages from a given peer at all.
///
/// If this returns false, the node simply drops messages on the floor and refuses
@ -48,9 +48,9 @@ pub trait VL1AuthProvider: Sync + Send {
/// Trait to be implemented by outside code to provide object storage to VL1
///
/// This is included in HostSystem but is provided as a separate trait to make it easy for
/// implementers of HostSystem to break this out and allow a user to specify it.
pub trait NodeStorage: Sync + Send {
/// This is included in ApplicationLayer but is provided as a separate trait to make it easy for
/// implementers of ApplicationLayer to break this out and allow a user to specify it.
pub trait NodeStorageProvider: Sync + Send {
/// Load this node's identity from the data store.
fn load_node_identity(&self) -> Option<Verified<Identity>>;
@ -59,9 +59,9 @@ pub trait NodeStorage: Sync + Send {
}
/// Trait implemented by external code to handle events and provide an interface to the system or application.
pub trait HostSystem: VL1AuthProvider + NodeStorage + 'static {
pub trait ApplicationLayer: PeerFilter + NodeStorageProvider + 'static {
/// Type for implementation of NodeStorage.
type Storage: NodeStorage + ?Sized;
type Storage: NodeStorageProvider + ?Sized;
/// Type for local system sockets.
type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static;
@ -110,12 +110,12 @@ pub trait HostSystem: VL1AuthProvider + NodeStorage + 'static {
///
/// The default implementation always returns true.
#[allow(unused_variables)]
fn should_use_physical_path<HostSystemImpl: HostSystem + ?Sized>(
fn should_use_physical_path<Application: ApplicationLayer + ?Sized>(
&self,
id: &Verified<Identity>,
endpoint: &Endpoint,
local_socket: Option<&HostSystemImpl::LocalSocket>,
local_interface: Option<&HostSystemImpl::LocalInterface>,
local_socket: Option<&Application::LocalSocket>,
local_interface: Option<&Application::LocalInterface>,
) -> bool {
true
}
@ -124,16 +124,10 @@ pub trait HostSystem: VL1AuthProvider + NodeStorage + 'static {
///
/// The default implementation always returns None.
#[allow(unused_variables)]
fn get_path_hints<HostSystemImpl: HostSystem + ?Sized>(
fn get_path_hints<Application: ApplicationLayer + ?Sized>(
&self,
id: &Verified<Identity>,
) -> Option<
Vec<(
Endpoint,
Option<HostSystemImpl::LocalSocket>,
Option<HostSystemImpl::LocalInterface>,
)>,
> {
) -> Option<Vec<(Endpoint, Option<Application::LocalSocket>, Option<Application::LocalInterface>)>> {
None
}
@ -163,14 +157,14 @@ pub enum PacketHandlerResult {
/// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but
/// it could also be implemented for testing or "off label" use of VL1 to carry different protocols.
#[allow(unused)]
pub trait InnerProtocol: Sync + Send {
pub trait InnerLayer: Sync + Send {
/// Handle a packet, returning true if it was handled by the next layer.
///
/// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error().
/// The default version returns NotHandled.
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
fn handle_packet<Application: ApplicationLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
app: &Application,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
@ -185,9 +179,9 @@ pub trait InnerProtocol: Sync + Send {
/// Handle errors, returning true if the error was recognized.
/// The default version returns NotHandled.
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
fn handle_error<Application: ApplicationLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
app: &Application,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
@ -204,9 +198,9 @@ pub trait InnerProtocol: Sync + Send {
/// Handle an OK, returning true if the OK was recognized.
/// The default version returns NotHandled.
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
fn handle_ok<Application: ApplicationLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
app: &Application,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
@ -261,7 +255,7 @@ struct WhoisQueueItem {
}
const PATH_MAP_SIZE: usize = std::mem::size_of::<HashMap<[u8; std::mem::size_of::<Endpoint>() + 128], Arc<Path>>>();
type PathMap<HostSystemImpl> = HashMap<PathKey<'static, 'static, HostSystemImpl>, Arc<Path>>;
type PathMap<Application> = HashMap<PathKey<'static, 'static, Application>, Arc<Path>>;
/// A ZeroTier VL1 node that can communicate securely with the ZeroTier peer-to-peer network.
pub struct Node {
@ -278,7 +272,7 @@ pub struct Node {
intervals: Mutex<BackgroundTaskIntervals>,
/// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use.
paths: RwLock<Thing<PATH_MAP_SIZE>>, // holds a PathMap<> but as a Thing<> to hide HostSystemImpl template parameter
paths: RwLock<Thing<PATH_MAP_SIZE>>, // holds a PathMap<> but as a Thing<> to hide ApplicationLayer template parameter
/// Peers with which we are currently communicating.
peers: RwLock<HashMap<Address, Arc<Peer>>>,
@ -294,20 +288,20 @@ pub struct Node {
}
impl Node {
pub fn new<HostSystemImpl: HostSystem + ?Sized>(
host_system: &HostSystemImpl,
pub fn new<Application: ApplicationLayer + ?Sized>(
app: &Application,
auto_generate_identity: bool,
auto_upgrade_identity: bool,
) -> Result<Self, InvalidParameterError> {
let mut id = {
let id = host_system.storage().load_node_identity();
let id = app.storage().load_node_identity();
if id.is_none() {
if !auto_generate_identity {
return Err(InvalidParameterError("no identity found and auto-generate not enabled"));
} else {
let id = Identity::generate();
host_system.event(Event::IdentityAutoGenerated(id.as_ref().clone()));
host_system.storage().save_node_identity(&id);
app.event(Event::IdentityAutoGenerated(id.as_ref().clone()));
app.storage().save_node_identity(&id);
id
}
} else {
@ -318,18 +312,18 @@ impl Node {
if auto_upgrade_identity {
let old = id.clone();
if id.upgrade()? {
host_system.storage().save_node_identity(&id);
host_system.event(Event::IdentityAutoUpgraded(old.unwrap(), id.as_ref().clone()));
app.storage().save_node_identity(&id);
app.event(Event::IdentityAutoUpgraded(old.unwrap(), id.as_ref().clone()));
}
}
debug_event!(host_system, "[vl1] loaded identity {}", id.to_string());
debug_event!(app, "[vl1] loaded identity {}", id.to_string());
Ok(Self {
instance_id: random::get_bytes_secure(),
identity: id,
intervals: Mutex::new(BackgroundTaskIntervals::default()),
paths: RwLock::new(Thing::new(PathMap::<HostSystemImpl>::new())),
paths: RwLock::new(Thing::new(PathMap::<Application>::new())),
peers: RwLock::new(HashMap::new()),
roots: RwLock::new(RootInfo {
sets: HashMap::new(),
@ -403,10 +397,10 @@ impl Node {
self.roots.read().unwrap().sets.values().cloned().map(|s| s.unwrap()).collect()
}
pub fn do_background_tasks<HostSystemImpl: HostSystem + ?Sized>(&self, host_system: &HostSystemImpl) -> Duration {
pub fn do_background_tasks<Application: ApplicationLayer + ?Sized>(&self, app: &Application) -> Duration {
const INTERVAL_MS: i64 = 1000;
const INTERVAL: Duration = Duration::from_millis(INTERVAL_MS as u64);
let time_ticks = host_system.time_ticks();
let time_ticks = app.time_ticks();
let (root_sync, root_hello, mut root_spam_hello, peer_service, path_service, whois_queue_retry) = {
let mut intervals = self.intervals.lock().unwrap();
@ -472,7 +466,7 @@ impl Node {
false
}
} {
debug_event!(host_system, "[vl1] root sets modified, synchronizing internal data structures");
debug_event!(app, "[vl1] root sets modified, synchronizing internal data structures");
let (mut old_root_identities, address_collisions, new_roots, bad_identities, my_root_sets) = {
let roots = self.roots.read().unwrap();
@ -513,7 +507,7 @@ impl Node {
if m.endpoints.is_some() && !address_collisions.contains(&m.identity.address) && !m.identity.eq(&self.identity)
{
debug_event!(
host_system,
app,
"[vl1] examining root {} with {} endpoints",
m.identity.address.to_string(),
m.endpoints.as_ref().map_or(0, |e| e.len())
@ -546,13 +540,13 @@ impl Node {
};
for c in address_collisions.iter() {
host_system.event(Event::SecurityWarning(format!(
app.event(Event::SecurityWarning(format!(
"address/identity collision in root sets! address {} collides across root sets or with an existing peer and is being ignored as a root!",
c.to_string()
)));
}
for i in bad_identities.iter() {
host_system.event(Event::SecurityWarning(format!(
app.event(Event::SecurityWarning(format!(
"bad identity detected for address {} in at least one root set, ignoring (error creating peer object)",
i.address.to_string()
)));
@ -566,7 +560,7 @@ impl Node {
let mut roots = self.roots.write().unwrap();
roots.roots = new_roots;
roots.this_root_sets = my_root_sets;
host_system.event(Event::UpdatedRoots(old_root_identities, new_root_identities));
app.event(Event::UpdatedRoots(old_root_identities, new_root_identities));
}
}
@ -592,7 +586,7 @@ impl Node {
let mut best_root = self.best_root.write().unwrap();
if let Some(best_root) = best_root.as_mut() {
debug_event!(
host_system,
app,
"[vl1] selected new best root: {} (replaced {})",
best.identity.address.to_string(),
best_root.identity.address.to_string()
@ -600,7 +594,7 @@ impl Node {
*best_root = best.clone();
} else {
debug_event!(
host_system,
app,
"[vl1] selected new best root: {} (was empty)",
best.identity.address.to_string()
);
@ -610,7 +604,7 @@ impl Node {
} else {
if let Some(old_best) = self.best_root.write().unwrap().take() {
debug_event!(
host_system,
app,
"[vl1] selected new best root: NONE (replaced {})",
old_best.identity.address.to_string()
);
@ -622,12 +616,12 @@ impl Node {
if !roots.online {
drop(roots);
self.roots.write().unwrap().online = true;
host_system.event(Event::Online(true));
app.event(Event::Online(true));
}
} else if roots.online {
drop(roots);
self.roots.write().unwrap().online = false;
host_system.event(Event::Online(false));
app.event(Event::Online(false));
}
}
}
@ -648,14 +642,14 @@ impl Node {
for (root, endpoints) in roots.iter() {
for ep in endpoints.iter() {
debug_event!(
host_system,
app,
"sending HELLO to root {} (root interval: {})",
root.identity.address.to_string(),
ROOT_HELLO_INTERVAL
);
let root = root.clone();
let ep = ep.clone();
root.send_hello(host_system, self, Some(&ep));
root.send_hello(app, self, Some(&ep));
}
}
}
@ -667,7 +661,7 @@ impl Node {
{
let roots = self.roots.read().unwrap();
for (a, peer) in self.peers.read().unwrap().iter() {
if !peer.service(host_system, self, time_ticks) && !roots.roots.contains_key(peer) {
if !peer.service(app, self, time_ticks) && !roots.roots.contains_key(peer) {
dead_peers.push(*a);
}
}
@ -682,8 +676,8 @@ impl Node {
let mut need_keepalive = Vec::new();
// First check all paths in read mode to avoid blocking the entire node.
for (k, path) in self.paths.read().unwrap().get::<PathMap<HostSystemImpl>>().iter() {
if host_system.local_socket_is_valid(k.local_socket()) {
for (k, path) in self.paths.read().unwrap().get::<PathMap<Application>>().iter() {
if app.local_socket_is_valid(k.local_socket()) {
match path.service(time_ticks) {
PathServiceResult::Ok => {}
PathServiceResult::Dead => dead_paths.push(k.to_copied()),
@ -696,16 +690,16 @@ impl Node {
// Lock in write mode and remove dead paths, doing so piecemeal to again avoid blocking.
for dp in dead_paths.iter() {
self.paths.write().unwrap().get_mut::<PathMap<HostSystemImpl>>().remove(dp);
self.paths.write().unwrap().get_mut::<PathMap<Application>>().remove(dp);
}
// Finally run keepalive sends as a batch.
let keepalive_buf = [time_ticks as u8]; // just an arbitrary byte, no significance
for p in need_keepalive.iter() {
host_system.wire_send(
app.wire_send(
&p.endpoint,
Some(p.local_socket::<HostSystemImpl>()),
Some(p.local_interface::<HostSystemImpl>()),
Some(p.local_socket::<Application>()),
Some(p.local_interface::<Application>()),
&keepalive_buf,
0,
);
@ -727,7 +721,7 @@ impl Node {
need_whois
};
if !need_whois.is_empty() {
self.send_whois(host_system, need_whois.as_slice(), time_ticks);
self.send_whois(app, need_whois.as_slice(), time_ticks);
}
}
@ -735,18 +729,18 @@ impl Node {
INTERVAL
}
pub fn handle_incoming_physical_packet<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
pub fn handle_incoming_physical_packet<Application: ApplicationLayer + ?Sized, Inner: InnerLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
app: &Application,
inner: &Inner,
source_endpoint: &Endpoint,
source_local_socket: &HostSystemImpl::LocalSocket,
source_local_interface: &HostSystemImpl::LocalInterface,
source_local_socket: &Application::LocalSocket,
source_local_interface: &Application::LocalInterface,
time_ticks: i64,
mut packet: PooledPacketBuffer,
) {
debug_event!(
host_system,
app,
"[vl1] {} -> #{} {}->{} length {} (on socket {}@{})",
source_endpoint.to_string(),
packet
@ -779,15 +773,14 @@ impl Node {
if dest == self.identity.address {
let fragment_header = &*fragment_header; // discard mut
let path =
self.canonical_path::<HostSystemImpl>(source_endpoint, source_local_socket, source_local_interface, time_ticks);
let path = self.canonical_path::<Application>(source_endpoint, source_local_socket, source_local_interface, time_ticks);
path.log_receive_anything(time_ticks);
if fragment_header.is_fragment() {
#[cfg(debug_assertions)]
let fragment_header_id = u64::from_be_bytes(fragment_header.id);
debug_event!(
host_system,
app,
"[vl1] [v1] #{:0>16x} fragment {} of {} received",
u64::from_be_bytes(fragment_header.id),
fragment_header.fragment_no(),
@ -803,14 +796,14 @@ impl Node {
) {
if let Some(frag0) = assembled_packet.frags[0].as_ref() {
#[cfg(debug_assertions)]
debug_event!(host_system, "[vl1] [v1] #{:0>16x} packet fully assembled!", fragment_header_id);
debug_event!(app, "[vl1] [v1] #{:0>16x} packet fully assembled!", fragment_header_id);
if let Ok(packet_header) = frag0.struct_at::<v1::PacketHeader>(0) {
if let Some(source) = Address::from_bytes(&packet_header.src) {
if let Some(peer) = self.peer(source) {
peer.v1_proto_receive(
self,
host_system,
app,
inner,
time_ticks,
&path,
@ -821,7 +814,7 @@ impl Node {
} else {
// If WHOIS is needed we need to go ahead and combine the packet so it can be cached
// for later processing when a WHOIS reply comes back.
let mut combined_packet = host_system.get_buffer();
let mut combined_packet = app.get_buffer();
let mut ok = combined_packet.append_bytes(frag0.as_bytes()).is_ok();
for i in 1..assembled_packet.have {
if let Some(f) = assembled_packet.frags[i as usize].as_ref() {
@ -832,7 +825,7 @@ impl Node {
}
}
if ok {
self.whois(host_system, source, Some((Arc::downgrade(&path), combined_packet)), time_ticks);
self.whois(app, source, Some((Arc::downgrade(&path), combined_packet)), time_ticks);
}
}
} // else source address invalid
@ -840,17 +833,13 @@ impl Node {
} // else reassembly failed (in a way that shouldn't be possible)
} // else packet not fully assembled yet
} else if let Ok(packet_header) = packet.struct_at::<v1::PacketHeader>(0) {
debug_event!(
host_system,
"[vl1] [v1] #{:0>16x} is unfragmented",
u64::from_be_bytes(packet_header.id)
);
debug_event!(app, "[vl1] [v1] #{:0>16x} is unfragmented", u64::from_be_bytes(packet_header.id));
if let Some(source) = Address::from_bytes(&packet_header.src) {
if let Some(peer) = self.peer(source) {
peer.v1_proto_receive(self, host_system, inner, time_ticks, &path, packet_header, packet.as_ref(), &[]);
peer.v1_proto_receive(self, app, inner, time_ticks, &path, packet_header, packet.as_ref(), &[]);
} else {
self.whois(host_system, source, Some((Arc::downgrade(&path), packet)), time_ticks);
self.whois(app, source, Some((Arc::downgrade(&path), packet)), time_ticks);
}
}
} // else not fragment and header incomplete
@ -866,7 +855,7 @@ impl Node {
{
debug_packet_id = u64::from_be_bytes(fragment_header.id);
debug_event!(
host_system,
app,
"[vl1] [v1] #{:0>16x} forwarding packet fragment to {}",
debug_packet_id,
dest.to_string()
@ -874,7 +863,7 @@ impl Node {
}
if fragment_header.increment_hops() > v1::FORWARD_MAX_HOPS {
#[cfg(debug_assertions)]
debug_event!(host_system, "[vl1] [v1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id);
debug_event!(app, "[vl1] [v1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id);
return;
}
} else if let Ok(packet_header) = packet.struct_mut_at::<v1::PacketHeader>(0) {
@ -882,7 +871,7 @@ impl Node {
{
debug_packet_id = u64::from_be_bytes(packet_header.id);
debug_event!(
host_system,
app,
"[vl1] [v1] #{:0>16x} forwarding packet to {}",
debug_packet_id,
dest.to_string()
@ -891,7 +880,7 @@ impl Node {
if packet_header.increment_hops() > v1::FORWARD_MAX_HOPS {
#[cfg(debug_assertions)]
debug_event!(
host_system,
app,
"[vl1] [v1] #{:0>16x} discarded: max hops exceeded!",
u64::from_be_bytes(packet_header.id)
);
@ -903,10 +892,10 @@ impl Node {
if let Some(peer) = self.peer(dest) {
if let Some(forward_path) = peer.direct_path() {
host_system.wire_send(
app.wire_send(
&forward_path.endpoint,
Some(forward_path.local_socket::<HostSystemImpl>()),
Some(forward_path.local_interface::<HostSystemImpl>()),
Some(forward_path.local_socket::<Application>()),
Some(forward_path.local_interface::<Application>()),
packet.as_bytes(),
0,
);
@ -914,7 +903,7 @@ impl Node {
peer.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed);
#[cfg(debug_assertions)]
debug_event!(host_system, "[vl1] [v1] #{:0>16x} forwarded successfully", debug_packet_id);
debug_event!(app, "[vl1] [v1] #{:0>16x} forwarded successfully", debug_packet_id);
}
}
} // else not for this node and shouldn't be forwarded
@ -923,9 +912,9 @@ impl Node {
}
/// Enqueue and send a WHOIS query for a given address, adding the supplied packet (if any) to the list to be processed on reply.
fn whois<HostSystemImpl: HostSystem + ?Sized>(
fn whois<Application: ApplicationLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
app: &Application,
address: Address,
waiting_packet: Option<(Weak<Path>, PooledPacketBuffer)>,
time_ticks: i64,
@ -947,13 +936,13 @@ impl Node {
qi.retry_count += 1;
}
}
self.send_whois(host_system, &[address], time_ticks);
self.send_whois(app, &[address], time_ticks);
}
/// Send a WHOIS query to the current best root.
fn send_whois<HostSystemImpl: HostSystem + ?Sized>(&self, host_system: &HostSystemImpl, mut addresses: &[Address], time_ticks: i64) {
fn send_whois<Application: ApplicationLayer + ?Sized>(&self, app: &Application, mut addresses: &[Address], time_ticks: i64) {
debug_assert!(!addresses.is_empty());
debug_event!(host_system, "[vl1] [v1] sending WHOIS for {}", {
debug_event!(app, "[vl1] [v1] sending WHOIS for {}", {
let mut tmp = String::new();
for a in addresses.iter() {
if !tmp.is_empty() {
@ -966,7 +955,7 @@ impl Node {
if let Some(root) = self.best_root() {
while !addresses.is_empty() {
if !root
.send(host_system, self, None, time_ticks, |packet| -> Result<(), Infallible> {
.send(app, self, None, time_ticks, |packet| -> Result<(), Infallible> {
assert!(packet.append_u8(message_type::VL1_WHOIS).is_ok());
while !addresses.is_empty() && (packet.len() + ADDRESS_SIZE) <= UDP_DEFAULT_MTU {
assert!(packet.append_bytes_fixed(&addresses[0].to_bytes()).is_ok());
@ -983,10 +972,10 @@ impl Node {
}
/// Called by Peer when an identity is received from another node, e.g. via OK(WHOIS).
pub(crate) fn handle_incoming_identity<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
pub(crate) fn handle_incoming_identity<Application: ApplicationLayer + ?Sized, Inner: InnerLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
app: &Application,
inner: &Inner,
received_identity: Identity,
time_ticks: i64,
authoritative: bool,
@ -996,7 +985,7 @@ impl Node {
let mut whois_queue = self.whois_queue.lock().unwrap();
if let Some(qi) = whois_queue.get_mut(&received_identity.address) {
let address = received_identity.address;
if host_system.should_respond_to(&received_identity) {
if app.should_respond_to(&received_identity) {
let mut peers = self.peers.write().unwrap();
if let Some(peer) = peers.get(&address).cloned().or_else(|| {
Peer::new(&self.identity, received_identity, time_ticks)
@ -1007,7 +996,7 @@ impl Node {
for p in qi.v1_proto_waiting_packets.iter() {
if let Some(path) = p.0.upgrade() {
if let Ok(packet_header) = p.1.struct_at::<v1::PacketHeader>(0) {
peer.v1_proto_receive(self, host_system, inner, time_ticks, &path, packet_header, &p.1, &[]);
peer.v1_proto_receive(self, app, inner, time_ticks, &path, packet_header, &p.1, &[]);
}
}
}
@ -1034,25 +1023,25 @@ impl Node {
}
/// Get the canonical Path object corresponding to an endpoint.
pub(crate) fn canonical_path<HostSystemImpl: HostSystem + ?Sized>(
pub(crate) fn canonical_path<Application: ApplicationLayer + ?Sized>(
&self,
ep: &Endpoint,
local_socket: &HostSystemImpl::LocalSocket,
local_interface: &HostSystemImpl::LocalInterface,
local_socket: &Application::LocalSocket,
local_interface: &Application::LocalInterface,
time_ticks: i64,
) -> Arc<Path> {
let paths = self.paths.read().unwrap();
if let Some(path) = paths.get::<PathMap<HostSystemImpl>>().get(&PathKey::Ref(ep, local_socket)) {
if let Some(path) = paths.get::<PathMap<Application>>().get(&PathKey::Ref(ep, local_socket)) {
path.clone()
} else {
drop(paths);
self.paths
.write()
.unwrap()
.get_mut::<PathMap<HostSystemImpl>>()
.get_mut::<PathMap<Application>>()
.entry(PathKey::Copied(ep.clone(), local_socket.clone()))
.or_insert_with(|| {
Arc::new(Path::new::<HostSystemImpl>(
Arc::new(Path::new::<Application>(
ep.clone(),
local_socket.clone(),
local_interface.clone(),
@ -1066,12 +1055,12 @@ impl Node {
/// Key used to look up paths in a hash map
/// This supports copied keys for storing and refs for fast lookup without having to copy anything.
enum PathKey<'a, 'b, HostSystemImpl: HostSystem + ?Sized> {
Copied(Endpoint, HostSystemImpl::LocalSocket),
Ref(&'a Endpoint, &'b HostSystemImpl::LocalSocket),
enum PathKey<'a, 'b, Application: ApplicationLayer + ?Sized> {
Copied(Endpoint, Application::LocalSocket),
Ref(&'a Endpoint, &'b Application::LocalSocket),
}
impl<HostSystemImpl: HostSystem + ?Sized> Hash for PathKey<'_, '_, HostSystemImpl> {
impl<Application: ApplicationLayer + ?Sized> Hash for PathKey<'_, '_, Application> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
match self {
Self::Copied(ep, ls) => {
@ -1086,7 +1075,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Hash for PathKey<'_, '_, HostSystemImp
}
}
impl<HostSystemImpl: HostSystem + ?Sized> PartialEq for PathKey<'_, '_, HostSystemImpl> {
impl<Application: ApplicationLayer + ?Sized> PartialEq for PathKey<'_, '_, Application> {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::Copied(ep1, ls1), Self::Copied(ep2, ls2)) => ep1.eq(ep2) && ls1.eq(ls2),
@ -1097,11 +1086,11 @@ impl<HostSystemImpl: HostSystem + ?Sized> PartialEq for PathKey<'_, '_, HostSyst
}
}
impl<HostSystemImpl: HostSystem + ?Sized> Eq for PathKey<'_, '_, HostSystemImpl> {}
impl<Application: ApplicationLayer + ?Sized> Eq for PathKey<'_, '_, Application> {}
impl<HostSystemImpl: HostSystem + ?Sized> PathKey<'_, '_, HostSystemImpl> {
impl<Application: ApplicationLayer + ?Sized> PathKey<'_, '_, Application> {
#[inline(always)]
fn local_socket(&self) -> &HostSystemImpl::LocalSocket {
fn local_socket(&self) -> &Application::LocalSocket {
match self {
Self::Copied(_, ls) => ls,
Self::Ref(_, ls) => *ls,
@ -1109,21 +1098,21 @@ impl<HostSystemImpl: HostSystem + ?Sized> PathKey<'_, '_, HostSystemImpl> {
}
#[inline(always)]
fn to_copied(&self) -> PathKey<'static, 'static, HostSystemImpl> {
fn to_copied(&self) -> PathKey<'static, 'static, Application> {
match self {
Self::Copied(ep, ls) => PathKey::<'static, 'static, HostSystemImpl>::Copied(ep.clone(), ls.clone()),
Self::Ref(ep, ls) => PathKey::<'static, 'static, HostSystemImpl>::Copied((*ep).clone(), (*ls).clone()),
Self::Copied(ep, ls) => PathKey::<'static, 'static, Application>::Copied(ep.clone(), ls.clone()),
Self::Ref(ep, ls) => PathKey::<'static, 'static, Application>::Copied((*ep).clone(), (*ls).clone()),
}
}
}
/// Dummy no-op inner protocol for debugging and testing.
#[derive(Default)]
pub struct DummyInnerProtocol;
pub struct DummyInnerLayer;
impl InnerProtocol for DummyInnerProtocol {}
impl InnerLayer for DummyInnerLayer {}
impl VL1AuthProvider for DummyInnerProtocol {
impl PeerFilter for DummyInnerLayer {
#[inline(always)]
fn should_respond_to(&self, _: &Verified<Identity>) -> bool {
true

View file

@ -37,10 +37,10 @@ pub struct Path {
}
impl Path {
pub(crate) fn new<HostSystemImpl: HostSystem + ?Sized>(
pub(crate) fn new<Application: ApplicationLayer + ?Sized>(
endpoint: Endpoint,
local_socket: HostSystemImpl::LocalSocket,
local_interface: HostSystemImpl::LocalInterface,
local_socket: Application::LocalSocket,
local_interface: Application::LocalInterface,
time_ticks: i64,
) -> Self {
Self {
@ -55,12 +55,12 @@ impl Path {
}
#[inline(always)]
pub(crate) fn local_socket<HostSystemImpl: HostSystem + ?Sized>(&self) -> &HostSystemImpl::LocalSocket {
pub(crate) fn local_socket<Application: ApplicationLayer + ?Sized>(&self) -> &Application::LocalSocket {
self.local_socket.get()
}
#[inline(always)]
pub(crate) fn local_interface<HostSystemImpl: HostSystem + ?Sized>(&self) -> &HostSystemImpl::LocalInterface {
pub(crate) fn local_interface<Application: ApplicationLayer + ?Sized>(&self) -> &Application::LocalInterface {
self.local_interface.get()
}

View file

@ -53,7 +53,7 @@ struct RemoteNodeInfo {
}
/// Sort a list of paths by quality or priority, with best paths first.
fn prioritize_paths<HostSystemImpl: HostSystem + ?Sized>(paths: &mut Vec<PeerPath>) {
fn prioritize_paths<Application: ApplicationLayer + ?Sized>(paths: &mut Vec<PeerPath>) {
paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse());
}
@ -136,7 +136,7 @@ impl Peer {
return None;
}
fn learn_path<HostSystemImpl: HostSystem + ?Sized>(&self, host_system: &HostSystemImpl, new_path: &Arc<Path>, time_ticks: i64) {
fn learn_path<Application: ApplicationLayer + ?Sized>(&self, app: &Application, new_path: &Arc<Path>, time_ticks: i64) {
let mut paths = self.paths.lock().unwrap();
// TODO: check path filter
@ -155,7 +155,7 @@ impl Peer {
Endpoint::IpUdp(existing_ip) => {
if existing_ip.ip_bytes().eq(new_ip.ip_bytes()) {
debug_event!(
host_system,
app,
"[vl1] {} replacing path {} with {} (same IP, different port)",
self.identity.address.to_string(),
p.endpoint.to_string(),
@ -163,7 +163,7 @@ impl Peer {
);
pi.path = Arc::downgrade(new_path);
pi.last_receive_time_ticks = time_ticks;
prioritize_paths::<HostSystemImpl>(&mut paths);
prioritize_paths::<Application>(&mut paths);
return;
}
}
@ -183,7 +183,7 @@ impl Peer {
// Learn new path if it's not a duplicate or should not replace an existing path.
debug_event!(
host_system,
app,
"[vl1] {} learned new path: {}",
self.identity.address.to_string(),
new_path.endpoint.to_string()
@ -192,7 +192,7 @@ impl Peer {
path: Arc::downgrade(new_path),
last_receive_time_ticks: time_ticks,
});
prioritize_paths::<HostSystemImpl>(&mut paths);
prioritize_paths::<Application>(&mut paths);
}
/// Get the next sequential message ID for use with the V1 transport protocol.
@ -202,7 +202,7 @@ impl Peer {
}
/// Called every SERVICE_INTERVAL_MS by the background service loop in Node.
pub(crate) fn service<HostSystemImpl: HostSystem + ?Sized>(&self, _: &HostSystemImpl, _: &Node, time_ticks: i64) -> bool {
pub(crate) fn service<Application: ApplicationLayer + ?Sized>(&self, _: &Application, _: &Node, time_ticks: i64) -> bool {
// Prune dead paths and sort in descending order of quality.
{
let mut paths = self.paths.lock().unwrap();
@ -210,7 +210,7 @@ impl Peer {
if paths.capacity() > 16 {
paths.shrink_to_fit();
}
prioritize_paths::<HostSystemImpl>(&mut paths);
prioritize_paths::<Application>(&mut paths);
}
// Prune dead entries from the map of reported local endpoints (e.g. externally visible IPs).
@ -223,19 +223,19 @@ impl Peer {
}
/// Send a prepared and encrypted packet using the V1 protocol with fragmentation if needed.
fn v1_proto_internal_send<HostSystemImpl: HostSystem + ?Sized>(
fn v1_proto_internal_send<Application: ApplicationLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
app: &Application,
endpoint: &Endpoint,
local_socket: Option<&HostSystemImpl::LocalSocket>,
local_interface: Option<&HostSystemImpl::LocalInterface>,
local_socket: Option<&Application::LocalSocket>,
local_interface: Option<&Application::LocalInterface>,
max_fragment_size: usize,
packet: PooledPacketBuffer,
) {
let packet_size = packet.len();
if packet_size > max_fragment_size {
let bytes = packet.as_bytes();
host_system.wire_send(endpoint, local_socket, local_interface, &bytes[0..UDP_DEFAULT_MTU], 0);
app.wire_send(endpoint, local_socket, local_interface, &bytes[0..UDP_DEFAULT_MTU], 0);
let mut pos = UDP_DEFAULT_MTU;
let overrun_size = (packet_size - UDP_DEFAULT_MTU) as u32;
@ -259,7 +259,7 @@ impl Peer {
let fragment_size = v1::FRAGMENT_HEADER_SIZE + chunk_size;
tmp_buf[..v1::FRAGMENT_HEADER_SIZE].copy_from_slice(header.as_bytes());
tmp_buf[v1::FRAGMENT_HEADER_SIZE..fragment_size].copy_from_slice(&bytes[pos..next_pos]);
host_system.wire_send(endpoint, local_socket, local_interface, &tmp_buf[..fragment_size], 0);
app.wire_send(endpoint, local_socket, local_interface, &tmp_buf[..fragment_size], 0);
pos = next_pos;
if pos < packet_size {
chunk_size = (packet_size - pos).min(UDP_DEFAULT_MTU - v1::HEADER_SIZE);
@ -268,7 +268,7 @@ impl Peer {
}
}
} else {
host_system.wire_send(endpoint, local_socket, local_interface, packet.as_bytes(), 0);
app.wire_send(endpoint, local_socket, local_interface, packet.as_bytes(), 0);
}
}
@ -281,9 +281,9 @@ impl Peer {
/// The builder function must append the verb (with any verb flags) and packet payload. If it returns
/// an error, the error is returned immediately and the send is aborted. None is returned if the send
/// function itself fails for some reason such as no paths being available.
pub fn send<HostSystemImpl: HostSystem + ?Sized, R, E, BuilderFunction: FnOnce(&mut PacketBuffer) -> Result<R, E>>(
pub fn send<Application: ApplicationLayer + ?Sized, R, E, BuilderFunction: FnOnce(&mut PacketBuffer) -> Result<R, E>>(
&self,
host_system: &HostSystemImpl,
app: &Application,
node: &Node,
path: Option<&Arc<Path>>,
time_ticks: i64,
@ -303,7 +303,7 @@ impl Peer {
let max_fragment_size = path.endpoint.max_fragment_size();
let mut packet = host_system.get_buffer();
let mut packet = app.get_buffer();
if !self.is_v2() {
// For the V1 protocol, leave room for for the header in the buffer.
packet.set_size(v1::HEADER_SIZE);
@ -371,10 +371,10 @@ impl Peer {
}
self.v1_proto_internal_send(
host_system,
app,
&path.endpoint,
Some(path.local_socket::<HostSystemImpl>()),
Some(path.local_interface::<HostSystemImpl>()),
Some(path.local_socket::<Application>()),
Some(path.local_interface::<Application>()),
max_fragment_size,
packet,
);
@ -393,9 +393,9 @@ impl Peer {
/// Unlike other messages HELLO is sent partially in the clear and always with the long-lived
/// static identity key. Authentication in old versions is via Poly1305 and in new versions
/// via HMAC-SHA512.
pub(crate) fn send_hello<HostSystemImpl: HostSystem + ?Sized>(
pub(crate) fn send_hello<Application: ApplicationLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
app: &Application,
node: &Node,
explicit_endpoint: Option<&Endpoint>,
) -> bool {
@ -412,9 +412,9 @@ impl Peer {
};
let max_fragment_size = destination.max_fragment_size();
let time_ticks = host_system.time_ticks();
let time_ticks = app.time_ticks();
let mut packet = host_system.get_buffer();
let mut packet = app.get_buffer();
{
let message_id = self.v1_proto_next_message_id();
@ -447,7 +447,7 @@ impl Peer {
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
debug_event!(
host_system,
app,
"HELLO -> {} @ {} ({} bytes)",
self.identity.address.to_string(),
destination.to_string(),
@ -457,16 +457,16 @@ impl Peer {
if let Some(p) = path.as_ref() {
self.v1_proto_internal_send(
host_system,
app,
destination,
Some(p.local_socket::<HostSystemImpl>()),
Some(p.local_interface::<HostSystemImpl>()),
Some(p.local_socket::<Application>()),
Some(p.local_interface::<Application>()),
max_fragment_size,
packet,
);
p.log_send_anything(time_ticks);
} else {
self.v1_proto_internal_send(host_system, destination, None, None, max_fragment_size, packet);
self.v1_proto_internal_send(app, destination, None, None, max_fragment_size, packet);
}
return true;
@ -478,11 +478,11 @@ impl Peer {
/// those fragments after the main packet header and first chunk.
///
/// This returns true if the packet decrypted and passed authentication.
pub(crate) fn v1_proto_receive<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
pub(crate) fn v1_proto_receive<Application: ApplicationLayer + ?Sized, Inner: InnerLayer + ?Sized>(
self: &Arc<Self>,
node: &Node,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
app: &Application,
inner: &Inner,
time_ticks: i64,
source_path: &Arc<Path>,
packet_header: &v1::PacketHeader,
@ -503,11 +503,7 @@ impl Peer {
message_id2
} else {
// Packet failed to decrypt using either ephemeral or permanent key, reject.
debug_event!(
host_system,
"[vl1] #{:0>16x} failed authentication",
u64::from_be_bytes(packet_header.id)
);
debug_event!(app, "[vl1] #{:0>16x} failed authentication", u64::from_be_bytes(packet_header.id));
return PacketHandlerResult::Error;
};
@ -539,7 +535,7 @@ impl Peer {
verb &= v1::VERB_MASK; // mask off flags
debug_event!(
host_system,
app,
"[vl1] #{:0>16x} decrypted and authenticated, verb: {} ({:0>2x})",
u64::from_be_bytes(packet_header.id),
message_type::name(verb),
@ -548,11 +544,9 @@ impl Peer {
return match verb {
message_type::VL1_NOP => PacketHandlerResult::Ok,
message_type::VL1_HELLO => {
self.handle_incoming_hello(host_system, inner, node, time_ticks, message_id, source_path, &payload)
}
message_type::VL1_HELLO => self.handle_incoming_hello(app, inner, node, time_ticks, message_id, source_path, &payload),
message_type::VL1_ERROR => self.handle_incoming_error(
host_system,
app,
inner,
node,
time_ticks,
@ -562,7 +556,7 @@ impl Peer {
&payload,
),
message_type::VL1_OK => self.handle_incoming_ok(
host_system,
app,
inner,
node,
time_ticks,
@ -572,28 +566,16 @@ impl Peer {
path_is_known,
&payload,
),
message_type::VL1_WHOIS => self.handle_incoming_whois(host_system, inner, node, time_ticks, message_id, &payload),
message_type::VL1_WHOIS => self.handle_incoming_whois(app, inner, node, time_ticks, message_id, &payload),
message_type::VL1_RENDEZVOUS => {
self.handle_incoming_rendezvous(host_system, node, time_ticks, message_id, source_path, &payload)
self.handle_incoming_rendezvous(app, node, time_ticks, message_id, source_path, &payload)
}
message_type::VL1_ECHO => self.handle_incoming_echo(host_system, inner, node, time_ticks, message_id, &payload),
message_type::VL1_ECHO => self.handle_incoming_echo(app, inner, node, time_ticks, message_id, &payload),
message_type::VL1_PUSH_DIRECT_PATHS => {
self.handle_incoming_push_direct_paths(host_system, node, time_ticks, source_path, &payload)
self.handle_incoming_push_direct_paths(app, node, time_ticks, source_path, &payload)
}
message_type::VL1_USER_MESSAGE => {
self.handle_incoming_user_message(host_system, node, time_ticks, source_path, &payload)
}
_ => inner.handle_packet(
host_system,
node,
self,
&source_path,
packet_header.hops(),
message_id,
verb,
&payload,
1,
),
message_type::VL1_USER_MESSAGE => self.handle_incoming_user_message(app, node, time_ticks, source_path, &payload),
_ => inner.handle_packet(app, node, self, &source_path, packet_header.hops(), message_id, verb, &payload, 1),
};
}
}
@ -601,19 +583,19 @@ impl Peer {
return PacketHandlerResult::Error;
}
fn handle_incoming_hello<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
fn handle_incoming_hello<Application: ApplicationLayer + ?Sized, Inner: InnerLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
app: &Application,
inner: &Inner,
node: &Node,
time_ticks: i64,
message_id: MessageId,
source_path: &Arc<Path>,
payload: &PacketBuffer,
) -> PacketHandlerResult {
if !(host_system.should_respond_to(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) {
if !(app.should_respond_to(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) {
debug_event!(
host_system,
app,
"[vl1] dropping HELLO from {} due to lack of trust relationship",
self.identity.address.to_string()
);
@ -634,25 +616,19 @@ impl Peer {
);
}
self.send(
host_system,
node,
Some(source_path),
time_ticks,
|packet| -> Result<(), Infallible> {
let f: &mut (OkHeader, v1::message_component_structs::OkHelloFixedHeaderFields) =
packet.append_struct_get_mut().unwrap();
f.0.verb = message_type::VL1_OK;
f.0.in_re_verb = message_type::VL1_HELLO;
f.0.in_re_message_id = message_id.to_ne_bytes();
f.1.timestamp_echo = hello_fixed_headers.timestamp;
f.1.version_proto = PROTOCOL_VERSION;
f.1.version_major = VERSION_MAJOR;
f.1.version_minor = VERSION_MINOR;
f.1.version_revision = VERSION_REVISION.to_be_bytes();
Ok(())
},
);
self.send(app, node, Some(source_path), time_ticks, |packet| -> Result<(), Infallible> {
let f: &mut (OkHeader, v1::message_component_structs::OkHelloFixedHeaderFields) =
packet.append_struct_get_mut().unwrap();
f.0.verb = message_type::VL1_OK;
f.0.in_re_verb = message_type::VL1_HELLO;
f.0.in_re_message_id = message_id.to_ne_bytes();
f.1.timestamp_echo = hello_fixed_headers.timestamp;
f.1.version_proto = PROTOCOL_VERSION;
f.1.version_major = VERSION_MAJOR;
f.1.version_minor = VERSION_MINOR;
f.1.version_revision = VERSION_REVISION.to_be_bytes();
Ok(())
});
return PacketHandlerResult::Ok;
}
@ -662,10 +638,10 @@ impl Peer {
return PacketHandlerResult::Error;
}
fn handle_incoming_error<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
fn handle_incoming_error<Application: ApplicationLayer + ?Sized, Inner: InnerLayer + ?Sized>(
self: &Arc<Self>,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
app: &Application,
inner: &Inner,
node: &Node,
_time_ticks: i64,
source_path: &Arc<Path>,
@ -682,7 +658,7 @@ impl Peer {
match error_header.in_re_verb {
_ => {
return inner.handle_error(
host_system,
app,
node,
self,
&source_path,
@ -700,10 +676,10 @@ impl Peer {
return PacketHandlerResult::Error;
}
fn handle_incoming_ok<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
fn handle_incoming_ok<Application: ApplicationLayer + ?Sized, Inner: InnerLayer + ?Sized>(
self: &Arc<Self>,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
app: &Application,
inner: &Inner,
node: &Node,
time_ticks: i64,
source_path: &Arc<Path>,
@ -724,7 +700,7 @@ impl Peer {
payload.read_struct::<v1::message_component_structs::OkHelloFixedHeaderFields>(&mut cursor)
{
if source_hops == 0 {
debug_event!(host_system, "[vl1] {} OK(HELLO)", self.identity.address.to_string(),);
debug_event!(app, "[vl1] {} OK(HELLO)", self.identity.address.to_string(),);
if let Ok(reported_endpoint) = Endpoint::unmarshal(&payload, &mut cursor) {
#[cfg(debug_assertions)]
let reported_endpoint2 = reported_endpoint.clone();
@ -738,7 +714,7 @@ impl Peer {
{
#[cfg(debug_assertions)]
debug_event!(
host_system,
app,
"[vl1] {} reported new remote perspective, local endpoint: {}",
self.identity.address.to_string(),
reported_endpoint2.to_string()
@ -748,7 +724,7 @@ impl Peer {
}
if source_hops == 0 && !path_is_known {
self.learn_path(host_system, source_path, time_ticks);
self.learn_path(app, source_path, time_ticks);
}
self.last_hello_reply_time_ticks.store(time_ticks, Ordering::Relaxed);
@ -756,21 +732,21 @@ impl Peer {
}
message_type::VL1_WHOIS => {
debug_event!(host_system, "[vl1] OK(WHOIS)");
debug_event!(app, "[vl1] OK(WHOIS)");
if node.is_peer_root(self) {
while cursor < payload.len() {
let r = Identity::unmarshal(payload, &mut cursor);
if let Ok(received_identity) = r {
debug_event!(
host_system,
app,
"[vl1] {} OK(WHOIS): received identity: {}",
self.identity.address.to_string(),
received_identity.to_string()
);
node.handle_incoming_identity(host_system, inner, received_identity, time_ticks, true);
node.handle_incoming_identity(app, inner, received_identity, time_ticks, true);
} else {
debug_event!(
host_system,
app,
"[vl1] {} OK(WHOIS): received bad identity: {}",
self.identity.address.to_string(),
r.err().unwrap().to_string()
@ -785,7 +761,7 @@ impl Peer {
_ => {
return inner.handle_ok(
host_system,
app,
node,
self,
&source_path,
@ -802,20 +778,20 @@ impl Peer {
return PacketHandlerResult::Error;
}
fn handle_incoming_whois<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
fn handle_incoming_whois<Application: ApplicationLayer + ?Sized, Inner: InnerLayer + ?Sized>(
self: &Arc<Self>,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
app: &Application,
inner: &Inner,
node: &Node,
time_ticks: i64,
message_id: MessageId,
payload: &PacketBuffer,
) -> PacketHandlerResult {
if node.this_node_is_root() || host_system.should_respond_to(&self.identity) {
if node.this_node_is_root() || app.should_respond_to(&self.identity) {
let mut addresses = payload.as_bytes();
while addresses.len() >= ADDRESS_SIZE {
if !self
.send(host_system, node, None, time_ticks, |packet| {
.send(app, node, None, time_ticks, |packet| {
while addresses.len() >= ADDRESS_SIZE && (packet.len() + Identity::MAX_MARSHAL_SIZE) <= UDP_DEFAULT_MTU {
if let Some(zt_address) = Address::from_bytes(&addresses[..ADDRESS_SIZE]) {
if let Some(peer) = node.peer(zt_address) {
@ -835,9 +811,9 @@ impl Peer {
return PacketHandlerResult::Ok;
}
fn handle_incoming_rendezvous<HostSystemImpl: HostSystem + ?Sized>(
fn handle_incoming_rendezvous<Application: ApplicationLayer + ?Sized>(
self: &Arc<Self>,
host_system: &HostSystemImpl,
app: &Application,
node: &Node,
time_ticks: i64,
message_id: MessageId,
@ -848,17 +824,17 @@ impl Peer {
return PacketHandlerResult::Ok;
}
fn handle_incoming_echo<HostSystemImpl: HostSystem + ?Sized, InnerProtocolImpl: InnerProtocol + ?Sized>(
fn handle_incoming_echo<Application: ApplicationLayer + ?Sized, Inner: InnerLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
app: &Application,
inner: &Inner,
node: &Node,
time_ticks: i64,
message_id: MessageId,
payload: &PacketBuffer,
) -> PacketHandlerResult {
if host_system.should_respond_to(&self.identity) || node.is_peer_root(self) {
self.send(host_system, node, None, time_ticks, |packet| {
if app.should_respond_to(&self.identity) || node.is_peer_root(self) {
self.send(app, node, None, time_ticks, |packet| {
let mut f: &mut OkHeader = packet.append_struct_get_mut().unwrap();
f.verb = message_type::VL1_OK;
f.in_re_verb = message_type::VL1_ECHO;
@ -867,7 +843,7 @@ impl Peer {
});
} else {
debug_event!(
host_system,
app,
"[vl1] dropping ECHO from {} due to lack of trust relationship",
self.identity.address.to_string()
);
@ -875,9 +851,9 @@ impl Peer {
return PacketHandlerResult::Ok;
}
fn handle_incoming_push_direct_paths<HostSystemImpl: HostSystem + ?Sized>(
fn handle_incoming_push_direct_paths<Application: ApplicationLayer + ?Sized>(
self: &Arc<Self>,
host_system: &HostSystemImpl,
app: &Application,
node: &Node,
time_ticks: i64,
source_path: &Arc<Path>,
@ -886,9 +862,9 @@ impl Peer {
PacketHandlerResult::Ok
}
fn handle_incoming_user_message<HostSystemImpl: HostSystem + ?Sized>(
fn handle_incoming_user_message<Application: ApplicationLayer + ?Sized>(
self: &Arc<Self>,
host_system: &HostSystemImpl,
app: &Application,
node: &Node,
time_ticks: i64,
source_path: &Arc<Path>,

View file

@ -3,7 +3,7 @@ use std::sync::{Arc, Mutex, RwLock};
use crate::protocol;
use crate::protocol::PacketBuffer;
use crate::vl1::{Address, HostSystem, Identity, Node, PacketHandlerResult, Peer, MAC};
use crate::vl1::{Address, ApplicationLayer, Identity, Node, PacketHandlerResult, Peer, MAC};
use crate::vl2::{MulticastGroup, NetworkId};
use zerotier_utils::buffer::OutOfBoundsError;
@ -84,7 +84,7 @@ impl MulticastAuthority {
}
/// Call for VL2_MULTICAST_GATHER packets.
pub fn handle_vl2_multicast_gather<HostSystemImpl: HostSystem + ?Sized, Authenticator: Fn(NetworkId, &Identity) -> bool>(
pub fn handle_vl2_multicast_gather<HostSystemImpl: ApplicationLayer + ?Sized, Authenticator: Fn(NetworkId, &Identity) -> bool>(
&self,
auth: Authenticator,
time_ticks: i64,

View file

@ -3,17 +3,17 @@
use std::sync::Arc;
use crate::protocol::PacketBuffer;
use crate::vl1::{HostSystem, InnerProtocol, Node, PacketHandlerResult, Path, Peer};
use crate::vl1::{ApplicationLayer, InnerLayer, Node, PacketHandlerResult, Path, Peer};
pub trait SwitchInterface: Sync + Send {}
pub struct Switch {}
#[allow(unused_variables)]
impl InnerProtocol for Switch {
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
impl InnerLayer for Switch {
fn handle_packet<Application: ApplicationLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
app: &Application,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
@ -26,9 +26,9 @@ impl InnerProtocol for Switch {
PacketHandlerResult::NotHandled
}
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
fn handle_error<Application: ApplicationLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
app: &Application,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
@ -43,9 +43,9 @@ impl InnerProtocol for Switch {
PacketHandlerResult::NotHandled
}
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
fn handle_ok<Application: ApplicationLayer + ?Sized>(
&self,
host_system: &HostSystemImpl,
app: &Application,
node: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,

View file

@ -209,7 +209,7 @@ fn main() {
Some(("service", _)) => {
drop(global_args); // free unnecessary heap before starting service as we're done with CLI args
if let Ok(_tokio_runtime) = zerotier_utils::tokio::runtime::Builder::new_multi_thread().enable_all().build() {
let test_inner = Arc::new(zerotier_network_hypervisor::vl1::DummyInnerProtocol::default());
let test_inner = Arc::new(zerotier_network_hypervisor::vl1::DummyInnerLayer::default());
let datadir = open_datadir(&flags);
let svc = VL1Service::new(
datadir,

View file

@ -8,7 +8,7 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use zerotier_crypto::random::next_u32_secure;
use zerotier_network_hypervisor::vl1::{Identity, NodeStorage, Verified};
use zerotier_network_hypervisor::vl1::{Identity, NodeStorageProvider, Verified};
use zerotier_utils::io::{fs_restrict_permissions, read_limit, DEFAULT_FILE_IO_READ_LIMIT};
use zerotier_utils::json::to_json_pretty;
@ -26,7 +26,9 @@ pub struct DataDir<Config: PartialEq + Eq + Clone + Send + Sync + Default + Seri
authtoken: Mutex<String>,
}
impl<Config: PartialEq + Eq + Clone + Send + Sync + Default + Serialize + DeserializeOwned + 'static> NodeStorage for DataDir<Config> {
impl<Config: PartialEq + Eq + Clone + Send + Sync + Default + Serialize + DeserializeOwned + 'static> NodeStorageProvider
for DataDir<Config>
{
fn load_node_identity(&self) -> Option<Verified<Identity>> {
let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 4096);
if id_data.is_err() {

View file

@ -27,14 +27,14 @@ const UPDATE_UDP_BINDINGS_EVERY_SECS: usize = 10;
/// whatever inner protocol implementation is using it. This would typically be VL2 but could be
/// a test harness or just the controller for a controller that runs stand-alone.
pub struct VL1Service<
NodeStorageImpl: NodeStorage + ?Sized + 'static,
VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static,
InnerProtocolImpl: InnerProtocol + ?Sized + 'static,
NodeStorageImpl: NodeStorageProvider + ?Sized + 'static,
PeerToPeerAuthentication: PeerFilter + ?Sized + 'static,
Inner: InnerLayer + ?Sized + 'static,
> {
state: RwLock<VL1ServiceMutableState>,
storage: Arc<NodeStorageImpl>,
vl1_auth_provider: Arc<VL1AuthProviderImpl>,
inner: Arc<InnerProtocolImpl>,
vl1_auth_provider: Arc<PeerToPeerAuthentication>,
inner: Arc<Inner>,
buffer_pool: Arc<PacketBufferPool>,
node_container: Option<Node>, // never None, set in new()
}
@ -47,15 +47,15 @@ struct VL1ServiceMutableState {
}
impl<
NodeStorageImpl: NodeStorage + ?Sized + 'static,
VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static,
InnerProtocolImpl: InnerProtocol + ?Sized + 'static,
> VL1Service<NodeStorageImpl, VL1AuthProviderImpl, InnerProtocolImpl>
NodeStorageImpl: NodeStorageProvider + ?Sized + 'static,
PeerToPeerAuthentication: PeerFilter + ?Sized + 'static,
Inner: InnerLayer + ?Sized + 'static,
> VL1Service<NodeStorageImpl, PeerToPeerAuthentication, Inner>
{
pub fn new(
storage: Arc<NodeStorageImpl>,
vl1_auth_provider: Arc<VL1AuthProviderImpl>,
inner: Arc<InnerProtocolImpl>,
vl1_auth_provider: Arc<PeerToPeerAuthentication>,
inner: Arc<Inner>,
settings: VL1Settings,
) -> Result<Arc<Self>, Box<dyn Error>> {
let mut service = Self {
@ -190,10 +190,10 @@ impl<
}
impl<
NodeStorageImpl: NodeStorage + ?Sized + 'static,
VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static,
InnerProtocolImpl: InnerProtocol + ?Sized + 'static,
> UdpPacketHandler for VL1Service<NodeStorageImpl, VL1AuthProviderImpl, InnerProtocolImpl>
NodeStorageImpl: NodeStorageProvider + ?Sized + 'static,
PeerToPeerAuthentication: PeerFilter + ?Sized + 'static,
Inner: InnerLayer + ?Sized + 'static,
> UdpPacketHandler for VL1Service<NodeStorageImpl, PeerToPeerAuthentication, Inner>
{
#[inline(always)]
fn incoming_udp_packet(
@ -216,10 +216,10 @@ impl<
}
impl<
NodeStorageImpl: NodeStorage + ?Sized + 'static,
VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static,
InnerProtocolImpl: InnerProtocol + ?Sized + 'static,
> HostSystem for VL1Service<NodeStorageImpl, VL1AuthProviderImpl, InnerProtocolImpl>
NodeStorageImpl: NodeStorageProvider + ?Sized + 'static,
PeerToPeerAuthentication: PeerFilter + ?Sized + 'static,
Inner: InnerLayer + ?Sized + 'static,
> ApplicationLayer for VL1Service<NodeStorageImpl, PeerToPeerAuthentication, Inner>
{
type Storage = NodeStorageImpl;
type LocalSocket = crate::LocalSocket;
@ -322,10 +322,10 @@ impl<
}
impl<
NodeStorageImpl: NodeStorage + ?Sized + 'static,
VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static,
InnerProtocolImpl: InnerProtocol + ?Sized + 'static,
> NodeStorage for VL1Service<NodeStorageImpl, VL1AuthProviderImpl, InnerProtocolImpl>
NodeStorageImpl: NodeStorageProvider + ?Sized + 'static,
PeerToPeerAuthentication: PeerFilter + ?Sized + 'static,
Inner: InnerLayer + ?Sized + 'static,
> NodeStorageProvider for VL1Service<NodeStorageImpl, PeerToPeerAuthentication, Inner>
{
#[inline(always)]
fn load_node_identity(&self) -> Option<Verified<Identity>> {
@ -339,10 +339,10 @@ impl<
}
impl<
NodeStorageImpl: NodeStorage + ?Sized + 'static,
VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static,
InnerProtocolImpl: InnerProtocol + ?Sized + 'static,
> VL1AuthProvider for VL1Service<NodeStorageImpl, VL1AuthProviderImpl, InnerProtocolImpl>
NodeStorageImpl: NodeStorageProvider + ?Sized + 'static,
PeerToPeerAuthentication: PeerFilter + ?Sized + 'static,
Inner: InnerLayer + ?Sized + 'static,
> PeerFilter for VL1Service<NodeStorageImpl, PeerToPeerAuthentication, Inner>
{
#[inline(always)]
fn should_respond_to(&self, id: &Verified<Identity>) -> bool {
@ -356,10 +356,10 @@ impl<
}
impl<
NodeStorageImpl: NodeStorage + ?Sized + 'static,
VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static,
InnerProtocolImpl: InnerProtocol + ?Sized + 'static,
> Drop for VL1Service<NodeStorageImpl, VL1AuthProviderImpl, InnerProtocolImpl>
NodeStorageImpl: NodeStorageProvider + ?Sized + 'static,
PeerToPeerAuthentication: PeerFilter + ?Sized + 'static,
Inner: InnerLayer + ?Sized + 'static,
> Drop for VL1Service<NodeStorageImpl, PeerToPeerAuthentication, Inner>
{
fn drop(&mut self) {
let mut state = self.state.write().unwrap();