Mirror of https://github.com/zerotier/ZeroTierOne.git

Commit d9e68701b6 (parent 2fcc9e63c6)
A bit more simplification of generics in VL1 and VL1Service.
10 changed files with 240 additions and 264 deletions
@@ -1,10 +1,10 @@
use async_trait::async_trait;

use zerotier_crypto::secure_eq;
use zerotier_network_hypervisor::vl1::{Address, InetAddress, NodeStorageProvider};
use zerotier_network_hypervisor::vl1::{Address, InetAddress};
use zerotier_network_hypervisor::vl2::NetworkId;

use zerotier_utils::tokio::sync::broadcast::Receiver;
use zerotier_vl1_service::VL1DataStorage;

use crate::model::*;

@@ -22,7 +22,7 @@ pub enum Change {
}

#[async_trait]
pub trait Database: Sync + Send + NodeStorageProvider + 'static {
pub trait Database: Sync + Send + VL1DataStorage + 'static {
    async fn list_networks(&self) -> Result<Vec<NetworkId>, Error>;
    async fn get_network(&self, id: NetworkId) -> Result<Option<Network>, Error>;
    async fn save_network(&self, obj: Network, generate_change_notification: bool) -> Result<(), Error>;
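With NodeStorageProvider gone, a controller database supplies node identity storage through the same VL1DataStorage trait used by the service layer. A minimal sketch of what such an implementation has to provide, assuming the trait exactly as declared in this diff; the in-memory StaticIdentityStore type is hypothetical:

use zerotier_network_hypervisor::vl1::{Identity, Verified};
use zerotier_vl1_service::VL1DataStorage;

/// Hypothetical store that keeps a pre-generated identity in memory.
struct StaticIdentityStore {
    identity: Verified<Identity>,
}

impl VL1DataStorage for StaticIdentityStore {
    fn load_node_identity(&self) -> Option<Verified<Identity>> {
        Some(self.identity.clone())
    }

    fn save_node_identity(&self, _id: &Verified<Identity>) -> bool {
        // Read-only store: report failure instead of panicking.
        false
    }
}

A concrete Database implementation then adds this impl alongside its async methods, since the bound is now Database: Sync + Send + VL1DataStorage + 'static.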
@ -1,21 +1,20 @@
|
|||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex, Weak};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use notify::{RecursiveMode, Watcher};
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
use zerotier_network_hypervisor::vl1::{Address, Identity, NodeStorageProvider, Verified};
|
||||
use zerotier_network_hypervisor::vl1::{Address, Identity, Verified};
|
||||
use zerotier_network_hypervisor::vl2::NetworkId;
|
||||
use zerotier_utils::io::{fs_restrict_permissions, read_limit};
|
||||
use zerotier_utils::reaper::Reaper;
|
||||
use zerotier_utils::tokio::fs;
|
||||
use zerotier_utils::tokio::runtime::Handle;
|
||||
use zerotier_utils::tokio::sync::broadcast::{channel, Receiver, Sender};
|
||||
use zerotier_utils::tokio::task::JoinHandle;
|
||||
use zerotier_utils::tokio::time::{sleep, Duration, Instant};
|
||||
use zerotier_vl1_service::datadir::{load_node_identity, save_node_identity};
|
||||
use zerotier_vl1_service::VL1DataStorage;
|
||||
|
||||
use crate::cache::Cache;
|
||||
use crate::database::{Change, Database, Error};
|
||||
|
@ -34,7 +33,7 @@ const EVENT_HANDLER_TASK_TIMEOUT: Duration = Duration::from_secs(5);
|
|||
/// is different from V1 so it'll need a converter to use with V1 FileDb controller data.
|
||||
pub struct FileDatabase {
|
||||
base_path: PathBuf,
|
||||
controller_address: AtomicU64,
|
||||
local_identity: Verified<Identity>,
|
||||
change_sender: Sender<Change>,
|
||||
tasks: Reaper,
|
||||
cache: Cache,
|
||||
|
@ -53,9 +52,13 @@ impl FileDatabase {
|
|||
let db_weak = db_weak_tmp.clone();
|
||||
let runtime2 = runtime.clone();
|
||||
|
||||
let local_identity = load_node_identity(base_path.as_path())
|
||||
.ok_or(std::io::Error::new(std::io::ErrorKind::NotFound, "identity.secret not found"))?;
|
||||
let controller_address = local_identity.address;
|
||||
|
||||
let db = Arc::new(Self {
|
||||
base_path: base_path.clone(),
|
||||
controller_address: AtomicU64::new(0),
|
||||
local_identity,
|
||||
change_sender,
|
||||
tasks: Reaper::new(&runtime2),
|
||||
cache: Cache::new(),
|
||||
|
@ -65,127 +68,115 @@ impl FileDatabase {
|
|||
match event.kind {
|
||||
notify::EventKind::Create(_) | notify::EventKind::Modify(_) | notify::EventKind::Remove(_) => {
|
||||
if let Some(db) = db_weak.lock().unwrap().upgrade() {
|
||||
if let Some(controller_address) = db.get_controller_address() {
|
||||
db.clone().tasks.add(
|
||||
runtime.spawn(async move {
|
||||
if let Some(path0) = event.paths.first() {
|
||||
if let Some((record_type, network_id, node_id)) =
|
||||
Self::record_type_from_path(controller_address, path0.as_path())
|
||||
{
|
||||
// Paths to objects that were deleted or changed. Changed includes adding new objects.
|
||||
let mut deleted = None;
|
||||
let mut changed = None;
|
||||
db.clone().tasks.add(
|
||||
runtime.spawn(async move {
|
||||
if let Some(path0) = event.paths.first() {
|
||||
if let Some((record_type, network_id, node_id)) =
|
||||
Self::record_type_from_path(controller_address, path0.as_path())
|
||||
{
|
||||
// Paths to objects that were deleted or changed. Changed includes adding new objects.
|
||||
let mut deleted = None;
|
||||
let mut changed = None;
|
||||
|
||||
match event.kind {
|
||||
notify::EventKind::Create(create_kind) => match create_kind {
|
||||
notify::event::CreateKind::File => {
|
||||
changed = Some(path0.as_path());
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
notify::EventKind::Modify(modify_kind) => match modify_kind {
|
||||
notify::event::ModifyKind::Data(_) => {
|
||||
changed = Some(path0.as_path());
|
||||
}
|
||||
notify::event::ModifyKind::Name(rename_mode) => match rename_mode {
|
||||
notify::event::RenameMode::Both => {
|
||||
if event.paths.len() >= 2 {
|
||||
if let Some(path1) = event.paths.last() {
|
||||
deleted = Some(path0.as_path());
|
||||
changed = Some(path1.as_path());
|
||||
}
|
||||
match event.kind {
|
||||
notify::EventKind::Create(create_kind) => match create_kind {
|
||||
notify::event::CreateKind::File => {
|
||||
changed = Some(path0.as_path());
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
notify::EventKind::Modify(modify_kind) => match modify_kind {
|
||||
notify::event::ModifyKind::Data(_) => {
|
||||
changed = Some(path0.as_path());
|
||||
}
|
||||
notify::event::ModifyKind::Name(rename_mode) => match rename_mode {
|
||||
notify::event::RenameMode::Both => {
|
||||
if event.paths.len() >= 2 {
|
||||
if let Some(path1) = event.paths.last() {
|
||||
deleted = Some(path0.as_path());
|
||||
changed = Some(path1.as_path());
|
||||
}
|
||||
}
|
||||
notify::event::RenameMode::From => {
|
||||
deleted = Some(path0.as_path());
|
||||
}
|
||||
notify::event::RenameMode::To => {
|
||||
changed = Some(path0.as_path());
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
_ => {}
|
||||
},
|
||||
notify::EventKind::Remove(remove_kind) => match remove_kind {
|
||||
notify::event::RemoveKind::File => {
|
||||
}
|
||||
notify::event::RenameMode::From => {
|
||||
deleted = Some(path0.as_path());
|
||||
}
|
||||
notify::event::RenameMode::To => {
|
||||
changed = Some(path0.as_path());
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if deleted.is_some() {
|
||||
match record_type {
|
||||
RecordType::Network => {
|
||||
if let Some((network, members)) =
|
||||
db.cache.on_network_deleted(network_id)
|
||||
{
|
||||
let _ =
|
||||
db.change_sender.send(Change::NetworkDeleted(network, members));
|
||||
}
|
||||
}
|
||||
RecordType::Member => {
|
||||
if let Some(node_id) = node_id {
|
||||
if let Some(member) =
|
||||
db.cache.on_member_deleted(network_id, node_id)
|
||||
{
|
||||
let _ = db.change_sender.send(Change::MemberDeleted(member));
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
notify::EventKind::Remove(remove_kind) => match remove_kind {
|
||||
notify::event::RemoveKind::File => {
|
||||
deleted = Some(path0.as_path());
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if let Some(changed) = changed {
|
||||
match record_type {
|
||||
RecordType::Network => {
|
||||
if let Ok(Some(new_network)) =
|
||||
Self::load_object::<Network>(changed).await
|
||||
{
|
||||
match db.cache.on_network_updated(new_network.clone()) {
|
||||
(true, Some(old_network)) => {
|
||||
let _ = db
|
||||
.change_sender
|
||||
.send(Change::NetworkChanged(old_network, new_network));
|
||||
}
|
||||
(true, None) => {
|
||||
let _ = db
|
||||
.change_sender
|
||||
.send(Change::NetworkCreated(new_network));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
if deleted.is_some() {
|
||||
match record_type {
|
||||
RecordType::Network => {
|
||||
if let Some((network, members)) = db.cache.on_network_deleted(network_id) {
|
||||
let _ = db.change_sender.send(Change::NetworkDeleted(network, members));
|
||||
}
|
||||
RecordType::Member => {
|
||||
if let Ok(Some(new_member)) = Self::load_object::<Member>(changed).await
|
||||
{
|
||||
match db.cache.on_member_updated(new_member.clone()) {
|
||||
(true, Some(old_member)) => {
|
||||
let _ = db
|
||||
.change_sender
|
||||
.send(Change::MemberChanged(old_member, new_member));
|
||||
}
|
||||
(true, None) => {
|
||||
let _ = db
|
||||
.change_sender
|
||||
.send(Change::MemberCreated(new_member));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
RecordType::Member => {
|
||||
if let Some(node_id) = node_id {
|
||||
if let Some(member) = db.cache.on_member_deleted(network_id, node_id) {
|
||||
let _ = db.change_sender.send(Change::MemberDeleted(member));
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(changed) = changed {
|
||||
match record_type {
|
||||
RecordType::Network => {
|
||||
if let Ok(Some(new_network)) = Self::load_object::<Network>(changed).await {
|
||||
match db.cache.on_network_updated(new_network.clone()) {
|
||||
(true, Some(old_network)) => {
|
||||
let _ = db
|
||||
.change_sender
|
||||
.send(Change::NetworkChanged(old_network, new_network));
|
||||
}
|
||||
(true, None) => {
|
||||
let _ =
|
||||
db.change_sender.send(Change::NetworkCreated(new_network));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
RecordType::Member => {
|
||||
if let Ok(Some(new_member)) = Self::load_object::<Member>(changed).await {
|
||||
match db.cache.on_member_updated(new_member.clone()) {
|
||||
(true, Some(old_member)) => {
|
||||
let _ = db
|
||||
.change_sender
|
||||
.send(Change::MemberChanged(old_member, new_member));
|
||||
}
|
||||
(true, None) => {
|
||||
let _ =
|
||||
db.change_sender.send(Change::MemberCreated(new_member));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
Instant::now().checked_add(EVENT_HANDLER_TASK_TIMEOUT).unwrap(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}),
|
||||
Instant::now().checked_add(EVENT_HANDLER_TASK_TIMEOUT).unwrap(),
|
||||
);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
|
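The nested matches in the watcher callback above reduce to a small classification step: each filesystem event yields at most one "deleted" path and one "changed" path (where "changed" includes newly created objects). A condensed sketch of that decision logic, assuming the same notify event types used above; the function name and return shape are illustrative only:

use std::path::PathBuf;

use notify::event::{CreateKind, ModifyKind, RemoveKind, RenameMode};
use notify::{Event, EventKind};

/// Returns (deleted, changed) paths implied by a single notify event.
fn classify(event: &Event) -> (Option<PathBuf>, Option<PathBuf>) {
    let first = match event.paths.first() {
        Some(p) => p.clone(),
        None => return (None, None),
    };
    match &event.kind {
        EventKind::Create(CreateKind::File) => (None, Some(first)),
        EventKind::Modify(ModifyKind::Data(_)) => (None, Some(first)),
        EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => {
            // First path is the old name, last path is the new name.
            if event.paths.len() >= 2 {
                (Some(first), event.paths.last().cloned())
            } else {
                (None, None)
            }
        }
        EventKind::Modify(ModifyKind::Name(RenameMode::From)) => (Some(first), None),
        EventKind::Modify(ModifyKind::Name(RenameMode::To)) => (None, Some(first)),
        EventKind::Remove(RemoveKind::File) => (Some(first), None),
        _ => (None, None),
    }
}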
@ -215,20 +206,6 @@ impl FileDatabase {
|
|||
Ok(db)
|
||||
}
|
||||
|
||||
fn get_controller_address(&self) -> Option<Address> {
|
||||
let a = self.controller_address.load(Ordering::Relaxed);
|
||||
if a == 0 {
|
||||
if let Some(id) = self.load_node_identity() {
|
||||
self.controller_address.store(id.address.into(), Ordering::Relaxed);
|
||||
Some(id.address)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
Address::from_u64(a)
|
||||
}
|
||||
}
|
||||
|
||||
fn network_path(&self, network_id: NetworkId) -> PathBuf {
|
||||
self.base_path.join(format!("N{:06x}", network_id.network_no())).join("config.yaml")
|
||||
}
|
||||
|
@@ -274,25 +251,13 @@ impl Drop for FileDatabase {
    }
}

impl NodeStorageProvider for FileDatabase {
impl VL1DataStorage for FileDatabase {
    fn load_node_identity(&self) -> Option<Verified<Identity>> {
        let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 16384);
        if id_data.is_err() {
            return None;
        }
        let id_data = Identity::from_str(String::from_utf8_lossy(id_data.unwrap().as_slice()).as_ref());
        if id_data.is_err() {
            return None;
        }
        Some(Verified::assume_verified(id_data.unwrap()))
        load_node_identity(self.base_path.as_path())
    }

    fn save_node_identity(&self, id: &Verified<Identity>) {
        assert!(id.secret.is_some());
        let id_secret_str = id.to_secret_string();
        let secret_path = self.base_path.join(IDENTITY_SECRET_FILENAME);
        assert!(std::fs::write(&secret_path, id_secret_str.as_bytes()).is_ok());
        assert!(fs_restrict_permissions(&secret_path));
    fn save_node_identity(&self, id: &Verified<Identity>) -> bool {
        save_node_identity(self.base_path.as_path(), id)
    }
}
@ -300,19 +265,17 @@ impl NodeStorageProvider for FileDatabase {
|
|||
impl Database for FileDatabase {
|
||||
async fn list_networks(&self) -> Result<Vec<NetworkId>, Error> {
|
||||
let mut networks = Vec::new();
|
||||
if let Some(controller_address) = self.get_controller_address() {
|
||||
let controller_address_shift24 = u64::from(controller_address).wrapping_shl(24);
|
||||
let mut dir = fs::read_dir(&self.base_path).await?;
|
||||
while let Ok(Some(ent)) = dir.next_entry().await {
|
||||
if ent.file_type().await.map_or(false, |t| t.is_dir()) {
|
||||
let osname = ent.file_name();
|
||||
let name = osname.to_string_lossy();
|
||||
if name.len() == 7 && name.starts_with("N") {
|
||||
if fs::metadata(ent.path().join("config.yaml")).await.is_ok() {
|
||||
if let Ok(nwid_last24bits) = u64::from_str_radix(&name[1..], 16) {
|
||||
if let Some(nwid) = NetworkId::from_u64(controller_address_shift24 | nwid_last24bits) {
|
||||
networks.push(nwid);
|
||||
}
|
||||
let controller_address_shift24 = u64::from(self.local_identity.address).wrapping_shl(24);
|
||||
let mut dir = fs::read_dir(&self.base_path).await?;
|
||||
while let Ok(Some(ent)) = dir.next_entry().await {
|
||||
if ent.file_type().await.map_or(false, |t| t.is_dir()) {
|
||||
let osname = ent.file_name();
|
||||
let name = osname.to_string_lossy();
|
||||
if name.len() == 7 && name.starts_with("N") {
|
||||
if fs::metadata(ent.path().join("config.yaml")).await.is_ok() {
|
||||
if let Ok(nwid_last24bits) = u64::from_str_radix(&name[1..], 16) {
|
||||
if let Some(nwid) = NetworkId::from_u64(controller_address_shift24 | nwid_last24bits) {
|
||||
networks.push(nwid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
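The directory scan in list_networks() relies on ZeroTier's network ID layout: the upper 40 bits are the controller's address and the lower 24 bits are the local network number, which is also what the "N%06x" directory name encodes. A plain-u64 illustration of that composition (the address and network number values are made up):

fn main() {
    // 40-bit controller address and 24-bit network number (both illustrative).
    let controller_address: u64 = 0x8056c2e21c;
    let network_no: u64 = 0x000001;

    // Same composition as in list_networks(): address shifted left 24 bits,
    // OR'ed with the 24-bit network number parsed from the "N%06x" directory name.
    let network_id = controller_address.wrapping_shl(24) | network_no;
    assert_eq!(network_id, 0x8056c2e21c000001);

    // The on-disk directory for this network would be named like this.
    let dir_name = format!("N{:06x}", network_id & 0x00ff_ffff);
    assert_eq!(dir_name, "N000001");
}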
@ -328,12 +291,10 @@ impl Database for FileDatabase {
|
|||
// FileDatabase stores networks by their "network number" and automatically adapts their IDs
|
||||
// if the controller's identity changes. This is done to make it easy to just clone networks,
|
||||
// including storing them in "git."
|
||||
if let Some(controller_address) = self.get_controller_address() {
|
||||
let network_id_should_be = network.id.change_network_controller(controller_address);
|
||||
if network.id != network_id_should_be {
|
||||
network.id = network_id_should_be;
|
||||
let _ = self.save_network(network.clone(), false).await?;
|
||||
}
|
||||
let network_id_should_be = network.id.change_network_controller(self.local_identity.address);
|
||||
if network.id != network_id_should_be {
|
||||
network.id = network_id_should_be;
|
||||
let _ = self.save_network(network.clone(), false).await?;
|
||||
}
|
||||
}
|
||||
Ok(network)
|
||||
|
|
|
@ -7,7 +7,7 @@ use clap::{Arg, Command};
|
|||
use zerotier_network_controller::database::Database;
|
||||
use zerotier_network_controller::filedatabase::FileDatabase;
|
||||
use zerotier_network_controller::Controller;
|
||||
|
||||
use zerotier_network_hypervisor::vl1::PeerFilter;
|
||||
use zerotier_network_hypervisor::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION};
|
||||
use zerotier_utils::exitcode;
|
||||
use zerotier_utils::tokio::runtime::Runtime;
|
||||
|
@ -22,8 +22,8 @@ async fn run(database: Arc<impl Database>, runtime: &Runtime) -> i32 {
|
|||
let handler = handler.unwrap();
|
||||
|
||||
let svc = VL1Service::new(
|
||||
database,
|
||||
handler.clone(),
|
||||
Arc::new(AdmitAllPeerFilter),
|
||||
database.clone(),
|
||||
handler.clone(),
|
||||
zerotier_vl1_service::VL1Settings::default(),
|
||||
);
|
||||
|
@ -98,3 +98,14 @@ fn main() {
|
|||
std::process::exit(exitcode::ERR_IOERR)
|
||||
}
|
||||
}
|
||||
|
||||
struct AdmitAllPeerFilter;
|
||||
impl PeerFilter for AdmitAllPeerFilter {
|
||||
fn should_respond_to(&self, id: &zerotier_crypto::verified::Verified<zerotier_network_hypervisor::vl1::Identity>) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn has_trust_relationship(&self, id: &zerotier_crypto::verified::Verified<zerotier_network_hypervisor::vl1::Identity>) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
|
|
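AdmitAllPeerFilter above answers every peer, which suits an open controller. For comparison, a hedged sketch of a stricter filter that only answers a fixed set of addresses, assuming the PeerFilter trait exactly as used above; the allow-list wrapper itself is hypothetical and assumes Address implements Hash and Eq:

use std::collections::HashSet;

use zerotier_crypto::verified::Verified;
use zerotier_network_hypervisor::vl1::{Address, Identity, PeerFilter};

struct AllowListPeerFilter {
    // Assumption: Address is hashable and comparable so it can key a HashSet.
    allowed: HashSet<Address>,
}

impl PeerFilter for AllowListPeerFilter {
    fn should_respond_to(&self, id: &Verified<Identity>) -> bool {
        self.allowed.contains(&id.address)
    }

    fn has_trust_relationship(&self, id: &Verified<Identity>) -> bool {
        self.allowed.contains(&id.address)
    }
}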
@@ -12,7 +12,7 @@ use tokio_postgres::{Client, Statement};
use zerotier_crypto::secure_eq;
use zerotier_crypto::verified::Verified;

use zerotier_network_hypervisor::vl1::{Address, Identity, InetAddress, NodeStorageProvider};
use zerotier_network_hypervisor::vl1::{Address, Identity, InetAddress};
use zerotier_network_hypervisor::vl2::networkconfig::IpRoute;
use zerotier_network_hypervisor::vl2::rule::Rule;
use zerotier_network_hypervisor::vl2::NetworkId;

@@ -22,6 +22,7 @@ use zerotier_utils::tokio;
use zerotier_utils::tokio::runtime::Handle;
use zerotier_utils::tokio::sync::broadcast::{channel, Receiver, Sender};
use zerotier_utils::tokio::task::JoinHandle;
use zerotier_vl1_service::VL1DataStorage;

use crate::database::*;
use crate::model::{IpAssignmentPool, Member, Network, RequestLogItem};

@@ -187,14 +188,13 @@ impl PostgresDatabase {
    }
}

impl NodeStorageProvider for PostgresDatabase {
impl VL1DataStorage for PostgresDatabase {
    fn load_node_identity(&self) -> Option<Verified<Identity>> {
        Some(self.local_identity.clone())
    }

    fn save_node_identity(&self, _: &Verified<Identity>) {
        eprintln!("FATAL: NodeStorage::save_node_identity() not implemented in PostgresDatabase, identity must be pregenerated");
        panic!();
    fn save_node_identity(&self, id: &Verified<Identity>) -> bool {
        panic!("local identity saving not supported by PostgresDatabase")
    }
}
@@ -18,7 +18,7 @@ pub use event::Event;
pub use identity::Identity;
pub use inetaddress::InetAddress;
pub use mac::MAC;
pub use node::{ApplicationLayer, DummyInnerLayer, InnerProtocolLayer, Node, NodeStorageProvider, PacketHandlerResult, PeerFilter};
pub use node::{ApplicationLayer, InnerProtocolLayer, Node, PacketHandlerResult, PeerFilter};
pub use path::Path;
pub use peer::Peer;
pub use rootset::{Root, RootSet};
@@ -31,7 +31,7 @@ use zerotier_utils::thing::Thing;
///
/// This is analogous to a C struct full of function pointers to callbacks along with some
/// associated type definitions.
pub trait ApplicationLayer: 'static {
pub trait ApplicationLayer: Sync + Send {
    /// Type for local system sockets.
    type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static;

@@ -41,8 +41,11 @@ pub trait ApplicationLayer: 'static {
    /// A VL1 level event occurred.
    fn event(&self, event: Event);

    /// Get a reference to the local storage implementation at this host.
    fn storage(&self) -> &dyn NodeStorageProvider;
    /// Load this node's identity from the data store.
    fn load_node_identity(&self) -> Option<Verified<Identity>>;

    /// Save this node's identity to the data store, returning true on success.
    fn save_node_identity(&self, id: &Verified<Identity>) -> bool;

    /// Get the PeerFilter implementation used to check whether this node should communicate at VL1 with other peers.
    fn peer_filter(&self) -> &dyn PeerFilter;

@@ -128,15 +131,6 @@ pub trait PeerFilter: Sync + Send {
    fn has_trust_relationship(&self, id: &Verified<Identity>) -> bool;
}

/// Trait to be implemented by outside code to provide object storage to VL1
pub trait NodeStorageProvider: Sync + Send {
    /// Load this node's identity from the data store.
    fn load_node_identity(&self) -> Option<Verified<Identity>>;

    /// Save this node's identity to the data store.
    fn save_node_identity(&self, id: &Verified<Identity>);
}

/// Result of a packet handler.
pub enum PacketHandlerResult {
    /// Packet was handled successfully.

@@ -252,7 +246,7 @@ struct WhoisQueueItem {
}

const PATH_MAP_SIZE: usize = std::mem::size_of::<HashMap<[u8; std::mem::size_of::<Endpoint>() + 128], Arc<Path>>>();
type PathMap<Application> = HashMap<PathKey<'static, 'static, Application>, Arc<Path>>;
type PathMap<LocalSocket> = HashMap<PathKey<'static, 'static, LocalSocket>, Arc<Path>>;

/// A ZeroTier VL1 node that can communicate securely with the ZeroTier peer-to-peer network.
pub struct Node {

@@ -291,14 +285,14 @@ impl Node {
        auto_upgrade_identity: bool,
    ) -> Result<Self, InvalidParameterError> {
        let mut id = {
            let id = app.storage().load_node_identity();
            let id = app.load_node_identity();
            if id.is_none() {
                if !auto_generate_identity {
                    return Err(InvalidParameterError("no identity found and auto-generate not enabled"));
                } else {
                    let id = Identity::generate();
                    app.event(Event::IdentityAutoGenerated(id.as_ref().clone()));
                    app.storage().save_node_identity(&id);
                    app.save_node_identity(&id);
                    id
                }
            } else {
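The bootstrap sequence above (load the stored identity, otherwise generate one if allowed, then persist it) is easier to see without the surrounding plumbing. A compressed, std-only sketch of the same decision flow; the types here are stand-ins, not the crate's:

#[derive(Clone)]
struct Identity(String);

impl Identity {
    fn generate() -> Self {
        Identity("generated".into())
    }
}

fn bootstrap_identity(
    stored: Option<Identity>,
    auto_generate: bool,
    save: impl Fn(&Identity),
) -> Result<Identity, &'static str> {
    match stored {
        Some(id) => Ok(id),
        None if auto_generate => {
            let id = Identity::generate();
            save(&id); // persist immediately so restarts reuse the same identity
            Ok(id)
        }
        None => Err("no identity found and auto-generate not enabled"),
    }
}

fn main() {
    let id = bootstrap_identity(None, true, |_| {}).unwrap();
    println!("node identity: {}", id.0);
}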
@ -309,7 +303,7 @@ impl Node {
|
|||
if auto_upgrade_identity {
|
||||
let old = id.clone();
|
||||
if id.upgrade()? {
|
||||
app.storage().save_node_identity(&id);
|
||||
app.save_node_identity(&id);
|
||||
app.event(Event::IdentityAutoUpgraded(old.unwrap(), id.as_ref().clone()));
|
||||
}
|
||||
}
|
||||
|
@ -320,7 +314,7 @@ impl Node {
|
|||
instance_id: random::get_bytes_secure(),
|
||||
identity: id,
|
||||
intervals: Mutex::new(BackgroundTaskIntervals::default()),
|
||||
paths: RwLock::new(Thing::new(PathMap::<Application>::new())),
|
||||
paths: RwLock::new(Thing::new(PathMap::<Application::LocalSocket>::new())),
|
||||
peers: RwLock::new(HashMap::new()),
|
||||
roots: RwLock::new(RootInfo {
|
||||
sets: HashMap::new(),
|
||||
|
@ -673,7 +667,7 @@ impl Node {
|
|||
let mut need_keepalive = Vec::new();
|
||||
|
||||
// First check all paths in read mode to avoid blocking the entire node.
|
||||
for (k, path) in self.paths.read().unwrap().get::<PathMap<Application>>().iter() {
|
||||
for (k, path) in self.paths.read().unwrap().get::<PathMap<Application::LocalSocket>>().iter() {
|
||||
if app.local_socket_is_valid(k.local_socket()) {
|
||||
match path.service(time_ticks) {
|
||||
PathServiceResult::Ok => {}
|
||||
|
@ -687,7 +681,11 @@ impl Node {
|
|||
|
||||
// Lock in write mode and remove dead paths, doing so piecemeal to again avoid blocking.
|
||||
for dp in dead_paths.iter() {
|
||||
self.paths.write().unwrap().get_mut::<PathMap<Application>>().remove(dp);
|
||||
self.paths
|
||||
.write()
|
||||
.unwrap()
|
||||
.get_mut::<PathMap<Application::LocalSocket>>()
|
||||
.remove(dp);
|
||||
}
|
||||
|
||||
// Finally run keepalive sends as a batch.
|
||||
|
@ -1028,14 +1026,17 @@ impl Node {
|
|||
time_ticks: i64,
|
||||
) -> Arc<Path> {
|
||||
let paths = self.paths.read().unwrap();
|
||||
if let Some(path) = paths.get::<PathMap<Application>>().get(&PathKey::Ref(ep, local_socket)) {
|
||||
if let Some(path) = paths
|
||||
.get::<PathMap<Application::LocalSocket>>()
|
||||
.get(&PathKey::Ref(ep, local_socket))
|
||||
{
|
||||
path.clone()
|
||||
} else {
|
||||
drop(paths);
|
||||
self.paths
|
||||
.write()
|
||||
.unwrap()
|
||||
.get_mut::<PathMap<Application>>()
|
||||
.get_mut::<PathMap<Application::LocalSocket>>()
|
||||
.entry(PathKey::Copied(ep.clone(), local_socket.clone()))
|
||||
.or_insert_with(|| {
|
||||
Arc::new(Path::new::<Application>(
|
||||
|
@ -1050,14 +1051,14 @@ impl Node {
|
|||
}
|
||||
}
|
||||
|
||||
/// Key used to look up paths in a hash map
|
||||
/// This supports copied keys for storing and refs for fast lookup without having to copy anything.
|
||||
enum PathKey<'a, 'b, Application: ApplicationLayer + ?Sized> {
|
||||
Copied(Endpoint, Application::LocalSocket),
|
||||
Ref(&'a Endpoint, &'b Application::LocalSocket),
|
||||
/// Key used to look up paths in a hash map efficiently.
|
||||
enum PathKey<'a, 'b, LocalSocket: Hash + PartialEq + Eq + Clone> {
|
||||
Copied(Endpoint, LocalSocket),
|
||||
Ref(&'a Endpoint, &'b LocalSocket),
|
||||
}
|
||||
|
||||
impl<Application: ApplicationLayer + ?Sized> Hash for PathKey<'_, '_, Application> {
|
||||
impl<LocalSocket: Hash + PartialEq + Eq + Clone> Hash for PathKey<'_, '_, LocalSocket> {
|
||||
#[inline]
|
||||
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
|
||||
match self {
|
||||
Self::Copied(ep, ls) => {
|
||||
|
@ -1072,7 +1073,8 @@ impl<Application: ApplicationLayer + ?Sized> Hash for PathKey<'_, '_, Applicatio
|
|||
}
|
||||
}
|
||||
|
||||
impl<Application: ApplicationLayer + ?Sized> PartialEq for PathKey<'_, '_, Application> {
|
||||
impl<LocalSocket: Hash + PartialEq + Eq + Clone> PartialEq for PathKey<'_, '_, LocalSocket> {
|
||||
#[inline]
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
match (self, other) {
|
||||
(Self::Copied(ep1, ls1), Self::Copied(ep2, ls2)) => ep1.eq(ep2) && ls1.eq(ls2),
|
||||
|
@ -1083,40 +1085,22 @@ impl<Application: ApplicationLayer + ?Sized> PartialEq for PathKey<'_, '_, Appli
|
|||
}
|
||||
}
|
||||
|
||||
impl<Application: ApplicationLayer + ?Sized> Eq for PathKey<'_, '_, Application> {}
|
||||
impl<LocalSocket: Hash + PartialEq + Eq + Clone> Eq for PathKey<'_, '_, LocalSocket> {}
|
||||
|
||||
impl<Application: ApplicationLayer + ?Sized> PathKey<'_, '_, Application> {
|
||||
#[inline(always)]
|
||||
fn local_socket(&self) -> &Application::LocalSocket {
|
||||
impl<LocalSocket: Hash + PartialEq + Eq + Clone> PathKey<'_, '_, LocalSocket> {
|
||||
#[inline]
|
||||
fn local_socket(&self) -> &LocalSocket {
|
||||
match self {
|
||||
Self::Copied(_, ls) => ls,
|
||||
Self::Ref(_, ls) => *ls,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn to_copied(&self) -> PathKey<'static, 'static, Application> {
|
||||
#[inline]
|
||||
fn to_copied(&self) -> PathKey<'static, 'static, LocalSocket> {
|
||||
match self {
|
||||
Self::Copied(ep, ls) => PathKey::<'static, 'static, Application>::Copied(ep.clone(), ls.clone()),
|
||||
Self::Ref(ep, ls) => PathKey::<'static, 'static, Application>::Copied((*ep).clone(), (*ls).clone()),
|
||||
Self::Copied(ep, ls) => PathKey::<'static, 'static, LocalSocket>::Copied(ep.clone(), ls.clone()),
|
||||
Self::Ref(ep, ls) => PathKey::<'static, 'static, LocalSocket>::Copied((*ep).clone(), (*ls).clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Dummy no-op inner protocol for debugging and testing.
|
||||
#[derive(Default)]
|
||||
pub struct DummyInnerLayer;
|
||||
|
||||
impl InnerProtocolLayer for DummyInnerLayer {}
|
||||
|
||||
impl PeerFilter for DummyInnerLayer {
|
||||
#[inline(always)]
|
||||
fn should_respond_to(&self, _: &Verified<Identity>) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn has_trust_relationship(&self, _: &Verified<Identity>) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
|
|
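The PathKey change above narrows the generic parameter from the whole ApplicationLayer to just its LocalSocket type, but the underlying trick is unchanged: an owned key variant for storage and a borrowed variant for lookup that hash and compare identically, so lookups never clone the endpoint. A self-contained, std-only sketch of that pattern; Endpoint and LocalSocket here are stand-in aliases, not the crate's types:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Stand-ins for the real Endpoint and LocalSocket types.
type Endpoint = String;
type LocalSocket = u32;

enum PathKey<'a, 'b> {
    Copied(Endpoint, LocalSocket),
    Ref(&'a Endpoint, &'b LocalSocket),
}

impl<'a, 'b> PathKey<'a, 'b> {
    fn endpoint(&self) -> &Endpoint {
        match self {
            Self::Copied(ep, _) => ep,
            Self::Ref(ep, _) => *ep,
        }
    }
    fn local_socket(&self) -> &LocalSocket {
        match self {
            Self::Copied(_, ls) => ls,
            Self::Ref(_, ls) => *ls,
        }
    }
}

impl Hash for PathKey<'_, '_> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Both variants hash the same underlying data.
        self.endpoint().hash(state);
        self.local_socket().hash(state);
    }
}

impl PartialEq for PathKey<'_, '_> {
    fn eq(&self, other: &Self) -> bool {
        (self.endpoint(), self.local_socket()) == (other.endpoint(), other.local_socket())
    }
}
impl Eq for PathKey<'_, '_> {}

fn hash_of(k: &PathKey<'_, '_>) -> u64 {
    let mut h = DefaultHasher::new();
    k.hash(&mut h);
    h.finish()
}

fn main() {
    let ep = "udp/10.0.0.1/9993".to_string();
    let ls = 4u32;
    let owned = PathKey::Copied(ep.clone(), ls);
    let borrowed = PathKey::Ref(&ep, &ls);
    // A borrowed key must hash and compare equal to its owned counterpart
    // so map lookups can avoid cloning the endpoint.
    assert_eq!(owned, borrowed);
    assert_eq!(hash_of(&owned), hash_of(&borrowed));
    println!("owned and borrowed keys are interchangeable for lookup");
}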
@ -15,6 +15,7 @@ use clap::error::{ContextKind, ContextValue};
|
|||
#[allow(unused_imports)]
|
||||
use clap::{Arg, ArgMatches, Command};
|
||||
|
||||
use zerotier_network_hypervisor::vl1::InnerProtocolLayer;
|
||||
use zerotier_network_hypervisor::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION};
|
||||
use zerotier_utils::exitcode;
|
||||
use zerotier_vl1_service::datadir::DataDir;
|
||||
|
@ -209,14 +210,9 @@ fn main() {
|
|||
Some(("service", _)) => {
|
||||
drop(global_args); // free unnecessary heap before starting service as we're done with CLI args
|
||||
if let Ok(_tokio_runtime) = zerotier_utils::tokio::runtime::Builder::new_multi_thread().enable_all().build() {
|
||||
let test_inner = Arc::new(zerotier_network_hypervisor::vl1::DummyInnerLayer::default());
|
||||
let test_inner = Arc::new(DummyInnerLayer);
|
||||
let datadir = open_datadir(&flags);
|
||||
let svc = VL1Service::new(
|
||||
datadir,
|
||||
test_inner.clone(),
|
||||
test_inner,
|
||||
zerotier_vl1_service::VL1Settings::default(),
|
||||
);
|
||||
let svc = VL1Service::new(todo!(), datadir, test_inner, zerotier_vl1_service::VL1Settings::default());
|
||||
if svc.is_ok() {
|
||||
let svc = svc.unwrap();
|
||||
svc.node().init_default_roots();
|
||||
|
@ -254,3 +250,7 @@ fn main() {
|
|||
|
||||
std::process::exit(exit_code);
|
||||
}
|
||||
|
||||
struct DummyInnerLayer;
|
||||
|
||||
impl InnerProtocolLayer for DummyInnerLayer {}
|
||||
|
|
|
@ -8,10 +8,12 @@ use serde::de::DeserializeOwned;
|
|||
use serde::Serialize;
|
||||
|
||||
use zerotier_crypto::random::next_u32_secure;
|
||||
use zerotier_network_hypervisor::vl1::{Identity, NodeStorageProvider, Verified};
|
||||
use zerotier_network_hypervisor::vl1::{Identity, Verified};
|
||||
use zerotier_utils::io::{fs_restrict_permissions, read_limit, DEFAULT_FILE_IO_READ_LIMIT};
|
||||
use zerotier_utils::json::to_json_pretty;
|
||||
|
||||
use crate::vl1service::VL1DataStorage;
|
||||
|
||||
pub const AUTH_TOKEN_FILENAME: &'static str = "authtoken.secret";
|
||||
pub const IDENTITY_PUBLIC_FILENAME: &'static str = "identity.public";
|
||||
pub const IDENTITY_SECRET_FILENAME: &'static str = "identity.secret";
|
||||
|
@ -20,36 +22,43 @@ pub const CONFIG_FILENAME: &'static str = "local.conf";
|
|||
const AUTH_TOKEN_DEFAULT_LENGTH: usize = 48;
|
||||
const AUTH_TOKEN_POSSIBLE_CHARS: &'static str = "0123456789abcdefghijklmnopqrstuvwxyz";
|
||||
|
||||
pub fn load_node_identity(base_path: &Path) -> Option<Verified<Identity>> {
|
||||
let id_data = read_limit(base_path.join(IDENTITY_SECRET_FILENAME), 4096);
|
||||
if id_data.is_err() {
|
||||
return None;
|
||||
}
|
||||
let id_data = Identity::from_str(String::from_utf8_lossy(id_data.unwrap().as_slice()).as_ref());
|
||||
if id_data.is_err() {
|
||||
return None;
|
||||
}
|
||||
Some(Verified::assume_verified(id_data.unwrap()))
|
||||
}
|
||||
|
||||
pub fn save_node_identity(base_path: &Path, id: &Verified<Identity>) -> bool {
|
||||
assert!(id.secret.is_some());
|
||||
let id_secret_str = id.to_secret_string();
|
||||
let id_public_str = id.to_string();
|
||||
let secret_path = base_path.join(IDENTITY_SECRET_FILENAME);
|
||||
if std::fs::write(&secret_path, id_secret_str.as_bytes()).is_err() {
|
||||
return false;
|
||||
}
|
||||
assert!(fs_restrict_permissions(&secret_path));
|
||||
return std::fs::write(base_path.join(IDENTITY_PUBLIC_FILENAME), id_public_str.as_bytes()).is_ok();
|
||||
}
|
||||
|
||||
pub struct DataDir<Config: PartialEq + Eq + Clone + Send + Sync + Default + Serialize + DeserializeOwned + 'static> {
|
||||
pub base_path: PathBuf,
|
||||
config: RwLock<Arc<Config>>,
|
||||
authtoken: Mutex<String>,
|
||||
}
|
||||
|
||||
impl<Config: PartialEq + Eq + Clone + Send + Sync + Default + Serialize + DeserializeOwned + 'static> NodeStorageProvider
|
||||
for DataDir<Config>
|
||||
{
|
||||
impl<Config: PartialEq + Eq + Clone + Send + Sync + Default + Serialize + DeserializeOwned + 'static> VL1DataStorage for DataDir<Config> {
|
||||
fn load_node_identity(&self) -> Option<Verified<Identity>> {
|
||||
let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 4096);
|
||||
if id_data.is_err() {
|
||||
return None;
|
||||
}
|
||||
let id_data = Identity::from_str(String::from_utf8_lossy(id_data.unwrap().as_slice()).as_ref());
|
||||
if id_data.is_err() {
|
||||
return None;
|
||||
}
|
||||
Some(Verified::assume_verified(id_data.unwrap()))
|
||||
load_node_identity(self.base_path.as_path())
|
||||
}
|
||||
|
||||
fn save_node_identity(&self, id: &Verified<Identity>) {
|
||||
assert!(id.secret.is_some());
|
||||
let id_secret_str = id.to_secret_string();
|
||||
let id_public_str = id.to_string();
|
||||
let secret_path = self.base_path.join(IDENTITY_SECRET_FILENAME);
|
||||
// TODO: handle errors
|
||||
let _ = std::fs::write(&secret_path, id_secret_str.as_bytes());
|
||||
assert!(fs_restrict_permissions(&secret_path));
|
||||
let _ = std::fs::write(self.base_path.join(IDENTITY_PUBLIC_FILENAME), id_public_str.as_bytes());
|
||||
fn save_node_identity(&self, id: &Verified<Identity>) -> bool {
|
||||
save_node_identity(self.base_path.as_path(), id)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
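The new datadir helpers above replace the per-implementation identity I/O that FileDatabase and DataDir previously duplicated. A short sketch of the round trip through them, assuming (as in Node::new() above) that Identity::generate() yields a Verified<Identity> carrying its secret; the base_path argument is whatever data directory the caller owns:

use std::path::Path;

use zerotier_network_hypervisor::vl1::Identity;
use zerotier_vl1_service::datadir::{load_node_identity, save_node_identity};

fn identity_round_trip(base_path: &Path) {
    let id = Identity::generate();
    // Writes identity.secret (permission-restricted) and identity.public.
    assert!(save_node_identity(base_path, &id));
    let reloaded = load_node_identity(base_path).expect("identity.secret should now exist");
    // The public portion survives the round trip.
    assert_eq!(reloaded.to_string(), id.to_string());
}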
@@ -42,7 +42,7 @@ fn socket_read_concurrency() -> usize {
    }
}

pub trait UdpPacketHandler: Send + Sync + 'static {
pub trait UdpPacketHandler: Send + Sync {
    fn incoming_udp_packet(
        self: &Arc<Self>,
        time_ticks: i64,

@@ -189,7 +189,7 @@ impl BoundUdpPort {
    /// The caller can check the 'sockets' member variable after calling to determine which if any bindings were
    /// successful. Any errors that occurred are returned as tuples of (interface, address, error). The second vector
    /// returned contains newly bound sockets.
    pub fn update_bindings<UdpPacketHandlerImpl: UdpPacketHandler + ?Sized>(
    pub fn update_bindings<UdpPacketHandlerImpl: UdpPacketHandler + ?Sized + 'static>(
        &mut self,
        interface_prefix_blacklist: &HashSet<String>,
        cidr_blacklist: &HashSet<InetAddress>,
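Dropping 'static from UdpPacketHandler and re-adding it only on update_bindings narrows the requirement to the one place that actually detaches threads holding the handler. A std-only illustration of the same pattern; all names here are illustrative, not the crate's:

use std::sync::Arc;
use std::thread;

trait Handler: Send + Sync {
    fn handle(&self, byte: u8);
}

// The 'static bound lives on the function that spawns a thread, not on the trait.
fn spawn_reader<H: Handler + ?Sized + 'static>(handler: Arc<H>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for b in 0u8..4 {
            handler.handle(b);
        }
    })
}

struct Printer;
impl Handler for Printer {
    fn handle(&self, byte: u8) {
        println!("got {byte}");
    }
}

fn main() {
    let h: Arc<dyn Handler> = Arc::new(Printer);
    spawn_reader(h).join().unwrap();
}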
@@ -20,6 +20,12 @@ use crate::LocalSocket;
/// Update UDP bindings every this many seconds.
const UPDATE_UDP_BINDINGS_EVERY_SECS: usize = 10;

/// Trait to implement to provide storage for VL1-related state information.
pub trait VL1DataStorage: Sync + Send {
    fn load_node_identity(&self) -> Option<Verified<Identity>>;
    fn save_node_identity(&self, id: &Verified<Identity>) -> bool;
}

/// VL1 service that connects to the physical network and hosts an inner protocol like ZeroTier VL2.
///
/// This is the "outward facing" half of a full ZeroTier stack on a normal system. It binds sockets,

@@ -28,8 +34,8 @@ const UPDATE_UDP_BINDINGS_EVERY_SECS: usize = 10;
/// a test harness or just the controller for a controller that runs stand-alone.
pub struct VL1Service<Inner: InnerProtocolLayer + ?Sized + 'static> {
    state: RwLock<VL1ServiceMutableState>,
    storage: Arc<dyn NodeStorageProvider>,
    peer_filter: Arc<dyn PeerFilter>,
    vl1_data_storage: Arc<dyn VL1DataStorage>,
    inner: Arc<Inner>,
    buffer_pool: Arc<PacketBufferPool>,
    node_container: Option<Node>, // never None, set in new()

@@ -44,8 +50,8 @@ struct VL1ServiceMutableState {

impl<Inner: InnerProtocolLayer + ?Sized + 'static> VL1Service<Inner> {
    pub fn new(
        storage: Arc<dyn NodeStorageProvider>,
        peer_filter: Arc<dyn PeerFilter>,
        vl1_data_storage: Arc<dyn VL1DataStorage>,
        inner: Arc<Inner>,
        settings: VL1Settings,
    ) -> Result<Arc<Self>, Box<dyn Error>> {

@@ -56,8 +62,8 @@ impl<Inner: InnerProtocolLayer + ?Sized + 'static> VL1Service<Inner> {
                settings,
                running: true,
            }),
            storage,
            peer_filter,
            vl1_data_storage,
            inner,
            buffer_pool: Arc::new(PacketBufferPool::new(
                std::thread::available_parallelism().map_or(2, |c| c.get() + 2),

@@ -212,14 +218,19 @@ impl<Inner: InnerProtocolLayer + ?Sized + 'static> ApplicationLayer for VL1Servi
        }
    }

    #[inline]
    #[inline(always)]
    fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool {
        socket.is_valid()
    }

    #[inline(always)]
    fn storage(&self) -> &dyn NodeStorageProvider {
        self.storage.as_ref()
    fn load_node_identity(&self) -> Option<Verified<Identity>> {
        self.vl1_data_storage.load_node_identity()
    }

    #[inline(always)]
    fn save_node_identity(&self, id: &Verified<Identity>) -> bool {
        self.vl1_data_storage.save_node_identity(id)
    }

    #[inline(always)]