More work on controller: FileDB change detection, etc.

This commit is contained in:
Adam Ierymenko 2022-11-10 18:25:36 -05:00
parent 015abb62e0
commit 5772a135f5
15 changed files with 513 additions and 130 deletions

109
controller/src/cache.rs Normal file
View file

@ -0,0 +1,109 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use std::collections::HashMap;
use std::error::Error;
use std::mem::replace;
use std::sync::{Mutex, RwLock};
use crate::database::Database;
use crate::model::{Member, Network};
use zerotier_network_hypervisor::vl1::Address;
use zerotier_network_hypervisor::vl2::NetworkId;
/// Network and member cache used by database implementations to implement change detection.
pub struct Cache {
by_nwid: RwLock<HashMap<NetworkId, (Network, Mutex<HashMap<Address, Member>>)>>,
}
impl Cache {
pub fn new() -> Self {
Self { by_nwid: RwLock::new(HashMap::new()) }
}
/// Load (or reload) the entire cache from a database.
pub async fn load_all<DatabaseImpl: Database>(&self, db: &DatabaseImpl) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut by_nwid = self.by_nwid.write().unwrap();
by_nwid.clear();
let networks = db.list_networks().await?;
for network_id in networks {
if let Some(network) = db.get_network(network_id).await? {
let network_entry = by_nwid.entry(network_id).or_insert_with(|| (network, Mutex::new(HashMap::new())));
let mut by_node_id = network_entry.1.lock().unwrap();
let members = db.list_members(network_id).await?;
for node_id in members {
if let Some(member) = db.get_member(network_id, node_id).await? {
let _ = by_node_id.insert(node_id, member);
}
}
}
}
Ok(())
}
pub fn list_cached_networks(&self) -> Vec<NetworkId> {
self.by_nwid.read().unwrap().keys().cloned().collect()
}
/// Update a network if changed, returning whether or not any update was made and the old version if any.
/// A value of (true, None) indicates that there was no network by that ID in which case it is added.
pub fn on_network_updated(&self, network: Network) -> (bool, Option<Network>) {
let mut by_nwid = self.by_nwid.write().unwrap();
if let Some(prev_network) = by_nwid.get_mut(&network.id) {
if !prev_network.0.eq(&network) {
(true, Some(replace(&mut prev_network.0, network)))
} else {
(false, None)
}
} else {
let _ = by_nwid.insert(network.id, (network.clone(), Mutex::new(HashMap::new())));
(true, None)
}
}
/// Update a member if changed, returning whether or not any update was made and the old version if any.
/// A value of (true, None) indicates that there was no member with that ID. If there is no network with
/// the member's network ID (false, None) is returned and no action is taken.
pub fn on_member_updated(&self, member: Member) -> (bool, Option<Member>) {
let by_nwid = self.by_nwid.read().unwrap();
if let Some(network) = by_nwid.get(&member.network_id) {
let mut by_node_id = network.1.lock().unwrap();
if let Some(prev_member) = by_node_id.get_mut(&member.node_id) {
if !member.eq(prev_member) {
(true, Some(replace(prev_member, member)))
} else {
(false, None)
}
} else {
let _ = by_node_id.insert(member.node_id, member);
(true, None)
}
} else {
(false, None)
}
}
/// Delete a network, returning it if it existed.
pub fn on_network_deleted(&self, network_id: NetworkId) -> Option<(Network, Vec<Member>)> {
let mut by_nwid = self.by_nwid.write().unwrap();
if let Some(network) = by_nwid.remove(&network_id) {
let mut members = network.1.lock().unwrap();
Some((network.0, members.drain().map(|(_, v)| v).collect()))
} else {
None
}
}
/// Delete a member, returning it if it existed.
pub fn on_member_deleted(&self, network_id: NetworkId, node_id: Address) -> Option<Member> {
let by_nwid = self.by_nwid.read().unwrap();
if let Some(network) = by_nwid.get(&network_id) {
let mut members = network.1.lock().unwrap();
members.remove(&node_id)
} else {
None
}
}
}

View file

