Moar controller V2

This commit is contained in:
Adam Ierymenko 2022-10-20 07:48:49 -07:00
parent f2028ce3a2
commit 39aa46ebf5
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
11 changed files with 339 additions and 146 deletions

View file

@ -1,5 +1,3 @@
use std::error::Error;
use async_trait::async_trait;
use zerotier_network_hypervisor::vl1::{Address, InetAddress, NodeStorage};
@ -21,12 +19,14 @@ pub enum Change {
#[async_trait]
pub trait Database: Sync + Send + NodeStorage + 'static {
async fn get_network(&self, id: NetworkId) -> Result<Option<Network>, Box<dyn Error>>;
async fn save_network(&self, obj: Network) -> Result<(), Box<dyn Error>>;
type Error: std::error::Error + Send + 'static;
async fn list_members(&self, network_id: NetworkId) -> Result<Vec<Address>, Box<dyn Error>>;
async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result<Option<Member>, Box<dyn Error>>;
async fn save_member(&self, obj: Member) -> Result<(), Box<dyn Error>>;
async fn get_network(&self, id: NetworkId) -> Result<Option<Network>, Self::Error>;
async fn save_network(&self, obj: Network) -> Result<(), Self::Error>;
async fn list_members(&self, network_id: NetworkId) -> Result<Vec<Address>, Self::Error>;
async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result<Option<Member>, Self::Error>;
async fn save_member(&self, obj: Member) -> Result<(), Self::Error>;
/// Get a receiver that can be used to receive changes made to networks and members, if supported.
///
@ -47,7 +47,7 @@ pub trait Database: Sync + Send + NodeStorage + 'static {
///
/// The default trait implementation uses a brute force method. This should be reimplemented if a
/// more efficient way is available.
async fn list_members_deauthorized_after(&self, network_id: NetworkId, cutoff: i64) -> Result<Vec<Address>, Box<dyn Error>> {
async fn list_members_deauthorized_after(&self, network_id: NetworkId, cutoff: i64) -> Result<Vec<Address>, Self::Error> {
let mut v = Vec::new();
let members = self.list_members(network_id).await?;
for a in members.iter() {
@ -64,7 +64,7 @@ pub trait Database: Sync + Send + NodeStorage + 'static {
///
/// The default trait implementation uses a brute force method. This should be reimplemented if a
/// more efficient way is available.
async fn is_ip_assigned(&self, network_id: NetworkId, ip: &InetAddress) -> Result<bool, Box<dyn Error>> {
async fn is_ip_assigned(&self, network_id: NetworkId, ip: &InetAddress) -> Result<bool, Self::Error> {
let members = self.list_members(network_id).await?;
for a in members.iter() {
if let Some(m) = self.get_member(network_id, *a).await? {
@ -76,5 +76,5 @@ pub trait Database: Sync + Send + NodeStorage + 'static {
return Ok(false);
}
async fn log_request(&self, obj: &RequestLogItem) -> Result<(), Box<dyn Error>>;
async fn log_request(&self, obj: RequestLogItem) -> Result<(), Self::Error>;
}

View file

@ -1,4 +1,5 @@
use std::error::Error;
use std::fmt::Display;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
@ -16,6 +17,35 @@ use crate::model::*;
const IDENTITY_SECRET_FILENAME: &'static str = "identity.secret";
#[derive(Debug)]
pub enum FileDatabaseError {
InvalidYaml(String),
IoError(String),
}
impl Display for FileDatabaseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::InvalidYaml(e) => f.write_str(format!("invalid YAML ({})", e).as_str()),
Self::IoError(e) => f.write_str(format!("I/O error ({})", e).as_str()),
}
}
}
impl Error for FileDatabaseError {}
impl From<serde_yaml::Error> for FileDatabaseError {
fn from(e: serde_yaml::Error) -> Self {
Self::InvalidYaml(e.to_string())
}
}
impl From<zerotier_utils::tokio::io::Error> for FileDatabaseError {
fn from(e: zerotier_utils::tokio::io::Error) -> Self {
Self::IoError(e.to_string())
}
}
/// An in-filesystem database that permits live editing.
///
/// A cache is maintained that contains the actual objects. When an object is live edited,
@ -84,7 +114,9 @@ impl NodeStorage for FileDatabase {
#[async_trait]
impl Database for FileDatabase {
async fn get_network(&self, id: NetworkId) -> Result<Option<Network>, Box<dyn Error>> {
type Error = FileDatabaseError;
async fn get_network(&self, id: NetworkId) -> Result<Option<Network>, Self::Error> {
let r = fs::read(self.network_path(id)).await;
if let Ok(raw) = r {
let mut network = serde_yaml::from_slice::<Network>(raw.as_slice())?;
@ -97,7 +129,7 @@ impl Database for FileDatabase {
}
}
async fn save_network(&self, obj: Network) -> Result<(), Box<dyn Error>> {
async fn save_network(&self, obj: Network) -> Result<(), Self::Error> {
let base_network_path = self.network_path(obj.id);
let _ = fs::create_dir_all(base_network_path.parent().unwrap()).await;
//let _ = fs::write(base_network_path, to_json_pretty(&obj).as_bytes()).await?;
@ -105,7 +137,7 @@ impl Database for FileDatabase {
return Ok(());
}
async fn list_members(&self, network_id: NetworkId) -> Result<Vec<Address>, Box<dyn Error>> {
async fn list_members(&self, network_id: NetworkId) -> Result<Vec<Address>, Self::Error> {
let mut members = Vec::new();
let mut dir = fs::read_dir(self.base_path.join(network_id.to_string())).await?;
while let Ok(Some(ent)) = dir.next_entry().await {
@ -125,7 +157,7 @@ impl Database for FileDatabase {
Ok(members)
}
async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result<Option<Member>, Box<dyn Error>> {
async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result<Option<Member>, Self::Error> {
let r = fs::read(self.member_path(network_id, node_id)).await;
if let Ok(raw) = r {
let mut member = serde_yaml::from_slice::<Member>(raw.as_slice())?;
@ -138,7 +170,7 @@ impl Database for FileDatabase {
}
}
async fn save_member(&self, obj: Member) -> Result<(), Box<dyn Error>> {
async fn save_member(&self, obj: Member) -> Result<(), Self::Error> {
let base_member_path = self.member_path(obj.network_id, obj.node_id);
let _ = fs::create_dir_all(base_member_path.parent().unwrap()).await;
//let _ = fs::write(base_member_path, to_json_pretty(&obj).as_bytes()).await?;
@ -146,7 +178,7 @@ impl Database for FileDatabase {
Ok(())
}
async fn log_request(&self, obj: &RequestLogItem) -> Result<(), Box<dyn Error>> {
async fn log_request(&self, obj: RequestLogItem) -> Result<(), Self::Error> {
println!("{}", obj.to_string());
Ok(())
}

View file

@ -1,22 +1,26 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use std::any::Any;
use std::error::Error;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock, Weak};
use tokio::time::{Duration, Instant};
use zerotier_network_hypervisor::protocol::{verbs, PacketBuffer, DEFAULT_MULTICAST_LIMIT, ZEROTIER_VIRTUAL_NETWORK_DEFAULT_MTU};
use zerotier_network_hypervisor::protocol;
use zerotier_network_hypervisor::protocol::{PacketBuffer, DEFAULT_MULTICAST_LIMIT, ZEROTIER_VIRTUAL_NETWORK_DEFAULT_MTU};
use zerotier_network_hypervisor::vl1::{HostSystem, Identity, InnerProtocol, Node, PacketHandlerResult, Path, PathFilter, Peer};
use zerotier_network_hypervisor::vl2::{CertificateOfMembership, CertificateOfOwnership, NetworkConfig, NetworkId, Tag};
use zerotier_utils::blob::Blob;
use zerotier_utils::buffer::OutOfBoundsError;
use zerotier_utils::dictionary::Dictionary;
use zerotier_utils::error::{InvalidParameterError, UnexpectedError};
use zerotier_utils::ms_since_epoch;
use zerotier_utils::error::InvalidParameterError;
use zerotier_utils::reaper::Reaper;
use zerotier_utils::tokio;
use zerotier_utils::{ms_monotonic, ms_since_epoch};
use zerotier_vl1_service::VL1Service;
use crate::database::*;
use crate::model::{AuthorizationResult, Member, CREDENTIAL_WINDOW_SIZE_DEFAULT};
use crate::model::{AuthorizationResult, Member, RequestLogItem, CREDENTIAL_WINDOW_SIZE_DEFAULT};
// A netconf per-query task timeout, just a sanity limit.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
@ -27,6 +31,7 @@ pub struct Handler<DatabaseImpl: Database> {
}
struct Inner<DatabaseImpl: Database> {
service: RwLock<Weak<VL1Service<DatabaseImpl, Handler<DatabaseImpl>, Handler<DatabaseImpl>>>>,
reaper: Reaper,
daemons: Mutex<Vec<tokio::task::JoinHandle<()>>>, // drop() aborts these
runtime: tokio::runtime::Handle,
@ -41,6 +46,7 @@ impl<DatabaseImpl: Database> Handler<DatabaseImpl> {
assert!(local_identity.secret.is_some());
let inner = Arc::new(Inner::<DatabaseImpl> {
service: RwLock::new(Weak::default()),
reaper: Reaper::new(&runtime),
daemons: Mutex::new(Vec::with_capacity(1)),
runtime,
@ -58,19 +64,29 @@ impl<DatabaseImpl: Database> Handler<DatabaseImpl> {
}
}
/// Set the service and HostSystem implementation for this controller.
///
/// This must be called once the service that uses this handler is up or the controller
/// won't actually do anything. The reference the handler holds is weak to prevent
/// a circular reference, so if the VL1Service is dropped this must be called again to
/// tell the controller handler about a new instance.
pub fn set_service(&self, service: &Arc<VL1Service<DatabaseImpl, Self, Self>>) {
*self.inner.service.write().unwrap() = Arc::downgrade(service);
}
/// Start a change watcher to respond to changes detected by the database.
/// This should only be called once, though multiple calls won't do anything but create unnecessary async tasks.
pub async fn start_change_watcher(&self, service: &Arc<VL1Service<DatabaseImpl, Self, Self>>) {
///
/// This should only be called once, though multiple calls won't do anything but create
/// unnecessary async tasks. If the database being used does not support changes, this
/// does nothing.
pub async fn start_change_watcher(&self) {
if let Some(cw) = self.inner.database.changes().await.map(|mut ch| {
let inner = self.inner.clone();
let service = service.clone();
self.inner.runtime.spawn(async move {
loop {
if let Ok(change) = ch.recv().await {
inner.reaper.add(
inner
.runtime
.spawn(inner.clone().handle_change_notification(service.clone(), change)),
inner.runtime.spawn(inner.clone().handle_change_notification(change)),
Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(),
);
}
@ -110,15 +126,17 @@ impl<DatabaseImpl: Database> PathFilter for Handler<DatabaseImpl> {
impl<DatabaseImpl: Database> InnerProtocol for Handler<DatabaseImpl> {
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
source_hops: u8,
message_id: u64,
verb: u8,
payload: &PacketBuffer,
) -> PacketHandlerResult {
match verb {
verbs::VL2_VERB_NETWORK_CONFIG_REQUEST => {
protocol::verbs::VL2_VERB_NETWORK_CONFIG_REQUEST => {
let mut cursor = 1;
let network_id = payload.read_u64(&mut cursor);
@ -149,6 +167,7 @@ impl<DatabaseImpl: Database> InnerProtocol for Handler<DatabaseImpl> {
Dictionary::new()
};
/*
let (have_revision, have_timestamp) = if (cursor + 16) <= payload.len() {
let r = payload.read_u64(&mut cursor);
let t = payload.read_u64(&mut cursor);
@ -159,21 +178,49 @@ impl<DatabaseImpl: Database> InnerProtocol for Handler<DatabaseImpl> {
} else {
(None, None)
};
*/
// Launch handler as an async background task.
let (inner, source2, source_path2) = (self.inner.clone(), source.clone(), source_path.clone());
let (inner, peer, source_remote_endpoint) = (self.inner.clone(), source.clone(), source_path.endpoint.clone());
self.inner.reaper.add(
self.inner.runtime.spawn(async move {
// TODO: log errors
let result = inner.handle_network_config_request(
source2,
source_path2,
message_id,
network_id,
meta_data,
have_revision,
have_timestamp,
);
let node_id = peer.identity.address;
let node_fingerprint = Blob::from(peer.identity.fingerprint);
let now = ms_since_epoch();
let result = match inner
.handle_network_config_request::<HostSystemImpl>(&peer.identity, network_id, &meta_data, now)
.await
{
Result::Ok((result, Some(config))) => {
inner.send_network_config(peer, &config, Some(message_id));
result
}
Result::Ok((result, None)) => result,
Result::Err(_) => {
// TODO: log invalid request or internal error
return;
}
};
let _ = inner
.database
.log_request(RequestLogItem {
network_id,
node_id,
node_fingerprint,
controller_node_id: inner.local_identity.address,
metadata: if meta_data.is_empty() {
Vec::new()
} else {
meta_data.to_bytes()
},
timestamp: now,
source_remote_endpoint,
source_hops,
result,
})
.await;
}),
Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(),
);
@ -186,9 +233,11 @@ impl<DatabaseImpl: Database> InnerProtocol for Handler<DatabaseImpl> {
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node<HostSystemImpl>,
_source: &Arc<Peer<HostSystemImpl>>,
_source_path: &Arc<Path<HostSystemImpl>>,
_source_hops: u8,
_message_id: u64,
_in_re_verb: u8,
_in_re_message_id: u64,
@ -201,9 +250,11 @@ impl<DatabaseImpl: Database> InnerProtocol for Handler<DatabaseImpl> {
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node<HostSystemImpl>,
_source: &Arc<Peer<HostSystemImpl>>,
_source_path: &Arc<Path<HostSystemImpl>>,
_source_hops: u8,
_message_id: u64,
_in_re_verb: u8,
_in_re_message_id: u64,
@ -219,24 +270,73 @@ impl<DatabaseImpl: Database> InnerProtocol for Handler<DatabaseImpl> {
}
impl<DatabaseImpl: Database> Inner<DatabaseImpl> {
async fn handle_change_notification(
self: Arc<Self>,
service: Arc<VL1Service<DatabaseImpl, Handler<DatabaseImpl>, Handler<DatabaseImpl>>>,
_change: Change,
fn send_network_config(
&self,
peer: Arc<dyn Any>, // hack can go away when Rust has specialization
config: &NetworkConfig,
in_re_message_id: Option<u64>, // None for unsolicited push
) {
if let Some(host_system) = self.service.read().unwrap().upgrade() {
if let Some(peer) = peer.downcast_ref::<Peer<VL1Service<DatabaseImpl, Handler<DatabaseImpl>, Handler<DatabaseImpl>>>>() {
peer.send(
host_system.as_ref(),
host_system.node(),
None,
ms_monotonic(),
|packet| -> Result<(), OutOfBoundsError> {
if let Some(in_re_message_id) = in_re_message_id {
let ok_header = packet.append_struct_get_mut::<protocol::OkHeader>()?;
ok_header.verb = protocol::verbs::VL1_OK;
ok_header.in_re_verb = protocol::verbs::VL2_VERB_NETWORK_CONFIG_REQUEST;
ok_header.in_re_message_id = in_re_message_id.to_be_bytes();
} else {
packet.append_u8(protocol::verbs::VL2_VERB_NETWORK_CONFIG)?;
}
if peer.is_v2() {
todo!()
} else {
let config_data = if let Some(config_dict) = config.v1_proto_to_dictionary(&self.local_identity) {
config_dict.to_bytes()
} else {
eprintln!("WARNING: unexpected error serializing network config into V1 format dictionary");
return Err(OutOfBoundsError); // abort
};
if config_data.len() > (u16::MAX as usize) {
eprintln!("WARNING: network config is larger than 65536 bytes!");
return Err(OutOfBoundsError); // abort
}
packet.append_u64(config.network_id.into())?;
packet.append_u16(config_data.len() as u16)?;
packet.append_bytes(config_data.as_slice())?;
// TODO: compress
// NOTE: V1 supports a bunch of other things like chunking but it was never truly used and is optional.
// Omit it here as it adds overhead.
}
Ok(())
},
);
} else {
panic!("HostSystem implementation mismatch with service to which controller is harnessed");
}
}
}
async fn handle_change_notification(self: Arc<Self>, _change: Change) {
todo!()
}
async fn handle_network_config_request<HostSystemImpl: HostSystem + ?Sized>(
self: Arc<Self>,
source: Arc<Peer<HostSystemImpl>>,
_source_path: Arc<Path<HostSystemImpl>>,
_message_id: u64,
self: &Arc<Self>,
source_identity: &Identity,
network_id: NetworkId,
_meta_data: Dictionary,
_have_revision: Option<u64>,
_have_timestamp: Option<u64>,
) -> Result<(AuthorizationResult, Option<NetworkConfig>), Box<dyn Error>> {
_meta_data: &Dictionary,
now: i64,
) -> Result<(AuthorizationResult, Option<NetworkConfig>), DatabaseImpl::Error> {
let network = self.database.get_network(network_id).await?;
if network.is_none() {
// TODO: send error
@ -244,27 +344,25 @@ impl<DatabaseImpl: Database> Inner<DatabaseImpl> {
}
let network = network.unwrap();
let mut member = self.database.get_member(network_id, source.identity.address).await?;
let mut member = self.database.get_member(network_id, source_identity.address).await?;
let mut member_changed = false;
let legacy_v1 = source.identity.p384.is_none();
let legacy_v1 = source_identity.p384.is_none();
// If we have a member object and a pinned identity, check to make sure it matches.
if let Some(member) = member.as_ref() {
if let Some(pinned_identity) = member.identity.as_ref() {
if !pinned_identity.eq(&source.identity) {
if !pinned_identity.eq(&source_identity) {
return Ok((AuthorizationResult::RejectedIdentityMismatch, None));
}
}
}
let now = ms_since_epoch();
let mut authorization_result = AuthorizationResult::Rejected;
let mut authorized = member.as_ref().map_or(false, |m| m.authorized());
if !authorized {
if member.is_none() {
if network.learn_members.unwrap_or(true) {
let _ = member.insert(Member::new_with_identity(source.identity.clone(), network_id));
let _ = member.insert(Member::new_with_identity(source_identity.clone(), network_id));
member_changed = true;
} else {
return Ok((AuthorizationResult::Rejected, None));
@ -296,9 +394,9 @@ impl<DatabaseImpl: Database> Inner<DatabaseImpl> {
let deauthed_members_still_in_window = self.database.list_members_deauthorized_after(network.id, now - max_delta).await;
// Check and if necessary auto-assign static IPs for this member.
member_changed |= network.check_zt_ip_assignments(self.database.as_ref(), &mut member).await?;
member_changed |= network.check_zt_ip_assignments(self.database.as_ref(), &mut member).await;
let mut nc = NetworkConfig::new(network_id, source.identity.address);
let mut nc = NetworkConfig::new(network_id, source_identity.address);
nc.name = member.name.clone();
nc.private = network.private;
@ -312,25 +410,27 @@ impl<DatabaseImpl: Database> Inner<DatabaseImpl> {
nc.rules = network.rules;
nc.dns = network.dns;
nc.certificate_of_membership = Some(
CertificateOfMembership::new(&self.local_identity, network_id, &source.identity, now, max_delta, legacy_v1)
.ok_or(UnexpectedError)?,
);
nc.certificate_of_membership =
CertificateOfMembership::new(&self.local_identity, network_id, &source_identity, now, max_delta, legacy_v1);
if nc.certificate_of_membership.is_none() {
return Ok((AuthorizationResult::RejectedDueToError, None));
}
let mut coo = CertificateOfOwnership::new(network_id, now, source.identity.address, legacy_v1);
let mut coo = CertificateOfOwnership::new(network_id, now, source_identity.address, legacy_v1);
for ip in nc.static_ips.iter() {
coo.add_ip(ip);
}
if !coo.sign(&self.local_identity, &source.identity) {
return Err(Box::new(UnexpectedError));
if !coo.sign(&self.local_identity, &source_identity) {
return Ok((AuthorizationResult::RejectedDueToError, None));
}
nc.certificates_of_ownership.push(coo);
for (id, value) in member.tags.iter() {
let _ = nc.tags.insert(
*id,
Tag::new(*id, *value, &self.local_identity, network_id, &source.identity, now, legacy_v1).ok_or(UnexpectedError)?,
);
let tag = Tag::new(*id, *value, &self.local_identity, network_id, &source_identity, now, legacy_v1);
if tag.is_none() {
return Ok((AuthorizationResult::RejectedDueToError, None));
}
let _ = nc.tags.insert(*id, tag.unwrap());
}
// TODO: node info, which isn't supported in v1 so not needed yet

View file

@ -33,7 +33,9 @@ async fn run<DatabaseImpl: Database>(database: Arc<DatabaseImpl>, runtime: &Runt
if svc.is_ok() {
let svc = svc.unwrap();
svc.node().init_default_roots();
handler.start_change_watcher(&svc).await;
handler.set_service(&svc);
handler.start_change_watcher().await;
// Wait for kill signal on Unix-like platforms.
#[cfg(unix)]

View file

@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize};
use zerotier_network_hypervisor::vl1::{Address, Endpoint};
use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_utils::blob::Blob;
/// A complete network with all member configuration information for import/export or blob storage.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
@ -94,14 +95,16 @@ pub struct RequestLogItem {
pub network_id: NetworkId,
#[serde(rename = "nid")]
pub node_id: Address,
#[serde(rename = "nf")]
pub node_fingerprint: Blob<48>,
#[serde(rename = "cid")]
pub controller_node_id: Address,
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(default)]
#[serde(rename = "md")]
pub metadata: Vec<u8>,
#[serde(rename = "ts")]
pub timestamp: i64,
#[serde(rename = "v")]
pub version: (u16, u16, u16, u16),
#[serde(rename = "s")]
pub source_remote_endpoint: Endpoint,
#[serde(rename = "sh")]
@ -113,15 +116,11 @@ pub struct RequestLogItem {
impl ToString for RequestLogItem {
fn to_string(&self) -> String {
format!(
"{} {} {} ts={} v={}.{}.{},{} s={},{} {}",
"{} {} {} ts={} s={},{} {}",
self.controller_node_id.to_string(),
self.network_id.to_string(),
self.node_id.to_string(),
self.timestamp,
self.version.0,
self.version.1,
self.version.2,
self.version.3,
self.source_remote_endpoint.to_string(),
self.source_hops,
self.result.to_string()

View file

@ -142,11 +142,7 @@ fn troo() -> bool {
impl Network {
/// Check member IP assignments and return 'true' if IP assignments were created or modified.
pub async fn check_zt_ip_assignments<DatabaseImpl: Database>(
&self,
database: &DatabaseImpl,
member: &mut Member,
) -> Result<bool, Box<dyn Error>> {
pub async fn check_zt_ip_assignments<DatabaseImpl: Database>(&self, database: &DatabaseImpl, member: &mut Member) -> bool {
let mut modified = false;
if self.v4_assign_mode.zt {
@ -159,10 +155,14 @@ impl Network {
for route in self.ip_routes.iter() {
let ip = InetAddress::from_ip_port(&ip_ptr.to_be_bytes(), route.target.port()); // IP/bits
if ip.is_within(&route.target) {
if !database.is_ip_assigned(self.id, &ip).await? {
modified = true;
let _ = member.ip_assignments.insert(ip);
break 'ip_search;
if let Ok(is_ip_assigned) = database.is_ip_assigned(self.id, &ip).await {
if !is_ip_assigned {
modified = true;
let _ = member.ip_assignments.insert(ip);
break 'ip_search;
}
} else {
return false;
}
}
}
@ -183,10 +183,14 @@ impl Network {
for route in self.ip_routes.iter() {
let ip = InetAddress::from_ip_port(&ip_ptr.to_be_bytes(), route.target.port()); // IP/bits
if ip.is_within(&route.target) {
if !database.is_ip_assigned(self.id, &ip).await? {
modified = true;
let _ = member.ip_assignments.insert(ip);
break 'ip_search;
if let Ok(is_ip_assigned) = database.is_ip_assigned(self.id, &ip).await {
if !is_ip_assigned {
modified = true;
let _ = member.ip_assignments.insert(ip);
break 'ip_search;
}
} else {
return false;
}
}
}
@ -197,6 +201,6 @@ impl Network {
}
}
Ok(modified)
return modified;
}
}

View file

@ -129,7 +129,7 @@ pub const ADDRESS_RESERVED_PREFIX: u8 = 0xff;
/// Bit mask for address bits in a u64.
pub const ADDRESS_MASK: u64 = 0xffffffffff;
pub(crate) mod v1 {
pub mod v1 {
use super::*;
/// Size of packet header that lies outside the encryption envelope.
@ -266,7 +266,7 @@ pub(crate) mod v1 {
///
/// This will panic if the buffer provided doesn't contain a proper header.
#[inline(always)]
pub fn set_packet_fragment_flag<const S: usize>(pkt: &mut Buffer<S>) {
pub(crate) fn set_packet_fragment_flag<const S: usize>(pkt: &mut Buffer<S>) {
pkt.as_bytes_mut()[FLAGS_FIELD_INDEX] |= HEADER_FLAG_FRAGMENTED;
}
@ -393,27 +393,10 @@ pub(crate) mod v1 {
}
/// Flat packed structs for fixed length header blocks in messages.
pub(crate) mod message_component_structs {
pub mod message_component_structs {
#[derive(Clone, Copy)]
#[repr(C, packed)]
pub struct OkHeader {
pub verb: u8,
pub in_re_verb: u8,
pub in_re_message_id: [u8; 8],
}
#[derive(Clone, Copy)]
#[repr(C, packed)]
pub struct ErrorHeader {
pub verb: u8,
pub in_re_verb: u8,
pub in_re_message_id: [u8; 8],
pub error_code: u8,
}
#[derive(Clone, Copy)]
#[repr(C, packed)]
pub struct HelloFixedHeaderFields {
pub(crate) struct HelloFixedHeaderFields {
pub verb: u8,
pub version_proto: u8,
pub version_major: u8,
@ -424,7 +407,7 @@ pub(crate) mod v1 {
#[derive(Clone, Copy)]
#[repr(C, packed)]
pub struct OkHelloFixedHeaderFields {
pub(crate) struct OkHelloFixedHeaderFields {
pub timestamp_echo: [u8; 8], // u64
pub version_proto: u8,
pub version_major: u8,
@ -532,6 +515,23 @@ pub(crate) mod v1 {
}
}
#[derive(Clone, Copy)]
#[repr(C, packed)]
pub struct OkHeader {
pub verb: u8,
pub in_re_verb: u8,
pub in_re_message_id: [u8; 8],
}
#[derive(Clone, Copy)]
#[repr(C, packed)]
pub struct ErrorHeader {
pub verb: u8,
pub in_re_verb: u8,
pub in_re_message_id: [u8; 8],
pub error_code: u8,
}
/// Maximum delta between the message ID of a sent packet and its response.
pub(crate) const PACKET_RESPONSE_COUNTER_DELTA_MAX: u64 = 256;
@ -573,8 +573,8 @@ mod tests {
#[test]
fn representation() {
assert_eq!(size_of::<v1::message_component_structs::OkHeader>(), 10);
assert_eq!(size_of::<v1::message_component_structs::ErrorHeader>(), 11);
assert_eq!(size_of::<OkHeader>(), 10);
assert_eq!(size_of::<ErrorHeader>(), 11);
assert_eq!(size_of::<v1::PacketHeader>(), v1::HEADER_SIZE);
assert_eq!(size_of::<v1::FragmentHeader>(), v1::FRAGMENT_HEADER_SIZE);

View file

@ -135,9 +135,11 @@ pub trait InnerProtocol: Sync + Send {
/// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error().
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
source_hops: u8,
message_id: u64,
verb: u8,
payload: &PacketBuffer,
@ -146,9 +148,11 @@ pub trait InnerProtocol: Sync + Send {
/// Handle errors, returning true if the error was recognized.
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,
in_re_message_id: u64,
@ -160,9 +164,11 @@ pub trait InnerProtocol: Sync + Send {
/// Handle an OK, returing true if the OK was recognized.
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,
in_re_message_id: u64,
@ -1049,9 +1055,11 @@ impl InnerProtocol for DummyInnerProtocol {
#[inline(always)]
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node<HostSystemImpl>,
_source: &Arc<Peer<HostSystemImpl>>,
_source_path: &Arc<Path<HostSystemImpl>>,
_source_hops: u8,
_message_id: u64,
_verb: u8,
_payload: &PacketBuffer,
@ -1062,9 +1070,11 @@ impl InnerProtocol for DummyInnerProtocol {
#[inline(always)]
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node<HostSystemImpl>,
_source: &Arc<Peer<HostSystemImpl>>,
_source_path: &Arc<Path<HostSystemImpl>>,
_source_hops: u8,
_message_id: u64,
_in_re_verb: u8,
_in_re_message_id: u64,
@ -1078,9 +1088,11 @@ impl InnerProtocol for DummyInnerProtocol {
#[inline(always)]
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node<HostSystemImpl>,
_source: &Arc<Peer<HostSystemImpl>>,
_source_path: &Arc<Path<HostSystemImpl>>,
_source_hops: u8,
_message_id: u64,
_in_re_verb: u8,
_in_re_message_id: u64,

View file

@ -83,6 +83,12 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
})
}
/// Returns true if this peer supports the ZeroTier V2 protocol stack and features.
#[inline(always)]
pub fn is_v2(&self) -> bool {
self.identity.p384.is_some()
}
/// Get the remote version of this peer: major, minor, revision.
/// Returns None if it's not yet known.
pub fn version(&self) -> Option<(u8, u8, u16)> {
@ -105,6 +111,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
}
/// Get current best path or None if there are no direct paths to this peer.
#[inline]
pub fn direct_path(&self) -> Option<Arc<Path<HostSystemImpl>>> {
for p in self.paths.lock().unwrap().iter() {
let pp = p.path.upgrade();
@ -116,6 +123,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
}
/// Get either the current best direct path or an indirect path via e.g. a root.
#[inline]
pub fn path(&self, node: &Node<HostSystemImpl>) -> Option<Arc<Path<HostSystemImpl>>> {
let direct_path = self.direct_path();
if direct_path.is_some() {
@ -184,6 +192,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
prioritize_paths(&mut paths);
}
/// Get the next sequential message ID for use with the V1 transport protocol.
#[inline(always)]
pub(crate) fn v1_proto_next_message_id(&self) -> MessageId {
self.message_id_counter.fetch_add(1, Ordering::SeqCst)
@ -292,7 +301,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
let max_fragment_size = path.endpoint.max_fragment_size();
let mut packet = host_system.get_buffer();
if !self.identity.p384.is_some() {
if !self.is_v2() {
// For the V1 protocol, leave room for for the header in the buffer.
packet.set_size(v1::HEADER_SIZE);
}
@ -300,7 +309,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
let r = builder_function(packet.as_mut());
if r.is_ok() {
if self.identity.p384.is_some() {
if self.is_v2() {
todo!() // TODO: ZSSP / V2 protocol
} else {
if self.remote_node_info.read().unwrap().remote_protocol_version >= 11 {
@ -422,7 +431,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
}
debug_assert_eq!(packet.len(), 41);
assert!(node.identity.write_public(packet.as_mut(), self.identity.p384.is_none()).is_ok());
assert!(node.identity.write_public(packet.as_mut(), !self.is_v2()).is_ok());
let (_, poly1305_key) = v1_proto_salsa_poly_create(
&self.v1_proto_static_secret,
@ -536,25 +545,25 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
return match verb {
verbs::VL1_NOP => PacketHandlerResult::Ok,
verbs::VL1_HELLO => self.handle_incoming_hello(
verbs::VL1_HELLO => self.handle_incoming_hello(host_system, inner, node, time_ticks, message_id, source_path, &payload),
verbs::VL1_ERROR => self.handle_incoming_error(
host_system,
inner,
node,
time_ticks,
message_id,
source_path,
packet_header.hops(),
message_id,
&payload,
),
verbs::VL1_ERROR => self.handle_incoming_error(host_system, inner, node, time_ticks, source_path, message_id, &payload),
verbs::VL1_OK => self.handle_incoming_ok(
host_system,
inner,
node,
time_ticks,
source_path,
message_id,
packet_header.hops(),
message_id,
path_is_known,
&payload,
),
@ -567,7 +576,16 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
self.handle_incoming_push_direct_paths(host_system, node, time_ticks, source_path, &payload)
}
verbs::VL1_USER_MESSAGE => self.handle_incoming_user_message(host_system, node, time_ticks, source_path, &payload),
_ => inner.handle_packet(node, self, &source_path, message_id, verb, &payload),
_ => inner.handle_packet(
host_system,
node,
self,
&source_path,
packet_header.hops(),
message_id,
verb,
&payload,
),
};
}
}
@ -583,7 +601,6 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
time_ticks: i64,
message_id: MessageId,
source_path: &Arc<Path<HostSystemImpl>>,
_hops: u8,
payload: &PacketBuffer,
) -> PacketHandlerResult {
if !(inner.should_communicate_with(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) {
@ -615,10 +632,8 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
Some(source_path),
time_ticks,
|packet| -> Result<(), Infallible> {
let f: &mut (
v1::message_component_structs::OkHeader,
v1::message_component_structs::OkHelloFixedHeaderFields,
) = packet.append_struct_get_mut().unwrap();
let f: &mut (OkHeader, v1::message_component_structs::OkHelloFixedHeaderFields) =
packet.append_struct_get_mut().unwrap();
f.0.verb = verbs::VL1_OK;
f.0.in_re_verb = verbs::VL1_HELLO;
f.0.in_re_message_id = message_id.to_ne_bytes();
@ -641,24 +656,27 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
fn handle_incoming_error<InnerProtocolImpl: InnerProtocol + ?Sized>(
self: &Arc<Self>,
_: &HostSystemImpl,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
node: &Node<HostSystemImpl>,
_: i64,
source_path: &Arc<Path<HostSystemImpl>>,
source_hops: u8,
message_id: u64,
payload: &PacketBuffer,
) -> PacketHandlerResult {
let mut cursor = 0;
if let Ok(error_header) = payload.read_struct::<v1::message_component_structs::ErrorHeader>(&mut cursor) {
if let Ok(error_header) = payload.read_struct::<ErrorHeader>(&mut cursor) {
let in_re_message_id: MessageId = u64::from_be_bytes(error_header.in_re_message_id);
if self.message_id_counter.load(Ordering::Relaxed).wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX {
match error_header.in_re_verb {
_ => {
return inner.handle_error(
host_system,
node,
self,
&source_path,
source_hops,
message_id,
error_header.in_re_verb,
in_re_message_id,
@ -680,13 +698,13 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
node: &Node<HostSystemImpl>,
time_ticks: i64,
source_path: &Arc<Path<HostSystemImpl>>,
source_hops: u8,
message_id: u64,
hops: u8,
path_is_known: bool,
payload: &PacketBuffer,
) -> PacketHandlerResult {
let mut cursor = 0;
if let Ok(ok_header) = payload.read_struct::<v1::message_component_structs::OkHeader>(&mut cursor) {
if let Ok(ok_header) = payload.read_struct::<OkHeader>(&mut cursor) {
let in_re_message_id: MessageId = u64::from_ne_bytes(ok_header.in_re_message_id);
if self.message_id_counter.load(Ordering::Relaxed).wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX {
match ok_header.in_re_verb {
@ -694,7 +712,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
if let Ok(_ok_hello_fixed_header_fields) =
payload.read_struct::<v1::message_component_structs::OkHelloFixedHeaderFields>(&mut cursor)
{
if hops == 0 {
if source_hops == 0 {
debug_event!(host_system, "[vl1] {} OK(HELLO)", self.identity.address.to_string(),);
if let Ok(reported_endpoint) = Endpoint::unmarshal(&payload, &mut cursor) {
#[cfg(debug_assertions)]
@ -718,7 +736,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
}
}
if hops == 0 && !path_is_known {
if source_hops == 0 && !path_is_known {
self.learn_path(host_system, source_path, time_ticks);
}
@ -755,9 +773,11 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
_ => {
return inner.handle_ok(
host_system,
node,
self,
&source_path,
source_hops,
message_id,
ok_header.in_re_verb,
in_re_message_id,
@ -788,7 +808,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
while addresses.len() >= ADDRESS_SIZE && (packet.len() + Identity::MAX_MARSHAL_SIZE) <= UDP_DEFAULT_MTU {
if let Some(zt_address) = Address::from_bytes(&addresses[..ADDRESS_SIZE]) {
if let Some(peer) = node.peer(zt_address) {
peer.identity.write_public(packet, self.identity.p384.is_none())?;
peer.identity.write_public(packet, !self.is_v2())?;
}
}
addresses = &addresses[ADDRESS_SIZE..];
@ -828,7 +848,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
) -> PacketHandlerResult {
if inner.should_communicate_with(&self.identity) || node.is_peer_root(self) {
self.send(host_system, node, None, time_ticks, |packet| {
let mut f: &mut v1::message_component_structs::OkHeader = packet.append_struct_get_mut().unwrap();
let mut f: &mut OkHeader = packet.append_struct_get_mut().unwrap();
f.verb = verbs::VL1_OK;
f.in_re_verb = verbs::VL1_ECHO;
f.in_re_message_id = message_id.to_ne_bytes();

View file

@ -410,13 +410,15 @@ pub struct SSOAuthConfiguration {
/// Information about nodes on the network that can be included in a network config.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct NodeInfo {
pub flags: u64,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub flags: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub ip: Option<InetAddress>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(skip_serializing_if = "String::is_empty")]
#[serde(default)]
pub name: Option<String>,
pub name: String,
#[serde(skip_serializing_if = "HashMap::is_empty")]
#[serde(default)]
pub services: HashMap<String, Option<String>>,
@ -429,8 +431,12 @@ pub struct IpRoute {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub via: Option<InetAddress>,
pub flags: u16,
pub metric: u16,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub flags: Option<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub metric: Option<u16>,
}
impl Marshalable for IpRoute {
@ -446,8 +452,8 @@ impl Marshalable for IpRoute {
} else {
buf.append_u8(0)?; // "nil" InetAddress
}
buf.append_u16(self.flags)?;
buf.append_u16(self.metric)?;
buf.append_u16(self.flags.unwrap_or(0))?;
buf.append_u16(self.metric.unwrap_or(0))?;
Ok(())
}
@ -465,8 +471,20 @@ impl Marshalable for IpRoute {
Some(via)
}
},
flags: buf.read_u16(cursor)?,
metric: buf.read_u16(cursor)?,
flags: buf.read_u16(cursor).map(|f| {
if f == 0 {
None
} else {
Some(f)
}
})?,
metric: buf.read_u16(cursor).map(|f| {
if f == 0 {
None
} else {
Some(f)
}
})?,
})
}
}

View file

@ -13,9 +13,11 @@ pub struct Switch {}
impl InnerProtocol for Switch {
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
source_hops: u8,
message_id: u64,
verb: u8,
payload: &PacketBuffer,
@ -25,9 +27,11 @@ impl InnerProtocol for Switch {
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,
in_re_message_id: u64,
@ -40,9 +44,11 @@ impl InnerProtocol for Switch {
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
&self,
host_system: &HostSystemImpl,
node: &Node<HostSystemImpl>,
source: &Arc<Peer<HostSystemImpl>>,
source_path: &Arc<Path<HostSystemImpl>>,
source_hops: u8,
message_id: u64,
in_re_verb: u8,
in_re_message_id: u64,