This commit is contained in:
Adam Ierymenko 2022-10-25 08:40:24 -04:00
parent 88e613e043
commit 127e27326a
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
5 changed files with 167 additions and 164 deletions

View file

@ -25,7 +25,7 @@ use crate::model::{AuthorizationResult, Member, RequestLogItem, CREDENTIAL_WINDO
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// ZeroTier VL2 network controller packet handler, answers VL2 netconf queries. /// ZeroTier VL2 network controller packet handler, answers VL2 netconf queries.
pub struct Handler { pub struct Controller {
self_ref: Weak<Self>, self_ref: Weak<Self>,
service: RwLock<Weak<VL1Service<dyn Database, Self, Self>>>, service: RwLock<Weak<VL1Service<dyn Database, Self, Self>>>,
reaper: Reaper, reaper: Reaper,
@ -35,7 +35,7 @@ pub struct Handler {
local_identity: Identity, local_identity: Identity,
} }
impl Handler { impl Controller {
/// Start an inner protocol handler answer ZeroTier VL2 network controller queries. /// Start an inner protocol handler answer ZeroTier VL2 network controller queries.
pub async fn new(database: Arc<dyn Database>, runtime: tokio::runtime::Handle) -> Result<Arc<Self>, Box<dyn Error>> { pub async fn new(database: Arc<dyn Database>, runtime: tokio::runtime::Handle) -> Result<Arc<Self>, Box<dyn Error>> {
if let Some(local_identity) = database.load_node_identity() { if let Some(local_identity) = database.load_node_identity() {
@ -88,155 +88,7 @@ impl Handler {
self.daemons.lock().unwrap().push(cw); self.daemons.lock().unwrap().push(cw);
} }
} }
}
// Default PathFilter implementations permit anything.
impl PathFilter for Handler {}
impl InnerProtocol for Handler {
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
&self,
_: &HostSystemImpl,
_: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
source_hops: u8,
message_id: u64,
verb: u8,
payload: &PacketBuffer,
mut cursor: usize,
) -> PacketHandlerResult {
match verb {
protocol::verbs::VL2_VERB_NETWORK_CONFIG_REQUEST => {
let network_id = payload.read_u64(&mut cursor);
if network_id.is_err() {
return PacketHandlerResult::Error;
}
let network_id = NetworkId::from_u64(network_id.unwrap());
if network_id.is_none() {
return PacketHandlerResult::Error;
}
let network_id = network_id.unwrap();
let meta_data = if (cursor + 2) < payload.len() {
let meta_data_len = payload.read_u16(&mut cursor);
if meta_data_len.is_err() {
return PacketHandlerResult::Error;
}
if let Ok(d) = payload.read_bytes(meta_data_len.unwrap() as usize, &mut cursor) {
let d = Dictionary::from_bytes(d);
if d.is_none() {
return PacketHandlerResult::Error;
}
d.unwrap()
} else {
return PacketHandlerResult::Error;
}
} else {
Dictionary::new()
};
/*
let (have_revision, have_timestamp) = if (cursor + 16) <= payload.len() {
let r = payload.read_u64(&mut cursor);
let t = payload.read_u64(&mut cursor);
if r.is_err() || t.is_err() {
return PacketHandlerResult::Error;
}
(Some(r.unwrap()), Some(t.unwrap()))
} else {
(None, None)
};
*/
// Launch handler as an async background task.
let (self2, peer, source_remote_endpoint) =
(self.self_ref.upgrade().unwrap(), source.clone(), source_path.endpoint.clone());
self.reaper.add(
self.runtime.spawn(async move {
let node_id = peer.identity.address;
let node_fingerprint = Blob::from(peer.identity.fingerprint);
let now = ms_since_epoch();
let result = match self2.handle_network_config_request(&peer.identity, network_id, now).await {
Result::Ok((result, Some(config))) => {
self2.send_network_config(peer.as_ref(), &config, Some(message_id));
result
}
Result::Ok((result, None)) => result,
Result::Err(_) => {
// TODO: log invalid request or internal error
return;
}
};
let _ = self2
.database
.log_request(RequestLogItem {
network_id,
node_id,
node_fingerprint,
controller_node_id: self2.local_identity.address,
metadata: if meta_data.is_empty() {
Vec::new()
} else {
meta_data.to_bytes()
},
timestamp: now,
source_remote_endpoint,
source_hops,
result,
})
.await;
}),
Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(),
);
PacketHandlerResult::Ok
}
_ => PacketHandlerResult::NotHandled,
}
}
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node,
_source: &Arc<Peer>,
_source_path: &Arc<Path>,
_source_hops: u8,
_message_id: u64,
_in_re_verb: u8,
_in_re_message_id: u64,
_error_code: u8,
_payload: &PacketBuffer,
_cursor: usize,
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node,
_source: &Arc<Peer>,
_source_path: &Arc<Path>,
_source_hops: u8,
_message_id: u64,
_in_re_verb: u8,
_in_re_message_id: u64,
_payload: &PacketBuffer,
_cursor: usize,
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
fn should_respond_to(&self, _: &Identity) -> bool {
true
}
}
impl Handler {
fn send_network_config( fn send_network_config(
&self, &self,
peer: &Peer, peer: &Peer,
@ -415,7 +267,153 @@ impl Handler {
} }
} }
impl Drop for Handler { // Default PathFilter implementations permit anything.
impl PathFilter for Controller {}
impl InnerProtocol for Controller {
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
&self,
_: &HostSystemImpl,
_: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
source_hops: u8,
message_id: u64,
verb: u8,
payload: &PacketBuffer,
mut cursor: usize,
) -> PacketHandlerResult {
match verb {
protocol::verbs::VL2_VERB_NETWORK_CONFIG_REQUEST => {
let network_id = payload.read_u64(&mut cursor);
if network_id.is_err() {
return PacketHandlerResult::Error;
}
let network_id = NetworkId::from_u64(network_id.unwrap());
if network_id.is_none() {
return PacketHandlerResult::Error;
}
let network_id = network_id.unwrap();
let meta_data = if (cursor + 2) < payload.len() {
let meta_data_len = payload.read_u16(&mut cursor);
if meta_data_len.is_err() {
return PacketHandlerResult::Error;
}
if let Ok(d) = payload.read_bytes(meta_data_len.unwrap() as usize, &mut cursor) {
let d = Dictionary::from_bytes(d);
if d.is_none() {
return PacketHandlerResult::Error;
}
d.unwrap()
} else {
return PacketHandlerResult::Error;
}
} else {
Dictionary::new()
};
/*
let (have_revision, have_timestamp) = if (cursor + 16) <= payload.len() {
let r = payload.read_u64(&mut cursor);
let t = payload.read_u64(&mut cursor);
if r.is_err() || t.is_err() {
return PacketHandlerResult::Error;
}
(Some(r.unwrap()), Some(t.unwrap()))
} else {
(None, None)
};
*/
// Launch handler as an async background task.
let (self2, peer, source_remote_endpoint) =
(self.self_ref.upgrade().unwrap(), source.clone(), source_path.endpoint.clone());
self.reaper.add(
self.runtime.spawn(async move {
let node_id = peer.identity.address;
let node_fingerprint = Blob::from(peer.identity.fingerprint);
let now = ms_since_epoch();
let result = match self2.handle_network_config_request(&peer.identity, network_id, now).await {
Result::Ok((result, Some(config))) => {
self2.send_network_config(peer.as_ref(), &config, Some(message_id));
result
}
Result::Ok((result, None)) => result,
Result::Err(_) => {
// TODO: log invalid request or internal error
return;
}
};
let _ = self2
.database
.log_request(RequestLogItem {
network_id,
node_id,
node_fingerprint,
controller_node_id: self2.local_identity.address,
metadata: if meta_data.is_empty() {
Vec::new()
} else {
meta_data.to_bytes()
},
timestamp: now,
source_remote_endpoint,
source_hops,
result,
})
.await;
}),
Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(),
);
PacketHandlerResult::Ok
}
_ => PacketHandlerResult::NotHandled,
}
}
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node,
_source: &Arc<Peer>,
_source_path: &Arc<Path>,
_source_hops: u8,
_message_id: u64,
_in_re_verb: u8,
_in_re_message_id: u64,
_error_code: u8,
_payload: &PacketBuffer,
_cursor: usize,
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node,
_source: &Arc<Peer>,
_source_path: &Arc<Path>,
_source_hops: u8,
_message_id: u64,
_in_re_verb: u8,
_in_re_message_id: u64,
_payload: &PacketBuffer,
_cursor: usize,
) -> PacketHandlerResult {
PacketHandlerResult::NotHandled
}
fn should_respond_to(&self, _: &Identity) -> bool {
true
}
}
impl Drop for Controller {
fn drop(&mut self) { fn drop(&mut self) {
for h in self.daemons.lock().unwrap().drain(..) { for h in self.daemons.lock().unwrap().drain(..) {
h.abort(); h.abort();

View file

@ -1,6 +1,9 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
mod controller;
pub mod database; pub mod database;
pub mod filedatabase; pub mod filedatabase;
pub mod handler;
pub mod model; pub mod model;
pub use controller::*;

View file

@ -8,7 +8,7 @@ use clap::{Arg, Command};
use zerotier_network_controller::database::Database; use zerotier_network_controller::database::Database;
use zerotier_network_controller::filedatabase::FileDatabase; use zerotier_network_controller::filedatabase::FileDatabase;
use zerotier_network_controller::handler::Handler; use zerotier_network_controller::Controller;
use zerotier_network_hypervisor::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION}; use zerotier_network_hypervisor::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION};
use zerotier_utils::exitcode; use zerotier_utils::exitcode;
@ -16,7 +16,7 @@ use zerotier_utils::tokio::runtime::Runtime;
use zerotier_vl1_service::VL1Service; use zerotier_vl1_service::VL1Service;
async fn run(database: Arc<dyn Database>, runtime: &Runtime) -> i32 { async fn run(database: Arc<dyn Database>, runtime: &Runtime) -> i32 {
let handler = Handler::new(database.clone(), runtime.handle().clone()).await; let handler = Controller::new(database.clone(), runtime.handle().clone()).await;
if handler.is_err() { if handler.is_err() {
eprintln!("FATAL: error initializing handler: {}", handler.err().unwrap().to_string()); eprintln!("FATAL: error initializing handler: {}", handler.err().unwrap().to_string());
exitcode::ERR_CONFIG exitcode::ERR_CONFIG

View file

@ -1,6 +1,6 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
#[allow(unused_imports)] #[allow(unused_imports)]
use std::mem::{size_of, transmute, MaybeUninit}; use std::mem::{size_of, transmute, MaybeUninit};
#[allow(unused_imports)] #[allow(unused_imports)]
@ -174,10 +174,10 @@ impl BoundUdpPort {
/// The caller can check the 'sockets' member variable after calling to determine which if any bindings were /// The caller can check the 'sockets' member variable after calling to determine which if any bindings were
/// successful. Any errors that occurred are returned as tuples of (interface, address, error). The second vector /// successful. Any errors that occurred are returned as tuples of (interface, address, error). The second vector
/// returned contains newly bound sockets. /// returned contains newly bound sockets.
pub fn update_bindings<UdpPacketHandlerImpl: UdpPacketHandler>( pub fn update_bindings<UdpPacketHandlerImpl: UdpPacketHandler + ?Sized>(
&mut self, &mut self,
interface_prefix_blacklist: &Vec<String>, interface_prefix_blacklist: &HashSet<String>,
cidr_blacklist: &Vec<InetAddress>, cidr_blacklist: &HashSet<InetAddress>,
buffer_pool: &Arc<PacketBufferPool>, buffer_pool: &Arc<PacketBufferPool>,
handler: &Arc<UdpPacketHandlerImpl>, handler: &Arc<UdpPacketHandlerImpl>,
) -> Vec<(LocalInterface, InetAddress, std::io::Error)> { ) -> Vec<(LocalInterface, InetAddress, std::io::Error)> {
@ -200,7 +200,7 @@ impl BoundUdpPort {
address.scope(), address.scope(),
IpScope::Global | IpScope::PseudoPrivate | IpScope::Private | IpScope::Shared IpScope::Global | IpScope::PseudoPrivate | IpScope::Private | IpScope::Shared
) )
&& !interface_prefix_blacklist.iter().any(|pfx| interface_str.starts_with(pfx.as_str())) && !interface_prefix_blacklist.iter().any(|pfx| interface_str.starts_with(pfx))
&& !cidr_blacklist.iter().any(|r| address.is_within(r)) && !cidr_blacklist.iter().any(|r| address.is_within(r))
&& !ipv6::is_ipv6_temporary(interface_str.as_str(), address) && !ipv6::is_ipv6_temporary(interface_str.as_str(), address)
{ {

View file

@ -1,5 +1,7 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use std::collections::HashSet;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use zerotier_network_hypervisor::vl1::InetAddress; use zerotier_network_hypervisor::vl1::InetAddress;
@ -8,7 +10,7 @@ use zerotier_network_hypervisor::vl1::InetAddress;
#[serde(default)] #[serde(default)]
pub struct VL1Settings { pub struct VL1Settings {
/// Primary ZeroTier port that is always bound, default is 9993. /// Primary ZeroTier port that is always bound, default is 9993.
pub fixed_ports: Vec<u16>, pub fixed_ports: HashSet<u16>,
/// Number of additional random ports to bind. /// Number of additional random ports to bind.
pub random_port_count: usize, pub random_port_count: usize,
@ -17,10 +19,10 @@ pub struct VL1Settings {
pub port_mapping: bool, pub port_mapping: bool,
/// Interface name prefix blacklist for local bindings (not remote IPs). /// Interface name prefix blacklist for local bindings (not remote IPs).
pub interface_prefix_blacklist: Vec<String>, pub interface_prefix_blacklist: HashSet<String>,
/// IP/bits CIDR blacklist for local bindings (not remote IPs). /// IP/bits CIDR blacklist for local bindings (not remote IPs).
pub cidr_blacklist: Vec<InetAddress>, pub cidr_blacklist: HashSet<InetAddress>,
} }
impl VL1Settings { impl VL1Settings {
@ -38,11 +40,11 @@ impl VL1Settings {
impl Default for VL1Settings { impl Default for VL1Settings {
fn default() -> Self { fn default() -> Self {
Self { Self {
fixed_ports: vec![9993], fixed_ports: HashSet::from([9993u16]),
random_port_count: 5, random_port_count: 5,
port_mapping: true, port_mapping: true,
interface_prefix_blacklist: Self::DEFAULT_PREFIX_BLACKLIST.iter().map(|s| s.to_string()).collect(), interface_prefix_blacklist: Self::DEFAULT_PREFIX_BLACKLIST.iter().map(|s| s.to_string()).collect(),
cidr_blacklist: Vec::new(), cidr_blacklist: HashSet::new(),
} }
} }
} }