@ -16,7 +16,6 @@ use zerotier_network_hypervisor::vl2::v1::Revocation;
use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_utils::blob::Blob;
use zerotier_utils::buffer::OutOfBoundsError;
use zerotier_utils::dictionary::Dictionary;
use zerotier_utils::error::InvalidParameterError;
use zerotier_utils::reaper::Reaper;
use zerotier_utils::tokio;
@ -125,7 +124,7 @@ impl Controller {
}));
}
/// Compose and send network configuration packet.
/// Compose and send network configuration packet (either V1 or V2)
fn send_network_config(
&self,
peer: &Peer,
@ -184,25 +183,45 @@ impl Controller {
}
}
/// Send one or more revocation object(s) to a peer.
fn send_revocations(&self, peer: &Peer, revocations: Vec<Revocation>) {
if let Some(host_system) = self.service.read().unwrap().upgrade() {}
}
/// Send one or more revocation object(s) to a peer (V1 protocol only).
fn v1_proto_send_revocations(&self, peer: &Peer, mut revocations: Vec<Revocation>) {
if let Some(host_system) = self.service.read().unwrap().upgrade() {
let time_ticks = ms_monotonic();
while !revocations.is_empty() {
let send_count = revocations.len().min(protocol::UDP_DEFAULT_MTU / 256);
debug_assert!(send_count <= (u16::MAX as usize));
peer.send(
host_system.as_ref(),
host_system.node(),
None,
time_ticks,
|packet| -> Result<(), OutOfBoundsError> {
let payload_start = packet.len();
/// Called when the DB informs us of a change.
async fn handle_change_notification(self: Arc<Self>, change: Change) {
match change {
Change::MemberAuthorized(_, _) => {}
Change::MemberDeauthorized(network_id, node_id) => {
if let Ok(Some(member)) = self.database.get_member(network_id, node_id).await {
if !member.authorized() {
// TODO
}
}
packet.append_u8(protocol::message_type::VL2_NETWORK_CREDENTIALS)?;
packet.append_u8(0)?;
packet.append_u16(0)?;
packet.append_u16(0)?;
packet.append_u16(send_count as u16)?;
for _ in 0..send_count {
let r = revocations.pop().unwrap();
packet.append_bytes(r.to_bytes(self.local_identity.address).as_bytes())?;
}
packet.append_u16(0)?;
let new_payload_len = protocol::compress(&mut packet.as_bytes_mut()[payload_start..]);
packet.set_size(payload_start + new_payload_len);
Ok(())
},
);
}
}
}
/// Called when the DB informs us of a change.
async fn handle_change_notification(self: Arc<Self>, change: Change) {}
/// Attempt to create a network configuration and return the result.
///
/// This is the central function of the controller that looks up members, checks their
@ -228,20 +247,34 @@ impl Controller {
let mut member = self.database.get_member(network_id, source_identity.address).await?;
let mut member_changed = false;
// WARNING: this is where members are verified before they get admitted to a network. Read and edit
// very carefully!
// If we have a member object and a pinned identity, check to make sure it matches. Also accept
// upgraded identities to replace old versions if they are properly formed and inherit.
if let Some(member) = member.as_mut() {
if let Some(pinned_identity) = member.identity.as_ref() {
if !pinned_identity.eq(&source_identity) {
return Ok((AuthorizationResult::RejectedIdentityMismatch, None, None));
} else if source_identity.is_upgraded_from(pinned_identity) {
if source_identity.is_upgraded_from(pinned_identity) {
// Upgrade identity types if we have a V2 identity upgraded from a V1 identity.
let _ = member.identity.replace(source_identity.clone_without_secret());
member_changed = true;
} else {
return Ok((AuthorizationResult::RejectedIdentityMismatch, None, None));
}
}
} else if let Some(pinned_fingerprint) = member.identity_fingerprint.as_ref() {
if pinned_fingerprint.as_bytes().eq(&source_identity.fingerprint) {
// Learn the FULL identity if the fingerprint is pinned and they match.
let _ = member.identity.replace(source_identity.clone_without_secret());
member_changed = true;
} else {
return Ok((AuthorizationResult::RejectedIdentityMismatch, None, None));
}
}
}
// This is the final verdict after everything has been checked.
// This will be the final verdict after everything has been checked.
let mut authorization_result = AuthorizationResult::Rejected;
// This is the main "authorized" flag on the member record. If it is true then
@ -296,7 +329,7 @@ impl Controller {
let credential_ttl = network.credential_ttl.unwrap_or(CREDENTIAL_WINDOW_SIZE_DEFAULT);
// Check and if necessary auto-assign static IPs for this member.
member_changed |= network.check_zt_ip_assignments(self.database.as_ref(), &mut member).await;
member_changed |= network.assign_ip_addresses(self.database.as_ref(), &mut member).await;
let mut nc = NetworkConfig::new(network_id, source_identity.address);
@ -314,6 +347,9 @@ impl Controller {
nc.dns = network.dns;
if network.min_supported_version.unwrap_or(0) < (protocol::PROTOCOL_VERSION_V2 as u32) {
// If this network supports V1 nodes we have to include V1 credentials. Otherwise we can skip
// the overhead (bandwidth and CPU) of generating these.
if let Some(com) =
vl2::v1::CertificateOfMembership::new(&self.local_identity, network_id, &source_identity, now, credential_ttl)
{
@ -344,7 +380,7 @@ impl Controller {
nc.v1_credentials = Some(v1cred);
// Staple a bunch of revocations for anyone deauthed that still might be in the window.
// For anyone who has been deauthorized but is still in the window, send revocations.
if let Ok(deauthed_members_still_in_window) = self
.database
.list_members_deauthorized_after(network.id, now - credential_ttl)
@ -373,9 +409,10 @@ impl Controller {
}
} else {
// TODO: create V2 type credential for V2-only networks
// TODO: populate node info for V2 networks
}
// Log this member in the recently authorized cache, which is currently just used to filter whether we should
// handle multicast subscription traffic.
let _ = self
.recently_authorized
.write()
@ -432,22 +469,18 @@ impl InnerProtocol for Controller {
u64::from(network_id)
);
let meta_data = if (cursor + 2) < payload.len() {
let metadata = if (cursor + 2) < payload.len() {
let meta_data_len = payload.read_u16(&mut cursor);
if meta_data_len.is_err() {
return PacketHandlerResult::Error;
}
if let Ok(d) = payload.read_bytes(meta_data_len.unwrap() as usize, &mut cursor) {
let d = Dictionary::from_bytes(d);
if d.is_none() {
return PacketHandlerResult::Error;
}
d.unwrap()
d.to_vec()
} else {
return PacketHandlerResult::Error;
}
} else {
Dictionary::new()
Vec::new()
};
// Launch handler as an async background task.
@ -464,7 +497,7 @@ impl InnerProtocol for Controller {
//println!("{}", serde_yaml::to_string(&config).unwrap());
self2.send_network_config(source.as_ref(), &config, Some(message_id));
if let Some(revocations) = revocations {
self2.send_revocations(source.as_ref(), revocations);
self2.v1_proto_send_revocations(source.as_ref(), revocations);
}
(result, Some(config))
}
@ -484,11 +517,7 @@ impl InnerProtocol for Controller {
node_id,
node_fingerprint,
controller_node_id: self2.local_identity.address,
metadata: if meta_data.is_empty() {
Vec::new()
} else {
meta_data.to_bytes()
},
metadata,
peer_version: source.version(),
peer_protocol_version: source.protocol_version(),
timestamp: now,
@ -550,16 +579,18 @@ impl InnerProtocol for Controller {
impl VL1AuthProvider for Controller {
#[inline(always)]
fn should_respond_to(&self, _: &Identity) -> bool {
// Controllers always have to establish sessions to process requests. We don't really know if
// a member is relevant until we have looked up both the network and the member, since whether
// or not to "learn" unknown members is a network level option.
true
}
fn has_trust_relationship(&self, id: &Identity) -> bool {
let time_ticks = ms_monotonic();
self.recently_authorized
.read()
.unwrap()
.get(&id.fingerprint)
.map_or(false, |by_network| by_network.values().any(|t| *t > time_ticks))
.map_or(false, |by_network| by_network.values().any(|t| *t > ms_monotonic()))
}
}

View file

@ -11,12 +11,17 @@ use crate::model::*;
/// Database change relevant to the controller and that was NOT initiated by the controller.
#[derive(Clone)]
pub enum Change {
MemberAuthorized(NetworkId, Address),
MemberDeauthorized(NetworkId, Address),
NetworkCreated(Network),
NetworkChanged(Network, Network),
NetworkDeleted(Network),
MemberCreated(Member),
MemberChanged(Member, Member),
MemberDeleted(Member),
}
#[async_trait]
pub trait Database: Sync + Send + NodeStorage + 'static {
async fn list_networks(&self) -> Result<Vec<NetworkId>, Box<dyn Error + Send + Sync>>;
async fn get_network(&self, id: NetworkId) -> Result<Option<Network>, Box<dyn Error + Send + Sync>>;
async fn save_network(&self, obj: Network) -> Result<(), Box<dyn Error + Send + Sync>>;

View file

@ -2,7 +2,7 @@ use std::error::Error;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, Weak};
use async_trait::async_trait;
use notify::{RecursiveMode, Watcher};
@ -12,13 +12,19 @@ use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_utils::io::{fs_restrict_permissions, read_limit};
use zerotier_utils::reaper::Reaper;
use zerotier_utils::tokio::fs;
use zerotier_utils::tokio::runtime::Handle;
use zerotier_utils::tokio::sync::broadcast::{channel, Receiver, Sender};
use zerotier_utils::tokio::task::JoinHandle;
use zerotier_utils::tokio::time::{sleep, Duration, Instant};
use crate::cache::Cache;
use crate::database::{Change, Database};
use crate::model::*;
const IDENTITY_SECRET_FILENAME: &'static str = "identity.secret";
const EVENT_HANDLER_TASK_TIMEOUT: Duration = Duration::from_secs(5);
/// An in-filesystem database that permits live editing.
///
@ -28,47 +34,174 @@ const IDENTITY_SECRET_FILENAME: &'static str = "identity.secret";
pub struct FileDatabase {
base_path: PathBuf,
controller_address: AtomicU64,
change_sender: Arc<Sender<Change>>,
watcher: Mutex<Box<dyn Watcher + Send>>,
change_sender: Sender<Change>,
tasks: Reaper,
cache: Cache,
daemon: JoinHandle<()>,
}
// TODO: should cache at least hashes and detect changes in the filesystem live.
impl FileDatabase {
pub async fn new<P: AsRef<Path>>(base_path: P) -> Result<Self, Box<dyn Error + Send + Sync>> {
pub async fn new<P: AsRef<Path>>(runtime: Handle, base_path: P) -> Result<Arc<Self>, Box<dyn Error + Send + Sync>> {
let base_path: PathBuf = base_path.as_ref().into();
let _ = fs::create_dir_all(&base_path).await?;
let (change_sender, _) = channel(256);
let change_sender = Arc::new(change_sender);
let db_weak_tmp: Arc<Mutex<Weak<Self>>> = Arc::new(Mutex::new(Weak::default()));
let db_weak = db_weak_tmp.clone();
let runtime2 = runtime.clone();
let sender = Arc::downgrade(&change_sender);
let mut watcher: Box<dyn Watcher + Send> =
Box::new(notify::recommended_watcher(move |event: notify::Result<notify::event::Event>| {
if let Ok(event) = event {
if let Some(_sender) = sender.upgrade() {
// TODO
match event.kind {
notify::EventKind::Modify(_) => {}
notify::EventKind::Remove(_) => {}
_ => {}
}
}
}
})?);
let _ = watcher.configure(
notify::Config::default()
.with_compare_contents(true)
.with_poll_interval(std::time::Duration::from_secs(2)),
);
watcher.watch(&base_path, RecursiveMode::Recursive)?;
Ok(Self {
let db = Arc::new(Self {
base_path: base_path.clone(),
controller_address: AtomicU64::new(0),
change_sender,
watcher: Mutex::new(watcher),
})
tasks: Reaper::new(&runtime2),
cache: Cache::new(),
daemon: runtime2.spawn(async move {
while db_weak.lock().unwrap().upgrade().is_none() {
// Wait for parent to finish constructing and start up, then create watcher.
sleep(Duration::from_millis(10)).await;
}
let mut watcher = notify::recommended_watcher(move |event: notify::Result<notify::event::Event>| {
if let Ok(event) = event {
if let Some(db) = db_weak.lock().unwrap().upgrade() {
if let Some(controller_address) = db.get_controller_address() {
db.clone().tasks.add(
runtime.spawn(async move {
if let Some(path0) = event.paths.first() {
if let Some((record_type, network_id, node_id)) =
Self::record_type_from_path(controller_address, path0.as_path())
{
let mut deleted = None;
let mut changed = None;
match event.kind {
notify::EventKind::Create(create_kind) => match create_kind {
notify::event::CreateKind::File => {
changed = Some(path0.as_path());
}
_ => {}
},
notify::EventKind::Modify(modify_kind) => match modify_kind {
notify::event::ModifyKind::Data(_) => {
changed = Some(path0.as_path());
}
notify::event::ModifyKind::Name(rename_mode) => match rename_mode {
notify::event::RenameMode::Both => {
if event.paths.len() >= 2 {
if let Some(path1) = event.paths.last() {
deleted = Some(path0.as_path());
changed = Some(path1.as_path());
}
}
}
notify::event::RenameMode::From => {
deleted = Some(path0.as_path());
}
notify::event::RenameMode::To => {
changed = Some(path0.as_path());
}
_ => {}
},
_ => {}
},
notify::EventKind::Remove(remove_kind) => match remove_kind {
notify::event::RemoveKind::File => {
deleted = Some(path0.as_path());
}
_ => {}
},
_ => {}
}
if deleted.is_some() {
match record_type {
RecordType::Network => {
if let Some((network, mut members)) = db.cache.on_network_deleted(network_id) {
for m in members.drain(..) {
let _ = db.change_sender.send(Change::MemberDeleted(m));
}
let _ = db.change_sender.send(Change::NetworkDeleted(network));
}
}
RecordType::Member => {
if let Some(node_id) = node_id {
if let Some(member) = db.cache.on_member_deleted(network_id, node_id) {
let _ = db.change_sender.send(Change::MemberDeleted(member));
}
}
}
_ => {}
}
}
if let Some(changed) = changed {
match record_type {
RecordType::Network => {
if let Ok(Some(new_network)) = Self::get_network_internal(changed).await {
match db.cache.on_network_updated(new_network.clone()) {
(true, Some(old_network)) => {
let _ = db
.change_sender
.send(Change::NetworkChanged(old_network, new_network));
}
(true, None) => {
let _ = db.change_sender.send(Change::NetworkCreated(new_network));
}
_ => {}
}
}
}
RecordType::Member => {
if let Ok(Some(new_member)) = Self::get_member_internal(changed).await {
match db.cache.on_member_updated(new_member.clone()) {
(true, Some(old_member)) => {
let _ = db
.change_sender
.send(Change::MemberChanged(old_member, new_member));
}
(true, None) => {
let _ = db.change_sender.send(Change::MemberCreated(new_member));
}
_ => {}
}
}
}
_ => {}
}
}
}
}
}),
Instant::now().checked_add(EVENT_HANDLER_TASK_TIMEOUT).unwrap(),
);
}
}
}
})
.expect("FATAL: unable to start filesystem change listener");
let _ = watcher.configure(
notify::Config::default()
.with_compare_contents(true)
.with_poll_interval(std::time::Duration::from_secs(2)),
);
watcher
.watch(&base_path, RecursiveMode::Recursive)
.expect("FATAL: unable to watch base path");
loop {
// Any periodic background stuff can be put here. Adjust timing as needed.
sleep(Duration::from_secs(10)).await;
}
}),
});
db.cache.load_all(db.as_ref()).await?;
*db_weak_tmp.lock().unwrap() = Arc::downgrade(&db); // this kicks off watcher task too
Ok(db)
}
fn get_controller_address(&self) -> Option<Address> {
@ -86,7 +219,7 @@ impl FileDatabase {
}
fn network_path(&self, network_id: NetworkId) -> PathBuf {
self.base_path.join(format!("N{:06x}.yaml", network_id.network_no()))
self.base_path.join(format!("N{:06x}", network_id.network_no())).join("config.yaml")
}
fn member_path(&self, network_id: NetworkId, member_id: Address) -> PathBuf {
@ -94,11 +227,52 @@ impl FileDatabase {
.join(format!("N{:06x}", network_id.network_no()))
.join(format!("M{}.yaml", member_id.to_string()))
}
async fn get_network_internal<P: AsRef<Path>>(path: P) -> Result<Option<Network>, Box<dyn Error + Send + Sync>> {
let r = fs::read(path).await;
if let Ok(raw) = r {
return Ok(Some(serde_yaml::from_slice::<Network>(raw.as_slice())?));
} else {
return Ok(None);
}
}
async fn get_member_internal<P: AsRef<Path>>(path: P) -> Result<Option<Member>, Box<dyn Error + Send + Sync>> {
let r = fs::read(path).await;
if let Ok(raw) = r {
Ok(Some(serde_yaml::from_slice::<Member>(raw.as_slice())?))
} else {
Ok(None)
}
}
/// Get record type and also the number after it: network number or address.
fn record_type_from_path<P: AsRef<Path>>(controller_address: Address, pp: P) -> Option<(RecordType, NetworkId, Option<Address>)> {
let p: &Path = pp.as_ref();
if p.parent().map_or(false, |p| p.starts_with("N") && p.as_os_str().len() == 7) {
let network_id = NetworkId::from_controller_and_network_no(
controller_address,
u64::from_str_radix(&p.parent().unwrap().to_str().unwrap()[1..], 16).unwrap_or(0),
)?;
if let Some(file_name) = p.file_name().map(|p| p.to_string_lossy().to_lowercase()) {
if file_name.eq("config.yaml") {
return Some((RecordType::Network, network_id, None));
} else if file_name.len() == 16 && file_name.starts_with("m") {
return Some((
RecordType::Member,
network_id,
Some(Address::from_u64(u64::from_str_radix(&file_name.as_str()[1..], 16).unwrap_or(0))?),
));
}
}
}
return None;
}
}
impl Drop for FileDatabase {
fn drop(&mut self) {
let _ = self.watcher.lock().unwrap().unwatch(&self.base_path);
self.daemon.abort();
}
}
@ -126,35 +300,51 @@ impl NodeStorage for FileDatabase {
#[async_trait]
impl Database for FileDatabase {
async fn get_network(&self, id: NetworkId) -> Result<Option<Network>, Box<dyn Error + Send + Sync>> {
let r = fs::read(self.network_path(id)).await;
if let Ok(raw) = r {
let mut network = serde_yaml::from_slice::<Network>(raw.as_slice())?;
async fn list_networks(&self) -> Result<Vec<NetworkId>, Box<dyn Error + Send + Sync>> {
let mut networks = Vec::new();
if let Some(controller_address) = self.get_controller_address() {
let controller_address_shift24 = u64::from(controller_address).wrapping_shl(24);
let mut dir = fs::read_dir(&self.base_path).await?;
while let Ok(Some(ent)) = dir.next_entry().await {
if ent.file_type().await.map_or(false, |t| t.is_dir()) {
let osname = ent.file_name();
let name = osname.to_string_lossy();
if name.len() == 7 && name.starts_with("N") {
if fs::metadata(ent.path().join("config.yaml")).await.is_ok() {
if let Ok(nwid_last24bits) = u64::from_str_radix(&name[1..], 16) {
if let Some(nwid) = NetworkId::from_u64(controller_address_shift24 | nwid_last24bits) {
networks.push(nwid);
}
}
}
}
}
}
}
Ok(networks)
}
#[inline]
async fn get_network(&self, id: NetworkId) -> Result<Option<Network>, Box<dyn Error + Send + Sync>> {
let mut network = Self::get_network_internal(&self.network_path(id)).await?;
if let Some(network) = network.as_mut() {
// FileDatabase stores networks by their "network number" and automatically adapts their IDs
// if the controller's identity changes. This is done to make it easy to just clone networks,
// including storing them in "git."
if let Some(controller_address) = self.get_controller_address() {
let network_id_should_be = network.id.change_network_controller(controller_address);
if id != network_id_should_be {
return Ok(None);
}
if network.id != network_id_should_be {
network.id = network_id_should_be;
let _ = self.save_network(network.clone()).await;
let _ = self.save_network(network.clone()).await?;
}
}
return Ok(Some(network));
} else {
return Ok(None);
}
Ok(network)
}
async fn save_network(&self, obj: Network) -> Result<(), Box<dyn Error + Send + Sync>> {
let base_network_path = self.network_path(obj.id);
let _ = fs::create_dir_all(base_network_path.parent().unwrap()).await;
//let _ = fs::write(base_network_path, to_json_pretty(&obj).as_bytes()).await?;
let _ = fs::write(base_network_path, serde_yaml::to_string(&obj)?.as_bytes()).await?;
return Ok(());
}
@ -163,15 +353,17 @@ impl Database for FileDatabase {
let mut members = Vec::new();
let mut dir = fs::read_dir(self.base_path.join(network_id.to_string())).await?;
while let Ok(Some(ent)) = dir.next_entry().await {
let osname = ent.file_name();
let name = osname.to_string_lossy();
if name.len() == (zerotier_network_hypervisor::protocol::ADDRESS_SIZE_STRING + 6)
&& name.starts_with("M")
&& name.ends_with(".yaml")
{
if let Ok(member_address) = u64::from_str_radix(&name[1..11], 16) {
if let Some(member_address) = Address::from_u64(member_address) {
members.push(member_address);
if ent.file_type().await.map_or(false, |t| t.is_file() || t.is_symlink()) {
let osname = ent.file_name();
let name = osname.to_string_lossy();
if name.len() == (zerotier_network_hypervisor::protocol::ADDRESS_SIZE_STRING + 6)
&& name.starts_with("M")
&& name.ends_with(".yaml")
{
if let Ok(member_address) = u64::from_str_radix(&name[1..11], 16) {
if let Some(member_address) = Address::from_u64(member_address) {
members.push(member_address);
}
}
}
}
@ -180,16 +372,14 @@ impl Database for FileDatabase {
}
async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result<Option<Member>, Box<dyn Error + Send + Sync>> {
let r = fs::read(self.member_path(network_id, node_id)).await;
if let Ok(raw) = r {
let mut member = serde_yaml::from_slice::<Member>(raw.as_slice())?;
self.get_controller_address()
.map(|a| member.network_id = member.network_id.change_network_controller(a));
Ok(Some(member))
//Ok(Some(serde_json::from_slice::<Member>(raw.as_slice())?))
} else {
Ok(None)
let mut member = Self::get_member_internal(&self.member_path(network_id, node_id)).await?;
if let Some(member) = member.as_mut() {
if member.network_id != network_id {
member.network_id = network_id;
self.save_member(member.clone()).await?;
}
}
Ok(member)
}
async fn save_member(&self, obj: Member) -> Result<(), Box<dyn Error + Send + Sync>> {
@ -227,7 +417,7 @@ mod tests {
let _ = std::fs::remove_dir_all(&test_dir);
let db = FileDatabase::new(test_dir).await.expect("new db");
let db = FileDatabase::new(tokio_runtime.handle().clone(), test_dir).await.expect("new db");
let mut test_member = Member::new_without_identity(node_id, network_id);
for x in 0..3 {

View file

@ -2,6 +2,8 @@
mod controller;
pub(crate) mod cache;
pub mod database;
pub mod filedatabase;
pub mod model;

View file

@ -78,7 +78,7 @@ fn main() {
if let Ok(tokio_runtime) = zerotier_utils::tokio::runtime::Builder::new_multi_thread().enable_all().build() {
tokio_runtime.block_on(async {
if let Some(filedb_base_path) = global_args.value_of("filedb") {
let file_db = FileDatabase::new(filedb_base_path).await;
let file_db = FileDatabase::new(tokio_runtime.handle().clone(), filedb_base_path).await;
if file_db.is_err() {
eprintln!(
"FATAL: unable to open filesystem database at {}: {}",
@ -87,7 +87,7 @@ fn main() {
);
std::process::exit(exitcode::ERR_IOERR)
}
std::process::exit(run(Arc::new(file_db.unwrap()), &tokio_runtime).await);
std::process::exit(run(file_db.unwrap(), &tokio_runtime).await);
} else {
eprintln!("FATAL: no database type selected.");
std::process::exit(exitcode::ERR_USAGE);

View file

@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
use zerotier_network_hypervisor::vl1::{Address, Identity, InetAddress};
use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_utils::blob::Blob;
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Member {
@ -15,6 +16,13 @@ pub struct Member {
#[serde(rename = "networkId")]
pub network_id: NetworkId,
/// Pinned full member identity fingerprint, if known.
/// If this is set but 'identity' is not, the 'identity' field will be set on first request
/// but an identity not matching this fingerprint will not be accepted. This allows a member
/// to be created with an address and a fingerprint for full SHA384 identity specification.
#[serde(skip_serializing_if = "Option::is_none")]
pub identity_fingerprint: Option<Blob<{ Identity::FINGERPRINT_SIZE }>>,
/// Pinned full member identity, if known.
#[serde(skip_serializing_if = "Option::is_none")]
pub identity: Option<Identity>,
@ -71,11 +79,13 @@ pub struct Member {
}
impl Member {
/// Create a new network member without specifying a "pinned" identity.
pub fn new_without_identity(node_id: Address, network_id: NetworkId) -> Self {
Self {
node_id,
network_id,
identity: None,
identity_fingerprint: None,
name: String::new(),
last_authorized_time: None,
last_deauthorized_time: None,
@ -90,6 +100,7 @@ impl Member {
pub fn new_with_identity(identity: Identity, network_id: NetworkId) -> Self {
let mut tmp = Self::new_without_identity(identity.address, network_id);
tmp.identity_fingerprint = Some(Blob::from(identity.fingerprint));
tmp.identity = Some(identity);
tmp
}
@ -110,14 +121,7 @@ impl Hash for Member {
}
impl ToString for Member {
#[inline(always)]
fn to_string(&self) -> String {
zerotier_utils::json::to_json_pretty(self)
}
}
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Tag {
pub id: u32,
pub value: u32,
}

View file

@ -15,6 +15,13 @@ use zerotier_network_hypervisor::vl2::networkconfig::NetworkConfig;
use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_utils::blob::Blob;
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RecordType {
Network,
Member,
RequestLogItem,
}
/// A complete network with all member configuration information for import/export or blob storage.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct NetworkExport {

View file

@ -116,7 +116,7 @@ pub struct Network {
#[serde(skip_serializing_if = "Option::is_none")]
pub mtu: Option<u16>,
/// If true the network has access control, which is usually what you want.
/// If true the network has access control, which is usually what you want and is the default if not specified.
#[serde(default = "troo")]
pub private: bool,
@ -135,7 +135,6 @@ impl Hash for Network {
}
impl ToString for Network {
#[inline(always)]
fn to_string(&self) -> String {
zerotier_utils::json::to_json_pretty(self)
}
@ -148,7 +147,7 @@ fn troo() -> bool {
impl Network {
/// Check member IP assignments and return 'true' if IP assignments were created or modified.
pub async fn check_zt_ip_assignments<DatabaseImpl: Database + ?Sized>(&self, database: &DatabaseImpl, member: &mut Member) -> bool {
pub async fn assign_ip_addresses<DatabaseImpl: Database + ?Sized>(&self, database: &DatabaseImpl, member: &mut Member) -> bool {
let mut modified = false;
if self.v4_assign_mode.as_ref().map_or(false, |m| m.zt) {

View file

@ -86,6 +86,7 @@ pub mod message_type {
pub const VL1_USER_MESSAGE: u8 = 0x14;
pub const VL2_MULTICAST_LIKE: u8 = 0x09;
pub const VL2_NETWORK_CREDENTIALS: u8 = 0x0a;
pub const VL2_NETWORK_CONFIG_REQUEST: u8 = 0x0b;
pub const VL2_NETWORK_CONFIG: u8 = 0x0c;
pub const VL2_MULTICAST_GATHER: u8 = 0x0d;

View file

@ -514,6 +514,13 @@ impl Identity {
let mut h = SHA384::new();
assert!(self.write_public(&mut h, false).is_ok());
self.fingerprint = h.finish();
// NIST guidelines specify that the left-most N bits of a hash should be taken if it's truncated.
// We want to start the fingerprint with the address, so move the hash over and discard 40 bits.
// We're not even really losing security here since the address is a hash, but NIST would not
// consider it such since it's not a NIST-approved algorithm.
self.fingerprint.copy_within(ADDRESS_SIZE..48, ADDRESS_SIZE);
self.fingerprint[..ADDRESS_SIZE].copy_from_slice(&self.address.to_bytes());
}
#[inline(always)]

View file

@ -18,7 +18,7 @@ use crate::vl1::Address;
pub struct NetworkId(NonZeroU64);
impl NetworkId {
#[inline(always)]
#[inline]
pub fn from_u64(i: u64) -> Option<NetworkId> {
// Note that we check both that 'i' is non-zero and that the address of the controller is valid.
if let Some(ii) = NonZeroU64::new(i) {
@ -29,7 +29,12 @@ impl NetworkId {
return None;
}
#[inline(always)]
#[inline]
pub fn from_controller_and_network_no(controller: Address, network_no: u64) -> Option<NetworkId> {
Self::from_u64(u64::from(controller).wrapping_shl(24) | (network_no & 0xffffff))
}
#[inline]
pub fn from_bytes(b: &[u8]) -> Option<NetworkId> {
if b.len() >= 8 {
Self::from_bytes_fixed(b[0..8].try_into().unwrap())
@ -38,12 +43,12 @@ impl NetworkId {
}
}
#[inline(always)]
#[inline]
pub fn from_bytes_fixed(b: &[u8; 8]) -> Option<NetworkId> {
Self::from_u64(u64::from_be_bytes(*b))
}
#[inline(always)]
#[inline]
pub fn to_bytes(&self) -> [u8; 8] {
self.0.get().to_be_bytes()
}

View file

@ -13,6 +13,14 @@ use zerotier_utils::blob::Blob;
use zerotier_utils::error::InvalidParameterError;
use zerotier_utils::memory;
/// ZeroTier V1 certificate of membership.
///
/// The somewhat odd encoding of this is an artifact of an old V1 design choice: certificates are
/// tuples of arbitrary values coupled by how different they are permitted to be (max delta).
///
/// This was done to permit some things such as geo-fencing that were never implemented, so it's
/// a bit of a case of YAGNI. In V2 this is deprecated in favor of a more standard sort of
/// certificate.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CertificateOfMembership {
pub network_id: NetworkId,
@ -50,31 +58,33 @@ impl CertificateOfMembership {
q[0] = 0;
q[1] = self.timestamp.to_be() as u64;
q[2] = self.max_delta.to_be() as u64;
q[2] = self.max_delta.to_be() as u64; // TTL / "window" in V1
q[3] = 1u64.to_be();
let nwid: u64 = self.network_id.into();
q[4] = nwid.to_be();
q[5] = 0;
q[4] = u64::from(self.network_id).to_be();
q[5] = 0; // no disagreement permitted
q[6] = 2u64.to_be();
let a: u64 = self.issued_to.into();
q[7] = a.to_be();
q[8] = 0xffffffffffffffffu64; // no to_be needed
q[7] = u64::from(self.issued_to).to_be();
q[8] = u64::MAX; // no to_be needed for all-1s
// This is a fix for a security issue in V1 in which an attacker could (with much CPU use)
// duplciate an identity and insert themselves in place of one after 30-60 days when local
// identity caches expire. The full hash should have been included from the beginning, and
// V2 only ever uses the full hash of the identity to verify credentials.
let fp = self.issued_to_fingerprint.as_bytes();
q[9] = 3;
q[10] = u64::from_ne_bytes(fp[0..8].try_into().unwrap());
q[11] = 0xffffffffffffffffu64;
q[11] = u64::MAX; // these will never agree; they're explicitly checked in V1
q[12] = 4;
q[13] = u64::from_ne_bytes(fp[8..16].try_into().unwrap());
q[14] = 0xffffffffffffffffu64;
q[14] = u64::MAX;
q[15] = 5;
q[16] = u64::from_ne_bytes(fp[16..24].try_into().unwrap());
q[17] = 0xffffffffffffffffu64;
q[17] = u64::MAX;
q[18] = 6;
q[19] = u64::from_ne_bytes(fp[24..32].try_into().unwrap());
q[20] = 0xffffffffffffffffu64;
q[20] = u64::MAX;
*memory::as_byte_array(&q)
memory::to_byte_array(q)
}
/// Get the identity fingerprint used in V1, which only covers the curve25519 keys.

View file

@ -76,4 +76,9 @@ impl Revocation {
v
}
#[inline(always)]
pub fn to_bytes(&self, controller_address: Address) -> ArrayVec<u8, 256> {
self.internal_to_bytes(false, controller_address)
}
}

View file

@ -66,6 +66,14 @@ pub fn as_byte_array<T: Copy, const S: usize>(o: &T) -> &[u8; S] {
unsafe { &*(o as *const T).cast() }
}
/// Transmute an object to a byte array.
/// The template parameter S must equal the size of the object in bytes or this will panic.
#[inline(always)]
pub fn to_byte_array<T: Copy, const S: usize>(o: T) -> [u8; S] {
assert_eq!(S, size_of::<T>());
unsafe { *(&o as *const T).cast() }
}
/// Get a byte array as a flat object.
///
/// WARNING: while this is technically safe, care must be taken if the object requires aligned access.