Moar controller work.

This commit is contained in:
Adam Ierymenko 2022-10-13 11:33:26 -04:00
parent 7ca7d2bf3e
commit 5601b83f10
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
9 changed files with 241 additions and 111 deletions

View file

@ -15,6 +15,7 @@ zerotier-vl1-service = { path = "../vl1-service" }
async-trait = "^0"
serde = { version = "^1", features = ["derive"], default-features = false }
serde_json = { version = "^1", features = ["std"], default-features = false }
serde_yaml = "^0"
clap = { version = "^3", features = ["std", "suggestions"], default-features = false }
[target."cfg(not(windows))".dependencies]

View file

@ -5,8 +5,20 @@ use async_trait::async_trait;
use zerotier_network_hypervisor::vl1::{Address, InetAddress, NodeStorage};
use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_utils::tokio::sync::broadcast::Receiver;
use crate::model::*;
#[derive(Clone)]
pub enum Change {
NetworkCreated(Network),
NetworkDeleted(Network),
NetworkChanged(Network, Network),
MemberCreated(Member),
MemberDeleted(Member),
MemberChanged(Member, Member),
}
#[async_trait]
pub trait Database: Sync + Send + NodeStorage + 'static {
async fn get_network(&self, id: NetworkId) -> Result<Option<Network>, Box<dyn Error>>;
@ -16,6 +28,21 @@ pub trait Database: Sync + Send + NodeStorage + 'static {
async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result<Option<Member>, Box<dyn Error>>;
async fn save_member(&self, obj: Member) -> Result<(), Box<dyn Error>>;
/// Get a receiver that can be used to receive changes made to networks and members, if supported.
///
/// The receiver returned is a broadcast receiver. This can be called more than once if there are
/// multiple parts of the controller that listen.
///
/// Changes should NOT be broadcast on call to save_network() or save_member(). They should only
/// be broadcast when externally generated changes occur.
///
/// The default implementation returns None indicating that change following is not supported.
/// Change following is required for instant deauthorization with revocations and other instant
/// changes in response to modifications to network and member configuration.
async fn changes(&self) -> Option<Receiver<Change>> {
None
}
/// List members deauthorized after a given time (milliseconds since epoch).
///
/// The default trait implementation uses a brute force method. This should be reimplemented if a

View file

