More work on VL1 service

Adam Ierymenko 2022-09-16 13:50:39 -04:00
parent b4edad6bfb
commit b4c74ce7bb
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
7 changed files with 265 additions and 68 deletions

View file

@ -7,21 +7,6 @@ use serde::{Deserialize, Serialize};
use zerotier_network_hypervisor::vl1::{Address, Endpoint, InetAddress};
use zerotier_network_hypervisor::vl2::NetworkId;
/// A list of unassigned or obsolete ports under 1024 that could possibly be squatted.
pub const UNASSIGNED_PRIVILEGED_PORTS: [u16; 299] = [
4, 6, 8, 10, 12, 14, 15, 16, 26, 28, 30, 32, 34, 36, 40, 60, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 285, 288, 289, 290,
291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 303, 304, 305, 306, 307, 323, 324, 325, 326, 327, 328, 329, 330, 331, 332,
334, 335, 336, 337, 338, 339, 340, 341, 342, 343, 703, 708, 713, 714, 715, 716, 717, 718, 719, 720, 721, 722, 723, 724, 725, 726, 727,
728, 732, 733, 734, 735, 736, 737, 738, 739, 740, 743, 745, 746, 755, 756, 766, 768, 778, 779, 781, 782, 783, 784, 785, 786, 787, 788,
789, 790, 791, 792, 793, 794, 795, 796, 797, 798, 799, 802, 803, 804, 805, 806, 807, 808, 809, 811, 812, 813, 814, 815, 816, 817, 818,
819, 820, 821, 822, 823, 824, 825, 826, 827, 834, 835, 836, 837, 838, 839, 840, 841, 842, 843, 844, 845, 846, 849, 850, 851, 852, 853,
854, 855, 856, 857, 858, 859, 862, 863, 864, 865, 866, 867, 868, 869, 870, 871, 872, 874, 875, 876, 877, 878, 879, 880, 881, 882, 883,
884, 885, 889, 890, 891, 892, 893, 894, 895, 896, 897, 898, 899, 904, 905, 906, 907, 908, 909, 910, 911, 914, 915, 916, 917, 918, 919,
920, 921, 922, 923, 924, 925, 926, 927, 928, 929, 930, 931, 932, 933, 934, 935, 936, 937, 938, 939, 940, 941, 942, 943, 944, 945, 946,
947, 948, 949, 950, 951, 952, 953, 954, 955, 956, 957, 958, 959, 960, 961, 962, 963, 964, 965, 966, 967, 968, 969, 970, 971, 972, 973,
974, 975, 976, 977, 978, 979, 980, 981, 982, 983, 984, 985, 986, 987, 988, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1023,
];
/// Default primary ZeroTier port.
pub const DEFAULT_PORT: u16 = 9993;

View file

@ -13,6 +13,7 @@ async-trait = "^0"
num-traits = "^0"
tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false }
parking_lot = { version = "^0", features = [], default-features = false }
serde = { version = "^1", features = ["derive"], default-features = false }
[target."cfg(windows)".dependencies]
winapi = { version = "^0", features = ["handleapi", "ws2ipdef", "ws2tcpip"] }

View file

@ -0,0 +1,16 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
/// A list of unassigned or obsolete ports under 1024 that could possibly be squatted.
pub const UNASSIGNED_PRIVILEGED_PORTS: [u16; 299] = [
4, 6, 8, 10, 12, 14, 15, 16, 26, 28, 30, 32, 34, 36, 40, 60, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 285, 288, 289, 290,
291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 303, 304, 305, 306, 307, 323, 324, 325, 326, 327, 328, 329, 330, 331, 332,
334, 335, 336, 337, 338, 339, 340, 341, 342, 343, 703, 708, 713, 714, 715, 716, 717, 718, 719, 720, 721, 722, 723, 724, 725, 726, 727,
728, 732, 733, 734, 735, 736, 737, 738, 739, 740, 743, 745, 746, 755, 756, 766, 768, 778, 779, 781, 782, 783, 784, 785, 786, 787, 788,
789, 790, 791, 792, 793, 794, 795, 796, 797, 798, 799, 802, 803, 804, 805, 806, 807, 808, 809, 811, 812, 813, 814, 815, 816, 817, 818,
819, 820, 821, 822, 823, 824, 825, 826, 827, 834, 835, 836, 837, 838, 839, 840, 841, 842, 843, 844, 845, 846, 849, 850, 851, 852, 853,
854, 855, 856, 857, 858, 859, 862, 863, 864, 865, 866, 867, 868, 869, 870, 871, 872, 874, 875, 876, 877, 878, 879, 880, 881, 882, 883,
884, 885, 889, 890, 891, 892, 893, 894, 895, 896, 897, 898, 899, 904, 905, 906, 907, 908, 909, 910, 911, 914, 915, 916, 917, 918, 919,
920, 921, 922, 923, 924, 925, 926, 927, 928, 929, 930, 931, 932, 933, 934, 935, 936, 937, 938, 939, 940, 941, 942, 943, 944, 945, 946,
947, 948, 949, 950, 951, 952, 953, 954, 955, 956, 957, 958, 959, 960, 961, 962, 963, 964, 965, 966, 967, 968, 969, 970, 971, 972, 973,
974, 975, 976, 977, 978, 979, 980, 981, 982, 983, 984, 985, 986, 987, 988, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1023,
];
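A minimal sanity-check sketch (not part of this commit) that exercises the invariants implied by the comment above: every entry is a non-zero port below 1024, and as written the list is in strictly ascending order with no duplicates.

#[cfg(test)]
mod unassigned_privileged_ports_tests {
    use super::UNASSIGNED_PRIVILEGED_PORTS;

    #[test]
    fn entries_are_privileged_and_strictly_ascending() {
        // The array type already fixes the length at 299; check range and ordering here.
        for p in UNASSIGNED_PRIVILEGED_PORTS {
            assert!(p > 0 && p < 1024);
        }
        assert!(UNASSIGNED_PRIVILEGED_PORTS.windows(2).all(|w| w[0] < w[1]));
    }
}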

View file

@ -2,10 +2,13 @@
mod localinterface;
mod localsocket;
mod settings;
mod vl1service;
pub mod constants;
pub mod sys;
pub use localinterface::LocalInterface;
pub use localsocket::LocalSocket;
pub use settings::Settings;
pub use vl1service::*;

View file

@ -0,0 +1,46 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use serde::{Deserialize, Serialize};
use zerotier_network_hypervisor::vl1::InetAddress;
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct Settings {
/// Fixed ports that are always bound; the default is the primary ZeroTier port, 9993.
pub fixed_ports: Vec<u16>,
/// Number of additional random ports to bind.
pub random_port_count: usize,
/// Enable UPnP, NAT-PMP, and other router port mapping technologies?
pub port_mapping: bool,
/// Interface name prefix blacklist for local bindings (not remote IPs).
pub interface_prefix_blacklist: Vec<String>,
/// IP/bits CIDR blacklist for local bindings (not remote IPs).
pub cidr_blacklist: Vec<InetAddress>,
}
impl Settings {
#[cfg(target_os = "macos")]
pub const DEFAULT_PREFIX_BLACKLIST: [&'static str; 10] = ["lo", "utun", "gif", "stf", "iptap", "pktap", "feth", "zt", "llw", "anpi"];
#[cfg(target_os = "linux")]
pub const DEFAULT_PREFIX_BLACKLIST: [&'static str; 5] = ["lo", "tun", "tap", "ipsec", "zt"];
#[cfg(windows)]
pub const DEFAULT_PREFIX_BLACKLIST: [&'static str; 0] = [];
}
impl Default for Settings {
fn default() -> Self {
Self {
fixed_ports: vec![9993],
random_port_count: 5,
port_mapping: true,
interface_prefix_blacklist: Self::DEFAULT_PREFIX_BLACKLIST.iter().map(|s| s.to_string()).collect(),
cidr_blacklist: Vec::new(),
}
}
}
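Since the struct carries #[serde(default)], a partially specified config deserializes cleanly, with omitted fields taken from Default. A rough sketch, assuming serde_json (not a dependency declared in this commit) purely for illustration:

fn load_settings(json: &str) -> Result<Settings, serde_json::Error> {
    // Only fixed_ports is given in the commented example below; random_port_count,
    // port_mapping and the blacklists fall back to Settings::default() because of
    // the #[serde(default)] attribute on the struct.
    serde_json::from_str::<Settings>(json)
}

// let s = load_settings(r#"{ "fixed_ports": [9993, 29993] }"#)?;
// assert_eq!(s.random_port_count, 5);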

View file

@ -4,9 +4,10 @@ use std::collections::HashMap;
#[allow(unused_imports)]
use std::mem::{size_of, transmute, MaybeUninit};
#[allow(unused_imports)]
use std::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
#[allow(unused_imports)]
use std::ptr::{null, null_mut};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
#[cfg(unix)]
@ -35,9 +36,20 @@ pub struct BoundUdpSocket {
pub address: InetAddress,
pub socket: Arc<tokio::net::UdpSocket>,
pub interface: LocalInterface,
pub associated_tasks: parking_lot::Mutex<Vec<tokio::task::JoinHandle<()>>>,
last_receive_time: AtomicI64,
fd: RawFd,
}
impl Drop for BoundUdpSocket {
fn drop(&mut self) {
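// Abort any receive tasks that were attached to this socket so they stop as soon as the binding goes away.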
let mut associated_tasks = self.associated_tasks.lock();
for t in associated_tasks.drain(..) {
t.abort();
}
}
}
impl BoundUdpSocket {
#[cfg(unix)]
#[inline(always)]
@ -54,31 +66,6 @@ impl BoundUdpSocket {
};
}
#[cfg(any(target_os = "macos", target_os = "freebsd"))]
pub fn send_sync_nonblock(&self, dest: &InetAddress, b: &[u8], packet_ttl: u8) -> bool {
let mut ok = false;
if dest.family() == self.address.family() {
if packet_ttl > 0 && dest.is_ipv4() {
self.set_ttl(packet_ttl);
}
unsafe {
ok = libc::sendto(
self.fd.as_(),
b.as_ptr().cast(),
b.len().as_(),
0,
(dest as *const InetAddress).cast(),
size_of::<InetAddress>().as_(),
) > 0;
}
if packet_ttl > 0 && dest.is_ipv4() {
self.set_ttl(0xff);
}
}
ok
}
#[cfg(not(any(target_os = "macos", target_os = "freebsd")))]
pub fn send_sync_nonblock(&self, dest: &InetAddress, b: &[u8], packet_ttl: u8) -> bool {
let mut ok = false;
if dest.family() == self.address.family() {
@ -92,6 +79,14 @@ impl BoundUdpSocket {
}
ok
}
pub async fn receive<B: AsMut<[u8]> + Send>(&self, mut buffer: B, current_time: i64) -> tokio::io::Result<(usize, SocketAddr)> {
let result = self.socket.recv_from(buffer.as_mut()).await;
if result.is_ok() {
self.last_receive_time.store(current_time, Ordering::Relaxed);
}
result
}
}
impl BoundUdpPort {
@ -102,6 +97,17 @@ impl BoundUdpPort {
Self { sockets: Vec::new(), port }
}
/// Return a tuple of the total number of Arc<> and Weak<> references to this port's sockets and the most recent receive time on any of them.
pub fn liveness(&self) -> (usize, i64) {
let mut rt_latest = i64::MIN;
let mut total_handles = 0;
for s in self.sockets.iter() {
rt_latest = rt_latest.max(s.last_receive_time.load(Ordering::Relaxed));
total_handles += Arc::strong_count(s) + Arc::weak_count(s);
}
(total_handles, rt_latest)
}
/// Synchronize bindings with the network devices and IPs currently present in the system.
///
/// Any device or local IP within any of the supplied blacklists is ignored. Multicast or loopback addresses are
@ -129,6 +135,7 @@ impl BoundUdpPort {
let interface_str = interface.to_string();
let mut addr_with_port = address.clone();
addr_with_port.set_port(self.port);
if address.is_ip()
&& matches!(
address.scope(),
@ -145,6 +152,7 @@ impl BoundUdpPort {
self.sockets.push(socket.clone());
}
}
if !found {
let s = unsafe { bind_udp_to_device(interface_str.as_str(), &addr_with_port) };
if s.is_ok() {
@ -155,6 +163,7 @@ impl BoundUdpPort {
address: addr_with_port,
socket: Arc::new(s.unwrap()),
interface: interface.clone(),
last_receive_time: AtomicI64::new(i64::MIN),
fd,
});
self.sockets.push(s.clone());
@ -177,6 +186,19 @@ impl BoundUdpPort {
}
}
/// Attempt to bind to a given UDP port on the wildcard address and then close the socket to determine whether the port is usable.
///
/// This succeeds if either the IPv4 or the IPv6 wildcard (unspecified) address can be bound.
pub fn udp_test_bind(port: u16) -> bool {
std::net::UdpSocket::bind(
&[
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port),
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port),
][..],
)
.is_ok()
}
#[allow(unused_variables)]
#[cfg(unix)]
unsafe fn bind_udp_to_device(device_name: &str, address: &InetAddress) -> Result<RawFd, &'static str> {

View file

@ -7,10 +7,13 @@ use std::sync::Arc;
use async_trait::async_trait;
use zerotier_crypto::random;
use zerotier_network_hypervisor::vl1::{Endpoint, Event, HostSystem, Identity, InnerProtocol, Node, PathFilter, Storage};
use zerotier_network_hypervisor::vl1::{Endpoint, Event, HostSystem, Identity, InetAddress, InnerProtocol, Node, PathFilter, Storage};
use zerotier_utils::{ms_monotonic, ms_since_epoch};
use crate::sys::udp::BoundUdpPort;
use crate::constants::UNASSIGNED_PRIVILEGED_PORTS;
use crate::settings::Settings;
use crate::sys::udp::{udp_test_bind, BoundUdpPort};
use crate::LocalSocket;
use tokio::task::JoinHandle;
use tokio::time::Duration;
@ -22,14 +25,19 @@ use tokio::time::Duration;
/// whatever inner protocol implementation is using it. This would typically be VL2, but it could
/// be a test harness or, for a controller that runs stand-alone, just the controller itself.
pub struct VL1Service<StorageImpl: Storage, PathFilterImpl: PathFilter<Self>, InnerProtocolImpl: InnerProtocol> {
daemons: parking_lot::Mutex<Vec<JoinHandle<()>>>,
udp_sockets_by_port: tokio::sync::RwLock<HashMap<u16, BoundUdpPort>>,
state: tokio::sync::RwLock<VL1ServiceMutableState>,
storage: Arc<StorageImpl>,
inner: Arc<InnerProtocolImpl>,
path_filter: Arc<PathFilterImpl>,
node_container: Option<Node<Self>>,
}
struct VL1ServiceMutableState {
daemons: Vec<JoinHandle<()>>,
udp_sockets: HashMap<u16, parking_lot::RwLock<BoundUdpPort>>,
settings: Settings,
}
impl<StorageImpl: Storage, PathFilterImpl: PathFilter<Self>, InnerProtocolImpl: InnerProtocol>
VL1Service<StorageImpl, PathFilterImpl, InnerProtocolImpl>
{
@ -37,26 +45,28 @@ impl<StorageImpl: Storage, PathFilterImpl: PathFilter<Self>, InnerProtocolImpl:
storage: Arc<StorageImpl>,
inner: Arc<InnerProtocolImpl>,
path_filter: Arc<PathFilterImpl>,
settings: Settings,
) -> Result<Arc<Self>, Box<dyn Error>> {
let mut service = VL1Service {
daemons: parking_lot::Mutex::new(Vec::with_capacity(2)),
udp_sockets_by_port: tokio::sync::RwLock::new(HashMap::with_capacity(8)),
state: tokio::sync::RwLock::new(VL1ServiceMutableState {
daemons: Vec::with_capacity(2),
udp_sockets: HashMap::with_capacity(8),
settings,
}),
storage,
inner,
path_filter,
node_container: None,
};
service
.node_container
.replace(Node::new(&service, &*service.storage, true, false).await?);
let service = Arc::new(service);
let mut daemons = service.daemons.lock();
daemons.push(tokio::spawn(service.clone().udp_bind_daemon()));
daemons.push(tokio::spawn(service.clone().node_background_task_daemon()));
drop(daemons);
let mut state = service.state.write().await;
state.daemons.push(tokio::spawn(service.clone().udp_bind_daemon()));
state.daemons.push(tokio::spawn(service.clone().node_background_task_daemon()));
drop(state);
Ok(service)
}
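A rough caller-side sketch (MyStorage, MyInnerProtocol and MyPathFilter are hypothetical placeholders for concrete Storage, InnerProtocol and PathFilter implementations, not types from this commit):

async fn start_vl1(
    storage: Arc<MyStorage>,
    inner: Arc<MyInnerProtocol>,
    path_filter: Arc<MyPathFilter>,
) -> Result<(), Box<dyn Error>> {
    let service = VL1Service::new(storage, inner, path_filter, Settings::default()).await?;
    // udp_bind_daemon() and node_background_task_daemon() are now running in the background.
    println!("bound UDP ports: {:?}", service.bound_udp_ports().await);
    Ok(())
}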
@ -67,9 +77,125 @@ impl<StorageImpl: Storage, PathFilterImpl: PathFilter<Self>, InnerProtocolImpl:
unsafe { self.node_container.as_ref().unwrap_unchecked() }
}
async fn udp_bind_daemon(self: Arc<Self>) {}
pub async fn bound_udp_ports(&self) -> Vec<u16> {
self.state.read().await.udp_sockets.keys().cloned().collect()
}
async fn node_background_task_daemon(self: Arc<Self>) {}
async fn udp_bind_daemon(self: Arc<Self>) {
loop {
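// Look at the current bindings under a read lock first; a write lock is only taken below if the set of bound ports actually needs to change.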
let state = self.state.read().await;
let mut need_fixed_ports: HashSet<u16> = HashSet::from_iter(state.settings.fixed_ports.iter().cloned());
let mut have_random_port_count = 0;
for (p, _) in state.udp_sockets.iter() {
need_fixed_ports.remove(p);
have_random_port_count += (!state.settings.fixed_ports.contains(p)) as usize;
}
let desired_random_port_count = state.settings.random_port_count;
let state = if !need_fixed_ports.is_empty() || have_random_port_count != desired_random_port_count {
drop(state);
let mut state = self.state.write().await;
for p in need_fixed_ports.iter() {
state.udp_sockets.insert(*p, parking_lot::RwLock::new(BoundUdpPort::new(*p)));
}
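// Too many random ports are bound: drop the binding that looks most stale, i.e. the one with the fewest live Arc/Weak handles, breaking ties by the oldest last receive time reported by liveness().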
while have_random_port_count > desired_random_port_count {
let mut most_stale_binding_liveness = (usize::MAX, i64::MAX);
let mut most_stale_binding_port = 0;
for (p, s) in state.udp_sockets.iter() {
if !state.settings.fixed_ports.contains(p) {
let (total_smart_ptr_handles, most_recent_receive) = s.read().liveness();
if total_smart_ptr_handles < most_stale_binding_liveness.0
|| (total_smart_ptr_handles == most_stale_binding_liveness.0
&& most_recent_receive <= most_stale_binding_liveness.1)
{
most_stale_binding_liveness.0 = total_smart_ptr_handles;
most_stale_binding_liveness.1 = most_recent_receive;
most_stale_binding_port = *p;
}
}
}
if most_stale_binding_port != 0 {
have_random_port_count -= state.udp_sockets.remove(&most_stale_binding_port).is_some() as usize;
} else {
break;
}
}
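// Too few random ports are bound: try a randomly chosen entry from UNASSIGNED_PRIVILEGED_PORTS first, and if none of those can be bound fall back to a random high port in the 50000-65534 range.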
'outer_add_port_loop: while have_random_port_count < desired_random_port_count {
let rn = random::xorshift64_random() as usize;
for i in 0..UNASSIGNED_PRIVILEGED_PORTS.len() {
let p = UNASSIGNED_PRIVILEGED_PORTS[rn.wrapping_add(i) % UNASSIGNED_PRIVILEGED_PORTS.len()];
if !state.udp_sockets.contains_key(&p) && udp_test_bind(p) {
let _ = state.udp_sockets.insert(p, parking_lot::RwLock::new(BoundUdpPort::new(p)));
continue 'outer_add_port_loop;
}
}
let p = 50000 + ((random::xorshift64_random() as u16) % 15535);
if !state.udp_sockets.contains_key(&p) && udp_test_bind(p) {
let _ = state.udp_sockets.insert(p, parking_lot::RwLock::new(BoundUdpPort::new(p)));
}
}
drop(state);
self.state.read().await
} else {
state
};
let num_cores = std::thread::available_parallelism().map_or(1, |c| c.get());
for (_, binding) in state.udp_sockets.iter() {
let mut binding = binding.write();
let (_, mut new_sockets) =
binding.update_bindings(&state.settings.interface_prefix_blacklist, &state.settings.cidr_blacklist);
for s in new_sockets.drain(..) {
// Start one async receive task per system core for each socket. This is not strictly
// necessary since tokio already schedules and multiplexes, but it lets tokio pull and
// handle packets from any given socket concurrently on up to the number of available
// cores, and is probably faster than patterns that iterate over sockets and build
// arrays of futures or use channels.
let mut socket_tasks = Vec::with_capacity(num_cores);
for _ in 0..num_cores {
let self_copy = self.clone();
let s_copy = s.clone();
let local_socket = LocalSocket::new(&s);
socket_tasks.push(tokio::spawn(async move {
loop {
let mut buf = self_copy.node().get_packet_buffer();
let now = ms_monotonic();
if let Ok((bytes, from_sockaddr)) = s_copy.receive(unsafe { buf.entire_buffer_mut() }, now).await {
unsafe { buf.set_size_unchecked(bytes) };
self_copy
.node()
.handle_incoming_physical_packet(
&*self_copy,
&*self_copy.inner,
&Endpoint::IpUdp(InetAddress::from(from_sockaddr)),
&local_socket,
&s_copy.interface,
buf,
)
.await;
}
}
}));
}
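// Keep the join handles on the socket itself so BoundUdpSocket's Drop implementation can abort these receive tasks when the socket goes away.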
debug_assert!(s.associated_tasks.lock().is_empty());
*s.associated_tasks.lock() = socket_tasks;
}
}
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
async fn node_background_task_daemon(self: Arc<Self>) {
loop {
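// do_background_tasks() returns the interval until it should next be called; sleep exactly that long between runs.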
tokio::time::sleep(self.node().do_background_tasks(self.as_ref()).await).await;
}
}
}
#[async_trait]
@ -112,11 +238,12 @@ impl<StorageImpl: Storage, PathFilterImpl: PathFilter<Self>, InnerProtocolImpl:
}
}
let udp_sockets_by_port = self.udp_sockets_by_port.read().await;
if !udp_sockets_by_port.is_empty() {
let state = self.state.read().await;
if !state.udp_sockets.is_empty() {
if let Some(specific_interface) = local_interface {
// Send from a specific interface if that interface is specified.
for (_, p) in udp_sockets_by_port.iter() {
for (_, p) in state.udp_sockets.iter() {
let p = p.read();
if !p.sockets.is_empty() {
let mut i = (random::next_u32_secure() as usize) % p.sockets.len();
for _ in 0..p.sockets.len() {
@ -133,7 +260,8 @@ impl<StorageImpl: Storage, PathFilterImpl: PathFilter<Self>, InnerProtocolImpl:
} else {
// Otherwise send from one socket on every interface.
let mut sent_on_interfaces = HashSet::with_capacity(4);
for p in udp_sockets_by_port.values() {
for p in state.udp_sockets.values() {
let p = p.read();
if !p.sockets.is_empty() {
let mut i = (random::next_u32_secure() as usize) % p.sockets.len();
for _ in 0..p.sockets.len() {
@ -173,16 +301,12 @@ impl<StorageImpl: Storage, PathFilterImpl: PathFilter<Self>, InnerProtocolImpl:
for VL1Service<StorageImpl, PathFilterImpl, InnerProtocolImpl>
{
fn drop(&mut self) {
for d in self.daemons.lock().drain(..) {
d.abort();
}
// Drop all bound sockets since these can hold circular Arc<> references to 'internal'.
// This shouldn't have to loop much if at all to acquire the lock, but it might if something
// is still completing somewhere in an aborting task.
loop {
if let Ok(mut udp_sockets) = self.udp_sockets_by_port.try_write() {
udp_sockets.clear();
if let Ok(mut state) = self.state.try_write() {
for d in state.daemons.drain(..) {
d.abort();
}
state.udp_sockets.clear();
break;
}
std::thread::sleep(Duration::from_millis(2));