mirror of
https://github.com/zerotier/ZeroTierOne.git
synced 2025-06-08 21:43:44 +02:00
Moar controller.
This commit is contained in:
parent
ee4ce6a8ef
commit
80cee57255
4 changed files with 198 additions and 256 deletions
|
@ -9,16 +9,16 @@ use tokio::time::{Duration, Instant};
|
|||
use zerotier_network_hypervisor::protocol;
|
||||
use zerotier_network_hypervisor::protocol::{PacketBuffer, DEFAULT_MULTICAST_LIMIT, ZEROTIER_VIRTUAL_NETWORK_DEFAULT_MTU};
|
||||
use zerotier_network_hypervisor::vl1::*;
|
||||
use zerotier_network_hypervisor::vl2;
|
||||
use zerotier_network_hypervisor::vl2::multicastauthority::MulticastAuthority;
|
||||
use zerotier_network_hypervisor::vl2::networkconfig::*;
|
||||
use zerotier_network_hypervisor::vl2::v1::Revocation;
|
||||
use zerotier_network_hypervisor::vl2::NetworkId;
|
||||
use zerotier_network_hypervisor::vl2::{self, MulticastGroup};
|
||||
use zerotier_utils::blob::Blob;
|
||||
use zerotier_utils::buffer::OutOfBoundsError;
|
||||
use zerotier_utils::dictionary::Dictionary;
|
||||
use zerotier_utils::error::InvalidParameterError;
|
||||
use zerotier_utils::reaper::Reaper;
|
||||
use zerotier_utils::sync::RMaybeWLockGuard;
|
||||
use zerotier_utils::tokio;
|
||||
use zerotier_utils::{ms_monotonic, ms_since_epoch};
|
||||
use zerotier_vl1_service::VL1Service;
|
||||
|
@ -38,15 +38,15 @@ pub struct Controller {
|
|||
database: Arc<dyn Database>,
|
||||
local_identity: Identity,
|
||||
|
||||
// Async tasks that should be killed when the controller is dropped.
|
||||
/// Handler for MULTICAST_LIKE and MULTICAST_GATHER messages.
|
||||
multicast_authority: MulticastAuthority,
|
||||
|
||||
/// Async tasks that should be killed when the controller is dropped.
|
||||
daemons: Mutex<Vec<tokio::task::JoinHandle<()>>>, // drop() aborts these
|
||||
|
||||
// Multicast "likes" recently received.
|
||||
multicast_subscriptions: RwLock<HashMap<(NetworkId, MulticastGroup), Mutex<HashMap<Address, i64>>>>,
|
||||
|
||||
// Recently authorized network members and when that authorization expires (in monotonic ticks).
|
||||
// Note that this is not and should not be used for real authentication, just for locking up multicast info.
|
||||
recently_authorized: RwLock<HashMap<(NetworkId, [u8; Identity::FINGERPRINT_SIZE]), i64>>,
|
||||
/// Recently authorized network members and when that authorization expires (in monotonic ticks).
|
||||
/// Note that this is not and should not be used for real authentication, just for locking up multicast info.
|
||||
recently_authorized: RwLock<HashMap<[u8; Identity::FINGERPRINT_SIZE], HashMap<NetworkId, i64>>>,
|
||||
}
|
||||
|
||||
impl Controller {
|
||||
|
@ -63,8 +63,8 @@ impl Controller {
|
|||
runtime,
|
||||
database: database.clone(),
|
||||
local_identity,
|
||||
multicast_authority: MulticastAuthority::new(),
|
||||
daemons: Mutex::new(Vec::with_capacity(2)),
|
||||
multicast_subscriptions: RwLock::new(HashMap::new()),
|
||||
recently_authorized: RwLock::new(HashMap::new()),
|
||||
}))
|
||||
} else {
|
||||
|
@ -107,34 +107,17 @@ impl Controller {
|
|||
// Create background task to expire multicast subscriptions and recent authorizations.
|
||||
let self2 = self.self_ref.clone();
|
||||
self.daemons.lock().unwrap().push(self.runtime.spawn(async move {
|
||||
let sleep_duration = Duration::from_millis((protocol::VL2_DEFAULT_MULTICAST_LIKE_EXPIRE / 2).min(5000) as u64);
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_millis((protocol::VL2_DEFAULT_MULTICAST_LIKE_EXPIRE / 2) as u64)).await;
|
||||
tokio::time::sleep(sleep_duration).await;
|
||||
|
||||
if let Some(self2) = self2.upgrade() {
|
||||
let time_ticks = ms_monotonic();
|
||||
let exp_before = time_ticks - protocol::VL2_DEFAULT_MULTICAST_LIKE_EXPIRE;
|
||||
let mut empty_subscription_entries = Vec::new();
|
||||
|
||||
for (network_group, subs) in self2.multicast_subscriptions.read().unwrap().iter() {
|
||||
let mut subs = subs.lock().unwrap();
|
||||
subs.retain(|_, t| *t > exp_before);
|
||||
if subs.is_empty() {
|
||||
empty_subscription_entries.push(network_group.clone());
|
||||
}
|
||||
}
|
||||
|
||||
if !empty_subscription_entries.is_empty() {
|
||||
let mut ms = self2.multicast_subscriptions.write().unwrap();
|
||||
for e in empty_subscription_entries.iter() {
|
||||
ms.remove(e);
|
||||
}
|
||||
}
|
||||
|
||||
self2
|
||||
.recently_authorized
|
||||
.write()
|
||||
.unwrap()
|
||||
.retain(|_, timeout| *timeout > time_ticks);
|
||||
self2.multicast_authority.clean(time_ticks);
|
||||
self2.recently_authorized.write().unwrap().retain(|_, by_network| {
|
||||
by_network.retain(|_, timeout| *timeout > time_ticks);
|
||||
!by_network.is_empty()
|
||||
});
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
@ -201,7 +184,10 @@ impl Controller {
|
|||
}
|
||||
}
|
||||
|
||||
fn send_revocations(&self, peer: &Peer, revocations: &Vec<Revocation>) {}
|
||||
/// Send one or more revocation object(s) to a peer.
|
||||
fn send_revocations(&self, peer: &Peer, revocations: Vec<Revocation>) {
|
||||
if let Some(host_system) = self.service.read().unwrap().upgrade() {}
|
||||
}
|
||||
|
||||
/// Called when the DB informs us of a change.
|
||||
async fn handle_change_notification(self: Arc<Self>, change: Change) {
|
||||
|
@ -318,7 +304,7 @@ impl Controller {
|
|||
nc.private = network.private;
|
||||
nc.timestamp = now;
|
||||
nc.credential_ttl = credential_ttl;
|
||||
nc.revision = now as u64;
|
||||
nc.revision = Some(now as u64);
|
||||
nc.mtu = network.mtu.unwrap_or(ZEROTIER_VIRTUAL_NETWORK_DEFAULT_MTU as u16);
|
||||
nc.multicast_limit = network.multicast_limit.unwrap_or(DEFAULT_MULTICAST_LIMIT as u32);
|
||||
nc.multicast_like_expire = Some(protocol::VL2_DEFAULT_MULTICAST_LIKE_EXPIRE as u32);
|
||||
|
@ -394,7 +380,9 @@ impl Controller {
|
|||
.recently_authorized
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert((network_id, source_identity.fingerprint), ms_monotonic() + nc.credential_ttl);
|
||||
.entry(source_identity.fingerprint)
|
||||
.or_default()
|
||||
.insert(network_id, ms_monotonic() + nc.credential_ttl);
|
||||
|
||||
network_config = Some(nc);
|
||||
}
|
||||
|
@ -463,27 +451,28 @@ impl InnerProtocol for Controller {
|
|||
};
|
||||
|
||||
// Launch handler as an async background task.
|
||||
let (self2, peer, source_remote_endpoint) =
|
||||
let (self2, source, source_remote_endpoint) =
|
||||
(self.self_ref.upgrade().unwrap(), source.clone(), source_path.endpoint.clone());
|
||||
self.reaper.add(
|
||||
self.runtime.spawn(async move {
|
||||
let node_id = peer.identity.address;
|
||||
let node_fingerprint = Blob::from(peer.identity.fingerprint);
|
||||
let node_id = source.identity.address;
|
||||
let node_fingerprint = Blob::from(source.identity.fingerprint);
|
||||
let now = ms_since_epoch();
|
||||
let _host = self2.service.read().unwrap().clone().upgrade().unwrap();
|
||||
|
||||
let (result, config) = match self2.get_network_config(&peer.identity, network_id, now).await {
|
||||
let (result, config) = match self2.get_network_config(&source.identity, network_id, now).await {
|
||||
Result::Ok((result, Some(config), revocations)) => {
|
||||
//println!("{}", serde_yaml::to_string(&config).unwrap());
|
||||
self2.send_network_config(peer.as_ref(), &config, Some(message_id));
|
||||
self2.send_network_config(source.as_ref(), &config, Some(message_id));
|
||||
if let Some(revocations) = revocations {
|
||||
self2.send_revocations(peer.as_ref(), &revocations);
|
||||
self2.send_revocations(source.as_ref(), revocations);
|
||||
}
|
||||
(result, Some(config))
|
||||
}
|
||||
Result::Ok((result, None, _)) => (result, None),
|
||||
Result::Err(e) => {
|
||||
debug_event!(_host, "[vl2] ERROR getting network config: {}", e.to_string());
|
||||
#[cfg(debug_assertions)]
|
||||
let host = self2.service.read().unwrap().clone().upgrade().unwrap();
|
||||
debug_event!(host, "[vl2] ERROR getting network config: {}", e.to_string());
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
@ -500,8 +489,8 @@ impl InnerProtocol for Controller {
|
|||
} else {
|
||||
meta_data.to_bytes()
|
||||
},
|
||||
peer_version: peer.version(),
|
||||
peer_protocol_version: peer.protocol_version(),
|
||||
peer_version: source.version(),
|
||||
peer_protocol_version: source.protocol_version(),
|
||||
timestamp: now,
|
||||
source_remote_endpoint,
|
||||
source_hops,
|
||||
|
@ -517,99 +506,38 @@ impl InnerProtocol for Controller {
|
|||
}
|
||||
|
||||
protocol::message_type::VL2_MULTICAST_LIKE => {
|
||||
let mut subscriptions = RMaybeWLockGuard::new_read(&self.multicast_subscriptions);
|
||||
let auth = self.recently_authorized.read().unwrap();
|
||||
let time_ticks = ms_monotonic();
|
||||
|
||||
while payload.len() >= (8 + 6 + 4) {
|
||||
let network_id = NetworkId::from_bytes_fixed(payload.read_bytes_fixed(&mut cursor).unwrap());
|
||||
if let Some(network_id) = network_id {
|
||||
let mac = MAC::from_bytes_fixed(payload.read_bytes_fixed(&mut cursor).unwrap());
|
||||
if let Some(mac) = mac {
|
||||
if auth
|
||||
.get(&(network_id, source.identity.fingerprint))
|
||||
.map_or(false, |t| *t > time_ticks)
|
||||
{
|
||||
let sub_key = (network_id, MulticastGroup { mac, adi: payload.read_u32(&mut cursor).unwrap() });
|
||||
if let Some(sub) = subscriptions.read().get(&sub_key) {
|
||||
let _ = sub.lock().unwrap().insert(source.identity.address, time_ticks);
|
||||
} else {
|
||||
let _ = subscriptions
|
||||
.write(&self.multicast_subscriptions)
|
||||
.entry(sub_key)
|
||||
.or_insert_with(|| Mutex::new(HashMap::new()))
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(source.identity.address, time_ticks);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.multicast_authority.handle_vl2_multicast_like(
|
||||
|network_id, identity| {
|
||||
auth.get(&identity.fingerprint)
|
||||
.map_or(false, |t| t.get(&network_id).map_or(false, |t| *t > time_ticks))
|
||||
},
|
||||
time_ticks,
|
||||
source,
|
||||
payload,
|
||||
cursor,
|
||||
);
|
||||
PacketHandlerResult::Ok
|
||||
}
|
||||
|
||||
protocol::message_type::VL2_MULTICAST_GATHER => {
|
||||
if let Some(service) = self.service.read().unwrap().upgrade() {
|
||||
if let Some(network_id) = payload
|
||||
.read_bytes_fixed(&mut cursor)
|
||||
.map_or(None, |network_id| NetworkId::from_bytes_fixed(network_id))
|
||||
{
|
||||
let time_ticks = ms_monotonic();
|
||||
if self
|
||||
.recently_authorized
|
||||
.read()
|
||||
.unwrap()
|
||||
.get(&(network_id, source.identity.fingerprint))
|
||||
.map_or(false, |t| *t > time_ticks)
|
||||
{
|
||||
cursor += 1; // skip flags, currently unused
|
||||
if let Some(mac) = payload.read_bytes_fixed(&mut cursor).map_or(None, |mac| MAC::from_bytes_fixed(mac)) {
|
||||
let mut gathered = Vec::new();
|
||||
let adi = payload.read_u32(&mut cursor).unwrap_or(0);
|
||||
let subscriptions = self.multicast_subscriptions.read().unwrap();
|
||||
if let Some(sub) = subscriptions.get(&(network_id, MulticastGroup { mac, adi })) {
|
||||
let sub = sub.lock().unwrap();
|
||||
for a in sub.keys() {
|
||||
gathered.push(*a);
|
||||
}
|
||||
}
|
||||
|
||||
while !gathered.is_empty() {
|
||||
source.send(
|
||||
service.as_ref(),
|
||||
service.node(),
|
||||
None,
|
||||
time_ticks,
|
||||
|packet| -> Result<(), OutOfBoundsError> {
|
||||
let ok_header = packet.append_struct_get_mut::<protocol::OkHeader>()?;
|
||||
ok_header.verb = protocol::message_type::VL1_OK;
|
||||
ok_header.in_re_verb = protocol::message_type::VL2_MULTICAST_GATHER;
|
||||
ok_header.in_re_message_id = message_id.to_be_bytes();
|
||||
|
||||
packet.append_bytes_fixed(&network_id.to_bytes())?;
|
||||
packet.append_bytes_fixed(&mac.to_bytes())?;
|
||||
packet.append_u32(adi)?;
|
||||
packet.append_u32(gathered.len() as u32)?;
|
||||
|
||||
let in_this_packet = gathered
|
||||
.len()
|
||||
.clamp(1, (packet.capacity() - packet.len()) / protocol::ADDRESS_SIZE)
|
||||
.min(u16::MAX as usize);
|
||||
|
||||
packet.append_u16(in_this_packet as u16)?;
|
||||
for _ in 0..in_this_packet {
|
||||
packet.append_bytes_fixed(&gathered.pop().unwrap().to_bytes())?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let auth = self.recently_authorized.read().unwrap();
|
||||
let time_ticks = ms_monotonic();
|
||||
self.multicast_authority.handle_vl2_multicast_gather(
|
||||
|network_id, identity| {
|
||||
auth.get(&identity.fingerprint)
|
||||
.map_or(false, |t| t.get(&network_id).map_or(false, |t| *t > time_ticks))
|
||||
},
|
||||
time_ticks,
|
||||
service.as_ref(),
|
||||
service.node(),
|
||||
source,
|
||||
message_id,
|
||||
payload,
|
||||
cursor,
|
||||
);
|
||||
}
|
||||
PacketHandlerResult::Ok
|
||||
}
|
||||
|
@ -617,49 +545,21 @@ impl InnerProtocol for Controller {
|
|||
_ => PacketHandlerResult::NotHandled,
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
|
||||
&self,
|
||||
_host_system: &HostSystemImpl,
|
||||
_node: &Node,
|
||||
_source: &Arc<Peer>,
|
||||
_source_path: &Arc<Path>,
|
||||
_source_hops: u8,
|
||||
_message_id: u64,
|
||||
_in_re_verb: u8,
|
||||
_in_re_message_id: u64,
|
||||
_error_code: u8,
|
||||
_payload: &PacketBuffer,
|
||||
_cursor: usize,
|
||||
) -> PacketHandlerResult {
|
||||
PacketHandlerResult::NotHandled
|
||||
}
|
||||
|
||||
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
|
||||
&self,
|
||||
_host_system: &HostSystemImpl,
|
||||
_node: &Node,
|
||||
_source: &Arc<Peer>,
|
||||
_source_path: &Arc<Path>,
|
||||
_source_hops: u8,
|
||||
_message_id: u64,
|
||||
_in_re_verb: u8,
|
||||
_in_re_message_id: u64,
|
||||
_payload: &PacketBuffer,
|
||||
_cursor: usize,
|
||||
) -> PacketHandlerResult {
|
||||
PacketHandlerResult::NotHandled
|
||||
}
|
||||
}
|
||||
|
||||
impl VL1AuthProvider for Controller {
|
||||
#[inline(always)]
|
||||
fn should_respond_to(&self, id: &Identity) -> bool {
|
||||
fn should_respond_to(&self, _: &Identity) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn has_trust_relationship(&self, id: &Identity) -> bool {
|
||||
false
|
||||
let time_ticks = ms_monotonic();
|
||||
self.recently_authorized
|
||||
.read()
|
||||
.unwrap()
|
||||
.get(&id.fingerprint)
|
||||
.map_or(false, |by_network| by_network.values().any(|t| *t > time_ticks))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -162,10 +162,12 @@ pub enum PacketHandlerResult {
|
|||
///
|
||||
/// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but
|
||||
/// it could also be implemented for testing or "off label" use of VL1 to carry different protocols.
|
||||
#[allow(unused)]
|
||||
pub trait InnerProtocol: Sync + Send {
|
||||
/// Handle a packet, returning true if it was handled by the next layer.
|
||||
///
|
||||
/// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error().
|
||||
/// The default version returns NotHandled.
|
||||
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
|
||||
&self,
|
||||
host_system: &HostSystemImpl,
|
||||
|
@ -177,9 +179,12 @@ pub trait InnerProtocol: Sync + Send {
|
|||
verb: u8,
|
||||
payload: &PacketBuffer,
|
||||
cursor: usize,
|
||||
) -> PacketHandlerResult;
|
||||
) -> PacketHandlerResult {
|
||||
PacketHandlerResult::NotHandled
|
||||
}
|
||||
|
||||
/// Handle errors, returning true if the error was recognized.
|
||||
/// The default version returns NotHandled.
|
||||
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
|
||||
&self,
|
||||
host_system: &HostSystemImpl,
|
||||
|
@ -193,9 +198,12 @@ pub trait InnerProtocol: Sync + Send {
|
|||
error_code: u8,
|
||||
payload: &PacketBuffer,
|
||||
cursor: usize,
|
||||
) -> PacketHandlerResult;
|
||||
) -> PacketHandlerResult {
|
||||
PacketHandlerResult::NotHandled
|
||||
}
|
||||
|
||||
/// Handle an OK, returing true if the OK was recognized.
|
||||
/// The default version returns NotHandled.
|
||||
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
|
||||
&self,
|
||||
host_system: &HostSystemImpl,
|
||||
|
@ -208,7 +216,9 @@ pub trait InnerProtocol: Sync + Send {
|
|||
in_re_message_id: u64,
|
||||
payload: &PacketBuffer,
|
||||
cursor: usize,
|
||||
) -> PacketHandlerResult;
|
||||
) -> PacketHandlerResult {
|
||||
PacketHandlerResult::NotHandled
|
||||
}
|
||||
}
|
||||
|
||||
/// How often to check the root cluster definitions against the root list and update.
|
||||
|
@ -1110,58 +1120,7 @@ impl<HostSystemImpl: HostSystem + ?Sized> PathKey<'_, '_, HostSystemImpl> {
|
|||
#[derive(Default)]
|
||||
pub struct DummyInnerProtocol;
|
||||
|
||||
impl InnerProtocol for DummyInnerProtocol {
|
||||
#[inline(always)]
|
||||
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
|
||||
&self,
|
||||
_host_system: &HostSystemImpl,
|
||||
_node: &Node,
|
||||
_source: &Arc<Peer>,
|
||||
_source_path: &Arc<Path>,
|
||||
_source_hops: u8,
|
||||
_message_id: u64,
|
||||
_verb: u8,
|
||||
_payload: &PacketBuffer,
|
||||
_cursor: usize,
|
||||
) -> PacketHandlerResult {
|
||||
PacketHandlerResult::NotHandled
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn handle_error<HostSystemImpl: HostSystem + ?Sized>(
|
||||
&self,
|
||||
_host_system: &HostSystemImpl,
|
||||
_node: &Node,
|
||||
_source: &Arc<Peer>,
|
||||
_source_path: &Arc<Path>,
|
||||
_source_hops: u8,
|
||||
_message_id: u64,
|
||||
_in_re_verb: u8,
|
||||
_in_re_message_id: u64,
|
||||
_error_code: u8,
|
||||
_payload: &PacketBuffer,
|
||||
_cursor: usize,
|
||||
) -> PacketHandlerResult {
|
||||
PacketHandlerResult::NotHandled
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn handle_ok<HostSystemImpl: HostSystem + ?Sized>(
|
||||
&self,
|
||||
_host_system: &HostSystemImpl,
|
||||
_node: &Node,
|
||||
_source: &Arc<Peer>,
|
||||
_source_path: &Arc<Path>,
|
||||
_source_hops: u8,
|
||||
_message_id: u64,
|
||||
_in_re_verb: u8,
|
||||
_in_re_message_id: u64,
|
||||
_payload: &PacketBuffer,
|
||||
_cursor: usize,
|
||||
) -> PacketHandlerResult {
|
||||
PacketHandlerResult::NotHandled
|
||||
}
|
||||
}
|
||||
impl InnerProtocol for DummyInnerProtocol {}
|
||||
|
||||
impl VL1AuthProvider for DummyInnerProtocol {
|
||||
#[inline(always)]
|
||||
|
|
|
@ -3,9 +3,12 @@ use std::sync::{Arc, Mutex, RwLock};
|
|||
|
||||
use crate::protocol;
|
||||
use crate::protocol::PacketBuffer;
|
||||
use crate::vl1::{Address, HostSystem, Identity, PacketHandlerResult, Peer};
|
||||
use crate::vl1::{Address, HostSystem, Identity, Node, PacketHandlerResult, Peer, MAC};
|
||||
use crate::vl2::{MulticastGroup, NetworkId};
|
||||
|
||||
use zerotier_utils::buffer::OutOfBoundsError;
|
||||
use zerotier_utils::sync::RMaybeWLockGuard;
|
||||
|
||||
/// Handler implementations for VL2_MULTICAST_LIKE and VL2_MULTICAST_GATHER.
|
||||
///
|
||||
/// Both controllers and roots will want to handle these, with the latter supporting them for legacy
|
||||
|
@ -18,27 +21,128 @@ pub struct MulticastAuthority {
|
|||
}
|
||||
|
||||
impl MulticastAuthority {
|
||||
fn handle_vl2_multicast_like<HostSystemImpl: HostSystem + ?Sized, Authenticator: Fn(NetworkId, &Identity) -> bool>(
|
||||
#[inline]
|
||||
pub fn new() -> Self {
|
||||
Self { subscriptions: RwLock::new(HashMap::new()) }
|
||||
}
|
||||
|
||||
/// Call this every VL2_DEFAULT_MULTICAST_LIKE_EXPIRE (or more frequently) to clean expired multicast subscriptions.
|
||||
pub fn clean(&self, time_ticks: i64) {
|
||||
let exp_before = time_ticks - protocol::VL2_DEFAULT_MULTICAST_LIKE_EXPIRE;
|
||||
let mut empty_subscription_entries = Vec::new();
|
||||
|
||||
for (network_group, subs) in self.subscriptions.read().unwrap().iter() {
|
||||
let mut subs = subs.lock().unwrap();
|
||||
subs.retain(|_, t| *t > exp_before);
|
||||
if subs.is_empty() {
|
||||
empty_subscription_entries.push(network_group.clone());
|
||||
}
|
||||
}
|
||||
|
||||
if !empty_subscription_entries.is_empty() {
|
||||
let mut ms = self.subscriptions.write().unwrap();
|
||||
for e in empty_subscription_entries.iter() {
|
||||
ms.remove(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Call for VL2_MULTICAST_LIKE packets.
|
||||
pub fn handle_vl2_multicast_like<Authenticator: Fn(NetworkId, &Identity) -> bool>(
|
||||
&self,
|
||||
auth: Authenticator,
|
||||
host_system: &HostSystemImpl,
|
||||
time_ticks: i64,
|
||||
source: &Arc<Peer>,
|
||||
message_id: u64,
|
||||
payload: &PacketBuffer,
|
||||
mut cursor: usize,
|
||||
) -> PacketHandlerResult {
|
||||
let mut subscriptions = RMaybeWLockGuard::new_read(&self.subscriptions);
|
||||
|
||||
while payload.len() >= (8 + 6 + 4) {
|
||||
let network_id = NetworkId::from_bytes_fixed(payload.read_bytes_fixed(&mut cursor).unwrap());
|
||||
if let Some(network_id) = network_id {
|
||||
let mac = MAC::from_bytes_fixed(payload.read_bytes_fixed(&mut cursor).unwrap());
|
||||
if let Some(mac) = mac {
|
||||
if auth(network_id, &source.identity) {
|
||||
let sub_key = (network_id, MulticastGroup { mac, adi: payload.read_u32(&mut cursor).unwrap() });
|
||||
if let Some(sub) = subscriptions.read().get(&sub_key) {
|
||||
let _ = sub.lock().unwrap().insert(source.identity.address, time_ticks);
|
||||
} else {
|
||||
let _ = subscriptions
|
||||
.write(&self.subscriptions)
|
||||
.entry(sub_key)
|
||||
.or_insert_with(|| Mutex::new(HashMap::new()))
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(source.identity.address, time_ticks);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
PacketHandlerResult::Ok
|
||||
}
|
||||
|
||||
fn handle_vl2_multicast_gather<HostSystemImpl: HostSystem + ?Sized, Authenticator: Fn(NetworkId, &Identity) -> bool>(
|
||||
/// Call for VL2_MULTICAST_GATHER packets.
|
||||
pub fn handle_vl2_multicast_gather<HostSystemImpl: HostSystem + ?Sized, Authenticator: Fn(NetworkId, &Identity) -> bool>(
|
||||
&self,
|
||||
auth: Authenticator,
|
||||
time_ticks: i64,
|
||||
host_system: &HostSystemImpl,
|
||||
node: &Node,
|
||||
source: &Arc<Peer>,
|
||||
message_id: u64,
|
||||
payload: &PacketBuffer,
|
||||
mut cursor: usize,
|
||||
) -> PacketHandlerResult {
|
||||
if let Some(network_id) = payload
|
||||
.read_bytes_fixed(&mut cursor)
|
||||
.map_or(None, |network_id| NetworkId::from_bytes_fixed(network_id))
|
||||
{
|
||||
if auth(network_id, &source.identity) {
|
||||
cursor += 1; // skip flags, currently unused
|
||||
if let Some(mac) = payload.read_bytes_fixed(&mut cursor).map_or(None, |mac| MAC::from_bytes_fixed(mac)) {
|
||||
let mut gathered = Vec::new();
|
||||
|
||||
let adi = payload.read_u32(&mut cursor).unwrap_or(0);
|
||||
let subscriptions = self.subscriptions.read().unwrap();
|
||||
if let Some(sub) = subscriptions.get(&(network_id, MulticastGroup { mac, adi })) {
|
||||
let sub = sub.lock().unwrap();
|
||||
for a in sub.keys() {
|
||||
gathered.push(*a);
|
||||
}
|
||||
}
|
||||
|
||||
while !gathered.is_empty() {
|
||||
source.send(host_system, node, None, time_ticks, |packet| -> Result<(), OutOfBoundsError> {
|
||||
let ok_header = packet.append_struct_get_mut::<protocol::OkHeader>()?;
|
||||
ok_header.verb = protocol::message_type::VL1_OK;
|
||||
ok_header.in_re_verb = protocol::message_type::VL2_MULTICAST_GATHER;
|
||||
ok_header.in_re_message_id = message_id.to_be_bytes();
|
||||
|
||||
packet.append_bytes_fixed(&network_id.to_bytes())?;
|
||||
packet.append_bytes_fixed(&mac.to_bytes())?;
|
||||
packet.append_u32(adi)?;
|
||||
packet.append_u32(gathered.len() as u32)?;
|
||||
|
||||
let in_this_packet = gathered
|
||||
.len()
|
||||
.clamp(1, (packet.capacity() - packet.len()) / protocol::ADDRESS_SIZE)
|
||||
.min(u16::MAX as usize);
|
||||
|
||||
packet.append_u16(in_this_packet as u16)?;
|
||||
for _ in 0..in_this_packet {
|
||||
packet.append_bytes_fixed(&gathered.pop().unwrap().to_bytes())?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
PacketHandlerResult::Ok
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,8 +44,10 @@ pub struct NetworkConfig {
|
|||
/// TTL for credentials on this network (or window size for V1 nodes)
|
||||
pub credential_ttl: i64,
|
||||
|
||||
/// Network configuration revision number
|
||||
pub revision: u64,
|
||||
/// Network configuration revision number (V1)
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub revision: Option<u64>,
|
||||
|
||||
/// L2 Ethernet MTU for this network.
|
||||
pub mtu: u16,
|
||||
|
@ -83,11 +85,6 @@ pub struct NetworkConfig {
|
|||
#[serde(default)]
|
||||
pub v1_credentials: Option<V1Credentials>,
|
||||
|
||||
/// Information about specific nodes such as names, services, etc. (V2 only)
|
||||
#[serde(skip_serializing_if = "HashMap::is_empty")]
|
||||
#[serde(default)]
|
||||
pub node_info: HashMap<Address, NodeInfo>,
|
||||
|
||||
/// URL to ZeroTier Central instance that is controlling the controller that issued this (if any).
|
||||
#[serde(skip_serializing_if = "String::is_empty")]
|
||||
#[serde(default)]
|
||||
|
@ -109,7 +106,7 @@ impl NetworkConfig {
|
|||
private: true,
|
||||
timestamp: 0,
|
||||
credential_ttl: 0,
|
||||
revision: 0,
|
||||
revision: None,
|
||||
mtu: 0,
|
||||
multicast_limit: 0,
|
||||
multicast_like_expire: None,
|
||||
|
@ -118,7 +115,6 @@ impl NetworkConfig {
|
|||
rules: Vec::new(),
|
||||
dns: HashMap::new(),
|
||||
v1_credentials: None,
|
||||
node_info: HashMap::new(),
|
||||
central_url: String::new(),
|
||||
sso: None,
|
||||
}
|
||||
|
@ -148,7 +144,7 @@ impl NetworkConfig {
|
|||
);
|
||||
d.set_u64(proto_v1_field_name::network_config::TIMESTAMP, self.timestamp as u64);
|
||||
d.set_u64(proto_v1_field_name::network_config::MAX_DELTA, self.credential_ttl as u64);
|
||||
d.set_u64(proto_v1_field_name::network_config::REVISION, self.revision);
|
||||
d.set_u64(proto_v1_field_name::network_config::REVISION, self.revision.unwrap_or(0));
|
||||
d.set_u64(proto_v1_field_name::network_config::MTU, self.mtu as u64);
|
||||
d.set_u64(proto_v1_field_name::network_config::MULTICAST_LIMIT, self.multicast_limit as u64);
|
||||
|
||||
|
@ -273,7 +269,7 @@ impl NetworkConfig {
|
|||
.get_i64(proto_v1_field_name::network_config::TIMESTAMP)
|
||||
.ok_or(InvalidParameterError("missing timestamp"))?;
|
||||
nc.credential_ttl = d.get_i64(proto_v1_field_name::network_config::MAX_DELTA).unwrap_or(0);
|
||||
nc.revision = d.get_u64(proto_v1_field_name::network_config::REVISION).unwrap_or(0);
|
||||
nc.revision = Some(d.get_u64(proto_v1_field_name::network_config::REVISION).unwrap_or(0));
|
||||
nc.mtu = d
|
||||
.get_u64(proto_v1_field_name::network_config::MTU)
|
||||
.unwrap_or(crate::protocol::ZEROTIER_VIRTUAL_NETWORK_DEFAULT_MTU as u64) as u16;
|
||||
|
@ -457,23 +453,6 @@ pub struct V1Credentials {
|
|||
pub tags: HashMap<u32, Tag>,
|
||||
}
|
||||
|
||||
/// Information about nodes on the network that can be included in a network config.
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct NodeInfo {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub flags: Option<u64>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub ip: Option<InetAddress>,
|
||||
#[serde(skip_serializing_if = "String::is_empty")]
|
||||
#[serde(default)]
|
||||
pub name: String,
|
||||
#[serde(skip_serializing_if = "HashMap::is_empty")]
|
||||
#[serde(default)]
|
||||
pub services: HashMap<String, Option<String>>,
|
||||
}
|
||||
|
||||
/// Statically pushed L3 IP routes included with a network configuration.
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
|
||||
pub struct IpRoute {
|
||||
|
|
Loading…
Add table
Reference in a new issue