@ -1,6 +1,7 @@
use std::error::Error;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use async_trait::async_trait;
@ -8,10 +9,7 @@ use zerotier_network_hypervisor::vl1::{Address, Identity, NodeStorage};
use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_utils::io::{fs_restrict_permissions, read_limit};
use zerotier_utils::json::to_json_pretty;
use zerotier_utils::tokio::fs;
use zerotier_utils::tokio::io::AsyncWriteExt;
use zerotier_utils::tokio::sync::Mutex;
use crate::database::Database;
use crate::model::*;
@ -25,32 +23,46 @@ const IDENTITY_SECRET_FILENAME: &'static str = "identity.secret";
/// the cache. The cache will also contain any ephemeral data, generated data, etc.
pub struct FileDatabase {
base_path: PathBuf,
old_log: Mutex<fs::File>,
controller_address: AtomicU64,
}
fn network_path(base: &PathBuf, network_id: NetworkId) -> PathBuf {
base.join(network_id.to_string()).join(format!("n{}.json", network_id.to_string()))
}
fn member_path(base: &PathBuf, network_id: NetworkId, member_id: Address) -> PathBuf {
base.join(network_id.to_string()).join(format!("m{}.json", member_id.to_string()))
}
// TODO: should cache at least hashes and detect changes in the filesystem live.
impl FileDatabase {
pub async fn new<P: AsRef<Path>>(base_path: P) -> Result<Self, Box<dyn Error>> {
let base: PathBuf = base_path.as_ref().into();
let changelog = base.join("_history");
let _ = fs::create_dir_all(&base).await?;
Ok(Self {
base_path: base,
old_log: Mutex::new(fs::OpenOptions::new().append(true).create(true).open(changelog).await?),
})
Ok(Self { base_path: base, controller_address: AtomicU64::new(0) })
}
fn get_controller_address(&self) -> Option<Address> {
let a = self.controller_address.load(Ordering::Relaxed);
if a == 0 {
if let Some(id) = self.load_node_identity() {
self.controller_address.store(id.address.into(), Ordering::Relaxed);
Some(id.address)
} else {
None
}
} else {
Address::from_u64(a)
}
}
fn network_path(&self, network_id: NetworkId) -> PathBuf {
self.base_path.join(format!("n{:06x}", network_id.network_no())).join("config.yaml")
}
fn member_path(&self, network_id: NetworkId, member_id: Address) -> PathBuf {
self.base_path
.join(format!("n{:06x}", network_id.network_no()))
.join(format!("m{}.yaml", member_id.to_string()))
}
}
impl NodeStorage for FileDatabase {
fn load_node_identity(&self) -> Option<Identity> {
let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 4096);
let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 16384);
if id_data.is_err() {
return None;
}
@ -73,29 +85,23 @@ impl NodeStorage for FileDatabase {
#[async_trait]
impl Database for FileDatabase {
async fn get_network(&self, id: NetworkId) -> Result<Option<Network>, Box<dyn Error>> {
let r = fs::read(network_path(&self.base_path, id)).await;
let r = fs::read(self.network_path(id)).await;
if let Ok(raw) = r {
Ok(Some(serde_json::from_slice::<Network>(raw.as_slice())?))
let mut network = serde_yaml::from_slice::<Network>(raw.as_slice())?;
self.get_controller_address()
.map(|a| network.id = network.id.change_network_controller(a));
Ok(Some(network))
//Ok(Some(serde_json::from_slice::<Network>(raw.as_slice())?))
} else {
Ok(None)
}
}
async fn save_network(&self, obj: Network) -> Result<(), Box<dyn Error>> {
let base_network_path = network_path(&self.base_path, obj.id);
let base_network_path = self.network_path(obj.id);
let _ = fs::create_dir_all(base_network_path.parent().unwrap()).await;
let prev = self.get_network(obj.id).await?;
if let Some(prev) = prev {
if obj == prev {
return Ok(());
}
let mut j = zerotier_utils::json::to_json(&prev);
j.push('\n');
let _ = self.old_log.lock().await.write_all(j.as_bytes()).await?;
}
let _ = fs::write(base_network_path, to_json_pretty(&obj).as_bytes()).await?;
//let _ = fs::write(base_network_path, to_json_pretty(&obj).as_bytes()).await?;
let _ = fs::write(base_network_path, serde_yaml::to_string(&obj)?.as_bytes()).await?;
return Ok(());
}
@ -107,7 +113,7 @@ impl Database for FileDatabase {
let name = osname.to_string_lossy();
if name.len() == (zerotier_network_hypervisor::protocol::ADDRESS_SIZE_STRING + 6)
&& name.starts_with("m")
&& name.ends_with(".json")
&& name.ends_with(".yaml")
{
if let Ok(member_address) = u64::from_str_radix(&name[1..11], 16) {
if let Some(member_address) = Address::from_u64(member_address) {
@ -120,29 +126,23 @@ impl Database for FileDatabase {
}
async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result<Option<Member>, Box<dyn Error>> {
let r = fs::read(member_path(&self.base_path, network_id, node_id)).await;
let r = fs::read(self.member_path(network_id, node_id)).await;
if let Ok(raw) = r {
Ok(Some(serde_json::from_slice::<Member>(raw.as_slice())?))
let mut member = serde_yaml::from_slice::<Member>(raw.as_slice())?;
self.get_controller_address()
.map(|a| member.network_id = member.network_id.change_network_controller(a));
Ok(Some(member))
//Ok(Some(serde_json::from_slice::<Member>(raw.as_slice())?))
} else {
Ok(None)
}
}
async fn save_member(&self, obj: Member) -> Result<(), Box<dyn Error>> {
let base_member_path = member_path(&self.base_path, obj.network_id, obj.node_id);
let base_member_path = self.member_path(obj.network_id, obj.node_id);
let _ = fs::create_dir_all(base_member_path.parent().unwrap()).await;
let prev = self.get_member(obj.network_id, obj.node_id).await?;
if let Some(prev) = prev {
if obj == prev {
return Ok(());
}
let mut j = zerotier_utils::json::to_json(&prev);
j.push('\n');
let _ = self.old_log.lock().await.write_all(j.as_bytes()).await?;
}
let _ = fs::write(base_member_path, to_json_pretty(&obj).as_bytes()).await?;
//let _ = fs::write(base_member_path, to_json_pretty(&obj).as_bytes()).await?;
let _ = fs::write(base_member_path, serde_yaml::to_string(&obj)?.as_bytes()).await?;
Ok(())
}

View file

@ -9,30 +9,69 @@ use zerotier_network_hypervisor::protocol::{verbs, PacketBuffer, DEFAULT_MULTICA
use zerotier_network_hypervisor::vl1::{HostSystem, Identity, InnerProtocol, Node, PacketHandlerResult, Path, PathFilter, Peer};
use zerotier_network_hypervisor::vl2::{CertificateOfMembership, CertificateOfOwnership, NetworkConfig, NetworkId, Tag};
use zerotier_utils::dictionary::Dictionary;
use zerotier_utils::error::UnexpectedError;
use zerotier_utils::error::{InvalidParameterError, UnexpectedError};
use zerotier_utils::ms_since_epoch;
use zerotier_utils::reaper::Reaper;
use zerotier_utils::tokio;
use crate::database::Database;
use crate::database::*;
use crate::model::{AuthorizationResult, Member, CREDENTIAL_WINDOW_SIZE_DEFAULT};
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
pub struct Handler<DatabaseImpl: Database> {
inner: Arc<Inner<DatabaseImpl>>,
change_watcher: Option<tokio::task::JoinHandle<()>>,
}
struct Inner<DatabaseImpl: Database> {
reaper: Reaper,
runtime: tokio::runtime::Handle,
inner: Arc<Inner<DatabaseImpl>>,
database: Arc<DatabaseImpl>,
local_identity: Identity,
}
impl<DatabaseImpl: Database> Handler<DatabaseImpl> {
pub fn new(database: Arc<DatabaseImpl>, runtime: tokio::runtime::Handle, local_identity: Identity) -> Arc<Self> {
assert!(local_identity.secret.is_some());
Arc::new(Self {
reaper: Reaper::new(&runtime),
runtime,
inner: Arc::new(Inner::<DatabaseImpl> { database, local_identity }),
})
pub async fn new(database: Arc<DatabaseImpl>, runtime: tokio::runtime::Handle) -> Result<Arc<Self>, Box<dyn Error>> {
if let Some(local_identity) = database.load_node_identity() {
assert!(local_identity.secret.is_some());
let inner = Arc::new(Inner::<DatabaseImpl> {
reaper: Reaper::new(&runtime),
runtime,
database: database.clone(),
local_identity,
});
let h = Arc::new(Self {
inner: inner.clone(),
change_watcher: database.changes().await.map(|mut ch| {
let inner2 = inner.clone();
inner.runtime.spawn(async move {
loop {
if let Ok(change) = ch.recv().await {
inner2.reaper.add(
inner2.runtime.spawn(inner2.clone().handle_change_notification(change)),
Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(),
);
}
}
})
}),
});
Ok(h)
} else {
Err(Box::new(InvalidParameterError(
"local controller's identity not readable by database",
)))
}
}
}
impl<DatabaseImpl: Database> Drop for Handler<DatabaseImpl> {
fn drop(&mut self) {
let _ = self.change_watcher.take().map(|w| w.abort());
}
}
@ -116,10 +155,10 @@ impl<DatabaseImpl: Database> InnerProtocol for Handler<DatabaseImpl> {
// Launch handler as an async background task.
let (inner, source2, source_path2) = (self.inner.clone(), source.clone(), source_path.clone());
self.reaper.add(
self.runtime.spawn(async move {
self.inner.reaper.add(
self.inner.runtime.spawn(async move {
// TODO: log errors
let _ = inner.handle_network_config_request(
let result = inner.handle_network_config_request(
source2,
source_path2,
message_id,
@ -172,12 +211,11 @@ impl<DatabaseImpl: Database> InnerProtocol for Handler<DatabaseImpl> {
}
}
struct Inner<DatabaseImpl: Database> {
database: Arc<DatabaseImpl>,
local_identity: Identity,
}
impl<DatabaseImpl: Database> Inner<DatabaseImpl> {
async fn handle_change_notification(self: Arc<Self>, _change: Change) {
todo!()
}
async fn handle_network_config_request<HostSystemImpl: HostSystem>(
self: Arc<Self>,
source: Arc<Peer<HostSystemImpl>>,
@ -214,7 +252,7 @@ impl<DatabaseImpl: Database> Inner<DatabaseImpl> {
let mut authorized = member.as_ref().map_or(false, |m| m.authorized());
if !authorized {
if member.is_none() {
if network.learn_members {
if network.learn_members.unwrap_or(true) {
let _ = member.insert(Member::new_with_identity(source.identity.clone(), network_id));
member_changed = true;
} else {

View file

@ -16,35 +16,40 @@ use zerotier_utils::tokio::runtime::Runtime;
use zerotier_vl1_service::VL1Service;
async fn run<DatabaseImpl: Database>(database: Arc<DatabaseImpl>, runtime: &Runtime) -> i32 {
let handler = Handler::new(database.clone(), runtime.handle().clone(), todo!());
let svc = VL1Service::new(
database.clone(),
handler.clone(),
handler.clone(),
zerotier_vl1_service::VL1Settings::default(),
);
if svc.is_ok() {
let svc = svc.unwrap();
svc.node().init_default_roots();
// Wait for kill signal on Unix-like platforms.
#[cfg(unix)]
{
let term = Arc::new(AtomicBool::new(false));
let _ = signal_hook::flag::register(libc::SIGINT, term.clone());
let _ = signal_hook::flag::register(libc::SIGTERM, term.clone());
let _ = signal_hook::flag::register(libc::SIGQUIT, term.clone());
while !term.load(Ordering::Relaxed) {
std::thread::sleep(Duration::from_secs(1));
}
}
println!("Terminate signal received, shutting down...");
exitcode::OK
let handler = Handler::new(database.clone(), runtime.handle().clone()).await;
if handler.is_err() {
eprintln!("FATAL: error initializing handler: {}", handler.err().unwrap().to_string());
exitcode::ERR_CONFIG
} else {
eprintln!("FATAL: error launching service: {}", svc.err().unwrap().to_string());
exitcode::ERR_IOERR
let handler = handler.unwrap();
let svc = VL1Service::new(
database.clone(),
handler.clone(),
handler.clone(),
zerotier_vl1_service::VL1Settings::default(),
);
if svc.is_ok() {
let svc = svc.unwrap();
svc.node().init_default_roots();
// Wait for kill signal on Unix-like platforms.
#[cfg(unix)]
{
let term = Arc::new(AtomicBool::new(false));
let _ = signal_hook::flag::register(libc::SIGINT, term.clone());
let _ = signal_hook::flag::register(libc::SIGTERM, term.clone());
let _ = signal_hook::flag::register(libc::SIGQUIT, term.clone());
while !term.load(Ordering::Relaxed) {
std::thread::sleep(Duration::from_secs(1));
}
}
println!("Terminate signal received, shutting down...");
exitcode::OK
} else {
eprintln!("FATAL: error launching service: {}", svc.err().unwrap().to_string());
exitcode::ERR_IOERR
}
}
}
@ -82,14 +87,23 @@ fn main() {
if let Ok(tokio_runtime) = zerotier_utils::tokio::runtime::Builder::new_multi_thread().enable_all().build() {
tokio_runtime.block_on(async {
if let Some(filedb_base_path) = global_args.value_of("filedb") {
std::process::exit(run(Arc::new(FileDatabase::new(filedb_base_path).await.unwrap()), &tokio_runtime).await);
let file_db = FileDatabase::new(filedb_base_path).await;
if file_db.is_err() {
eprintln!(
"FATAL: unable to open filesystem database at {}: {}",
filedb_base_path,
file_db.as_ref().err().unwrap().to_string()
);
std::process::exit(exitcode::ERR_IOERR)
}
std::process::exit(run(Arc::new(file_db.unwrap()), &tokio_runtime).await);
} else {
eprintln!("FATAL: no database type selected.");
std::process::exit(exitcode::ERR_USAGE);
};
});
} else {
eprintln!("FATAL: error launching service: can't start async runtime");
eprintln!("FATAL: can't start async runtime");
std::process::exit(exitcode::ERR_IOERR)
}
}

View file

@ -18,51 +18,61 @@ pub struct Member {
pub network_id: NetworkId,
/// Pinned full member identity, if known.
#[serde(skip_serializing_if = "Option::is_none")]
pub identity: Option<Identity>,
/// A short name that can also be used for DNS, etc.
#[serde(skip_serializing_if = "String::is_empty")]
#[serde(default)]
pub name: String,
/// Time member was most recently authorized, None for 'never'.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "lastAuthorizedTime")]
pub last_authorized_time: Option<i64>,
/// Time member was most recently deauthorized, None for 'never'.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "lastDeauthorizedTime")]
pub last_deauthorized_time: Option<i64>,
/// ZeroTier-managed IP assignments.
#[serde(skip_serializing_if = "HashSet::is_empty")]
#[serde(rename = "ipAssignments")]
#[serde(default)]
pub ip_assignments: HashSet<InetAddress>,
/// If true, do not auto-assign IPs in the controller.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "noAutoAssignIps")]
#[serde(default)]
pub no_auto_assign_ips: bool,
pub no_auto_assign_ips: Option<bool>,
/// If true this member is a full Ethernet bridge.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "activeBridge")]
#[serde(default)]
pub bridge: bool,
pub bridge: Option<bool>,
/// Tags that can be used in rule evaluation for ACL-like behavior.
#[serde(skip_serializing_if = "HashMap::is_empty")]
#[serde(default)]
pub tags: HashMap<u32, u32>,
/// Member is exempt from SSO, authorization managed conventionally.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "ssoExempt")]
#[serde(default)]
pub sso_exempt: bool,
pub sso_exempt: Option<bool>,
/// If true this node is explicitly listed in every member's network configuration.
/// This is only supported for V2 nodes.
#[serde(rename = "advertised")]
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub advertised: bool,
pub advertised: Option<bool>,
/// API object type documentation field, not actually edited/used.
#[serde(skip_deserializing)]
#[serde(default = "ObjectType::member")]
pub objtype: ObjectType,
}
@ -77,11 +87,11 @@ impl Member {
last_authorized_time: None,
last_deauthorized_time: None,
ip_assignments: HashSet::new(),
no_auto_assign_ips: false,
bridge: false,
no_auto_assign_ips: None,
bridge: None,
tags: HashMap::new(),
sso_exempt: false,
advertised: false,
sso_exempt: None,
advertised: None,
objtype: ObjectType::Member,
}
}

View file

@ -45,18 +45,21 @@ pub struct Network {
pub id: NetworkId,
/// Network name that's sent to network members
#[serde(skip_serializing_if = "String::is_empty")]
#[serde(default)]
pub name: String,
/// Guideline for the maximum number of multicast recipients on a network (not a hard limit).
/// Setting to zero disables multicast entirely. The default is used if this is not set.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "multicastLimit")]
pub multicast_limit: Option<u32>,
/// If true, this network supports ff:ff:ff:ff:ff:ff Ethernet broadcast.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "enableBroadcast")]
#[serde(default = "troo")]
pub enable_broadcast: bool,
#[serde(default)]
pub enable_broadcast: Option<bool>,
/// Auto IP assignment mode(s) for IPv4 addresses.
#[serde(rename = "v4AssignMode")]
@ -69,18 +72,22 @@ pub struct Network {
pub v6_assign_mode: Ipv6AssignMode,
/// IPv4 or IPv6 auto-assignment pools available, must be present to use 'zt' mode.
#[serde(skip_serializing_if = "HashSet::is_empty")]
#[serde(rename = "ipAssignmentPools")]
#[serde(default)]
pub ip_assignment_pools: HashSet<IpAssignmentPool>,
/// IPv4 or IPv6 routes to advertise.
#[serde(skip_serializing_if = "HashSet::is_empty")]
#[serde(default)]
pub ip_routes: HashSet<IpRoute>,
/// DNS records to push to members.
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub dns: HashMap<String, HashSet<InetAddress>>,
/// Network rule set.
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(default)]
pub rules: Vec<Rule>,
@ -92,6 +99,7 @@ pub struct Network {
/// promptly, so nodes will still deauthorize quickly even if the window is long.
///
/// Usually this does not need to be changed.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "credentialWindowSize")]
pub credential_window_size: Option<i64>,
@ -103,10 +111,12 @@ pub struct Network {
pub private: bool,
/// If true this network will add not-authorized members for anyone who requests a config.
#[serde(default = "troo")]
pub learn_members: bool,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub learn_members: Option<bool>,
/// Static object type field for use with API.
#[serde(skip_deserializing)]
#[serde(default = "ObjectType::network")]
pub objtype: ObjectType,
}

View file

@ -126,6 +126,9 @@ pub const ADDRESS_SIZE_STRING: usize = 10;
/// Prefix indicating reserved addresses (that can't actually be addresses).
pub const ADDRESS_RESERVED_PREFIX: u8 = 0xff;
/// Bit mask for address bits in a u64.
pub const ADDRESS_MASK: u64 = 0xffffffffff;
pub(crate) mod v1 {
use super::*;

View file

@ -10,6 +10,9 @@ use zerotier_utils::error::InvalidFormatError;
use zerotier_utils::hex;
use zerotier_utils::hex::HEX_CHARS;
use crate::protocol::ADDRESS_MASK;
use crate::vl1::Address;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(transparent)]
pub struct NetworkId(NonZeroU64);
@ -17,7 +20,13 @@ pub struct NetworkId(NonZeroU64);
impl NetworkId {
#[inline(always)]
pub fn from_u64(i: u64) -> Option<NetworkId> {
NonZeroU64::new(i).map(|i| Self(i))
// Note that we check both that 'i' is non-zero and that the address of the controller is valid.
if let Some(ii) = NonZeroU64::new(i) {
if Address::from_u64(i & ADDRESS_MASK).is_some() {
return Some(Self(ii));
}
}
return None;
}
#[inline(always)]
@ -38,6 +47,24 @@ impl NetworkId {
pub fn to_bytes(&self) -> [u8; 8] {
self.0.get().to_be_bytes()
}
/// Get the network controller ID for this network, which is the most significant 40 bits.
#[inline]
pub fn network_controller(&self) -> Address {
Address::from_u64(self.0.get()).unwrap()
}
/// Consume this network ID and return one with the same network number but a different controller ID.
pub fn change_network_controller(self, new_controller: Address) -> NetworkId {
let new_controller: u64 = new_controller.into();
Self(NonZeroU64::new((self.network_no() as u64) | new_controller.wrapping_shr(24)).unwrap())
}
/// Get the 24-bit local network identifier minus the 40-bit controller address portion.
#[inline]
pub fn network_no(&self) -> u32 {
(self.0.get() & 0xffffff) as u32
}
}
impl From<NetworkId> for u64 {