From b4c74ce7bbae9211e7ab81a7c3d5cc6fff6ab8dd Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 16 Sep 2022 13:50:39 -0400 Subject: [PATCH] More work on VL1 service --- service/src/localconfig.rs | 15 --- vl1-service/Cargo.toml | 1 + vl1-service/src/constants.rs | 16 +++ vl1-service/src/lib.rs | 3 + vl1-service/src/settings.rs | 46 +++++++++ vl1-service/src/sys/udp.rs | 74 +++++++++----- vl1-service/src/vl1service.rs | 178 ++++++++++++++++++++++++++++------ 7 files changed, 265 insertions(+), 68 deletions(-) create mode 100644 vl1-service/src/constants.rs create mode 100644 vl1-service/src/settings.rs diff --git a/service/src/localconfig.rs b/service/src/localconfig.rs index df6fbff72..c2720ad83 100644 --- a/service/src/localconfig.rs +++ b/service/src/localconfig.rs @@ -7,21 +7,6 @@ use serde::{Deserialize, Serialize}; use zerotier_network_hypervisor::vl1::{Address, Endpoint, InetAddress}; use zerotier_network_hypervisor::vl2::NetworkId; -/// A list of unassigned or obsolete ports under 1024 that could possibly be squatted. 
-pub const UNASSIGNED_PRIVILEGED_PORTS: [u16; 299] = [ - 4, 6, 8, 10, 12, 14, 15, 16, 26, 28, 30, 32, 34, 36, 40, 60, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 285, 288, 289, 290, - 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 303, 304, 305, 306, 307, 323, 324, 325, 326, 327, 328, 329, 330, 331, 332, - 334, 335, 336, 337, 338, 339, 340, 341, 342, 343, 703, 708, 713, 714, 715, 716, 717, 718, 719, 720, 721, 722, 723, 724, 725, 726, 727, - 728, 732, 733, 734, 735, 736, 737, 738, 739, 740, 743, 745, 746, 755, 756, 766, 768, 778, 779, 781, 782, 783, 784, 785, 786, 787, 788, - 789, 790, 791, 792, 793, 794, 795, 796, 797, 798, 799, 802, 803, 804, 805, 806, 807, 808, 809, 811, 812, 813, 814, 815, 816, 817, 818, - 819, 820, 821, 822, 823, 824, 825, 826, 827, 834, 835, 836, 837, 838, 839, 840, 841, 842, 843, 844, 845, 846, 849, 850, 851, 852, 853, - 854, 855, 856, 857, 858, 859, 862, 863, 864, 865, 866, 867, 868, 869, 870, 871, 872, 874, 875, 876, 877, 878, 879, 880, 881, 882, 883, - 884, 885, 889, 890, 891, 892, 893, 894, 895, 896, 897, 898, 899, 904, 905, 906, 907, 908, 909, 910, 911, 914, 915, 916, 917, 918, 919, - 920, 921, 922, 923, 924, 925, 926, 927, 928, 929, 930, 931, 932, 933, 934, 935, 936, 937, 938, 939, 940, 941, 942, 943, 944, 945, 946, - 947, 948, 949, 950, 951, 952, 953, 954, 955, 956, 957, 958, 959, 960, 961, 962, 963, 964, 965, 966, 967, 968, 969, 970, 971, 972, 973, - 974, 975, 976, 977, 978, 979, 980, 981, 982, 983, 984, 985, 986, 987, 988, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1023, -]; - /// Default primary ZeroTier port. 
pub const DEFAULT_PORT: u16 = 9993; diff --git a/vl1-service/Cargo.toml b/vl1-service/Cargo.toml index c1325306a..6cd63a365 100644 --- a/vl1-service/Cargo.toml +++ b/vl1-service/Cargo.toml @@ -13,6 +13,7 @@ async-trait = "^0" num-traits = "^0" tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false } parking_lot = { version = "^0", features = [], default-features = false } +serde = { version = "^1", features = ["derive"], default-features = false } [target."cfg(windows)".dependencies] winapi = { version = "^0", features = ["handleapi", "ws2ipdef", "ws2tcpip"] } diff --git a/vl1-service/src/constants.rs b/vl1-service/src/constants.rs new file mode 100644 index 000000000..69592b0d0 --- /dev/null +++ b/vl1-service/src/constants.rs @@ -0,0 +1,16 @@ +// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md. + +/// A list of unassigned or obsolete ports under 1024 that could possibly be squatted. 
+pub const UNASSIGNED_PRIVILEGED_PORTS: [u16; 299] = [ + 4, 6, 8, 10, 12, 14, 15, 16, 26, 28, 30, 32, 34, 36, 40, 60, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 285, 288, 289, 290, + 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 303, 304, 305, 306, 307, 323, 324, 325, 326, 327, 328, 329, 330, 331, 332, + 334, 335, 336, 337, 338, 339, 340, 341, 342, 343, 703, 708, 713, 714, 715, 716, 717, 718, 719, 720, 721, 722, 723, 724, 725, 726, 727, + 728, 732, 733, 734, 735, 736, 737, 738, 739, 740, 743, 745, 746, 755, 756, 766, 768, 778, 779, 781, 782, 783, 784, 785, 786, 787, 788, + 789, 790, 791, 792, 793, 794, 795, 796, 797, 798, 799, 802, 803, 804, 805, 806, 807, 808, 809, 811, 812, 813, 814, 815, 816, 817, 818, + 819, 820, 821, 822, 823, 824, 825, 826, 827, 834, 835, 836, 837, 838, 839, 840, 841, 842, 843, 844, 845, 846, 849, 850, 851, 852, 853, + 854, 855, 856, 857, 858, 859, 862, 863, 864, 865, 866, 867, 868, 869, 870, 871, 872, 874, 875, 876, 877, 878, 879, 880, 881, 882, 883, + 884, 885, 889, 890, 891, 892, 893, 894, 895, 896, 897, 898, 899, 904, 905, 906, 907, 908, 909, 910, 911, 914, 915, 916, 917, 918, 919, + 920, 921, 922, 923, 924, 925, 926, 927, 928, 929, 930, 931, 932, 933, 934, 935, 936, 937, 938, 939, 940, 941, 942, 943, 944, 945, 946, + 947, 948, 949, 950, 951, 952, 953, 954, 955, 956, 957, 958, 959, 960, 961, 962, 963, 964, 965, 966, 967, 968, 969, 970, 971, 972, 973, + 974, 975, 976, 977, 978, 979, 980, 981, 982, 983, 984, 985, 986, 987, 988, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1023, +]; diff --git a/vl1-service/src/lib.rs b/vl1-service/src/lib.rs index 6d97e5415..98085f4f7 100644 --- a/vl1-service/src/lib.rs +++ b/vl1-service/src/lib.rs @@ -2,10 +2,13 @@ mod localinterface; mod localsocket; +mod settings; mod vl1service; +pub mod constants; pub mod sys; pub use localinterface::LocalInterface; pub use localsocket::LocalSocket; +pub use settings::Settings; pub use vl1service::*; diff --git 
a/vl1-service/src/settings.rs b/vl1-service/src/settings.rs new file mode 100644 index 000000000..ebc58657a --- /dev/null +++ b/vl1-service/src/settings.rs @@ -0,0 +1,46 @@ +// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md. + +use serde::{Deserialize, Serialize}; +use zerotier_network_hypervisor::vl1::InetAddress; + +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(default)] +pub struct Settings { + /// Fixed ports that are always bound; the default is the primary ZeroTier port, 9993. + pub fixed_ports: Vec, + + /// Number of additional random ports to bind. + pub random_port_count: usize, + + /// Enable UPnP, NAT-PMP, and other router port mapping technologies? + pub port_mapping: bool, + + /// Interface name prefix blacklist for local bindings (not remote IPs). + pub interface_prefix_blacklist: Vec, + + /// IP/bits CIDR blacklist for local bindings (not remote IPs). + pub cidr_blacklist: Vec, +} + +impl Settings { + #[cfg(target_os = "macos")] + pub const DEFAULT_PREFIX_BLACKLIST: [&'static str; 10] = ["lo", "utun", "gif", "stf", "iptap", "pktap", "feth", "zt", "llw", "anpi"]; + + #[cfg(target_os = "linux")] + pub const DEFAULT_PREFIX_BLACKLIST: [&'static str; 5] = ["lo", "tun", "tap", "ipsec", "zt"]; + + #[cfg(windows)] + pub const DEFAULT_PREFIX_BLACKLIST: [&'static str; 0] = []; +} + +impl Default for Settings { + fn default() -> Self { + Self { + fixed_ports: vec![9993], + random_port_count: 5, + port_mapping: true, + interface_prefix_blacklist: Self::DEFAULT_PREFIX_BLACKLIST.iter().map(|s| s.to_string()).collect(), + cidr_blacklist: Vec::new(), + } + } +} diff --git a/vl1-service/src/sys/udp.rs b/vl1-service/src/sys/udp.rs index ad3519348..7a0ccbd2a 100644 --- a/vl1-service/src/sys/udp.rs +++ b/vl1-service/src/sys/udp.rs @@ -4,9 +4,10 @@ use std::collections::HashMap; #[allow(unused_imports)] use std::mem::{size_of, transmute, MaybeUninit}; #[allow(unused_imports)] -use 
std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; #[allow(unused_imports)] use std::ptr::{null, null_mut}; +use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; #[cfg(unix)] @@ -35,9 +36,20 @@ pub struct BoundUdpSocket { pub address: InetAddress, pub socket: Arc, pub interface: LocalInterface, + pub associated_tasks: parking_lot::Mutex>>, + last_receive_time: AtomicI64, fd: RawFd, } +impl Drop for BoundUdpSocket { + fn drop(&mut self) { + let mut associated_tasks = self.associated_tasks.lock(); + for t in associated_tasks.drain(..) { + t.abort(); + } + } +} + impl BoundUdpSocket { #[cfg(unix)] #[inline(always)] @@ -54,31 +66,6 @@ impl BoundUdpSocket { }; } - #[cfg(any(target_os = "macos", target_os = "freebsd"))] - pub fn send_sync_nonblock(&self, dest: &InetAddress, b: &[u8], packet_ttl: u8) -> bool { - let mut ok = false; - if dest.family() == self.address.family() { - if packet_ttl > 0 && dest.is_ipv4() { - self.set_ttl(packet_ttl); - } - unsafe { - ok = libc::sendto( - self.fd.as_(), - b.as_ptr().cast(), - b.len().as_(), - 0, - (dest as *const InetAddress).cast(), - size_of::().as_(), - ) > 0; - } - if packet_ttl > 0 && dest.is_ipv4() { - self.set_ttl(0xff); - } - } - ok - } - - #[cfg(not(any(target_os = "macos", target_os = "freebsd")))] pub fn send_sync_nonblock(&self, dest: &InetAddress, b: &[u8], packet_ttl: u8) -> bool { let mut ok = false; if dest.family() == self.address.family() { @@ -92,6 +79,14 @@ impl BoundUdpSocket { } ok } + + pub async fn receive + Send>(&self, mut buffer: B, current_time: i64) -> tokio::io::Result<(usize, SocketAddr)> { + let result = self.socket.recv_from(buffer.as_mut()).await; + if result.is_ok() { + self.last_receive_time.store(current_time, Ordering::Relaxed); + } + result + } } impl BoundUdpPort { @@ -102,6 +97,17 @@ impl BoundUdpPort { Self { sockets: Vec::new(), port } } + /// Return a tuple of: total number of Arc<>+Weak<> references to sockets, and most recent receive time on any socket. 
+ pub fn liveness(&self) -> (usize, i64) { + let mut rt_latest = i64::MIN; + let mut total_handles = 0; + for s in self.sockets.iter() { + rt_latest = rt_latest.max(s.last_receive_time.load(Ordering::Relaxed)); + total_handles += Arc::strong_count(s) + Arc::weak_count(s); + } + (total_handles, rt_latest) + } + /// Synchronize bindings with devices and IPs in system. /// /// Any device or local IP within any of the supplied blacklists is ignored. Multicast or loopback addresses are @@ -129,6 +135,7 @@ impl BoundUdpPort { let interface_str = interface.to_string(); let mut addr_with_port = address.clone(); addr_with_port.set_port(self.port); + if address.is_ip() && matches!( address.scope(), @@ -145,6 +152,7 @@ impl BoundUdpPort { self.sockets.push(socket.clone()); } } + if !found { let s = unsafe { bind_udp_to_device(interface_str.as_str(), &addr_with_port) }; if s.is_ok() { @@ -155,6 +163,7 @@ impl BoundUdpPort { address: addr_with_port, socket: Arc::new(s.unwrap()), interface: interface.clone(), + last_receive_time: AtomicI64::new(i64::MIN), fd, }); self.sockets.push(s.clone()); @@ -177,6 +186,19 @@ impl BoundUdpPort { } } +/// Attempt to bind universally to a given UDP port and then close to determine if we can use it. +/// +/// This succeeds if either IPv4 or IPv6 global can be bound. 
+pub fn udp_test_bind(port: u16) -> bool { + std::net::UdpSocket::bind( + &[ + SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port), + SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port), + ][..], + ) + .is_ok() +} + #[allow(unused_variables)] #[cfg(unix)] unsafe fn bind_udp_to_device(device_name: &str, address: &InetAddress) -> Result { diff --git a/vl1-service/src/vl1service.rs b/vl1-service/src/vl1service.rs index 9930772ee..49cc779a9 100644 --- a/vl1-service/src/vl1service.rs +++ b/vl1-service/src/vl1service.rs @@ -7,10 +7,13 @@ use std::sync::Arc; use async_trait::async_trait; use zerotier_crypto::random; -use zerotier_network_hypervisor::vl1::{Endpoint, Event, HostSystem, Identity, InnerProtocol, Node, PathFilter, Storage}; +use zerotier_network_hypervisor::vl1::{Endpoint, Event, HostSystem, Identity, InetAddress, InnerProtocol, Node, PathFilter, Storage}; use zerotier_utils::{ms_monotonic, ms_since_epoch}; -use crate::sys::udp::BoundUdpPort; +use crate::constants::UNASSIGNED_PRIVILEGED_PORTS; +use crate::settings::Settings; +use crate::sys::udp::{udp_test_bind, BoundUdpPort}; +use crate::LocalSocket; use tokio::task::JoinHandle; use tokio::time::Duration; @@ -22,14 +25,19 @@ use tokio::time::Duration; /// whatever inner protocol implementation is using it. This would typically be VL2 but could be /// a test harness or just the controller for a controller that runs stand-alone. 
pub struct VL1Service, InnerProtocolImpl: InnerProtocol> { - daemons: parking_lot::Mutex>>, - udp_sockets_by_port: tokio::sync::RwLock>, + state: tokio::sync::RwLock, storage: Arc, inner: Arc, path_filter: Arc, node_container: Option>, } +struct VL1ServiceMutableState { + daemons: Vec>, + udp_sockets: HashMap>, + settings: Settings, +} + impl, InnerProtocolImpl: InnerProtocol> VL1Service { @@ -37,26 +45,28 @@ impl, InnerProtocolImpl: storage: Arc, inner: Arc, path_filter: Arc, + settings: Settings, ) -> Result, Box> { let mut service = VL1Service { - daemons: parking_lot::Mutex::new(Vec::with_capacity(2)), - udp_sockets_by_port: tokio::sync::RwLock::new(HashMap::with_capacity(8)), + state: tokio::sync::RwLock::new(VL1ServiceMutableState { + daemons: Vec::with_capacity(2), + udp_sockets: HashMap::with_capacity(8), + settings, + }), storage, inner, path_filter, node_container: None, }; - service .node_container .replace(Node::new(&service, &*service.storage, true, false).await?); - let service = Arc::new(service); - let mut daemons = service.daemons.lock(); - daemons.push(tokio::spawn(service.clone().udp_bind_daemon())); - daemons.push(tokio::spawn(service.clone().node_background_task_daemon())); - drop(daemons); + let mut state = service.state.write().await; + state.daemons.push(tokio::spawn(service.clone().udp_bind_daemon())); + state.daemons.push(tokio::spawn(service.clone().node_background_task_daemon())); + drop(state); Ok(service) } @@ -67,9 +77,125 @@ impl, InnerProtocolImpl: unsafe { self.node_container.as_ref().unwrap_unchecked() } } - async fn udp_bind_daemon(self: Arc) {} + pub async fn bound_udp_ports(&self) -> Vec { + self.state.read().await.udp_sockets.keys().cloned().collect() + } - async fn node_background_task_daemon(self: Arc) {} + async fn udp_bind_daemon(self: Arc) { + loop { + let state = self.state.read().await; + let mut need_fixed_ports: HashSet = HashSet::from_iter(state.settings.fixed_ports.iter().cloned()); + let mut have_random_port_count 
= 0; + for (p, _) in state.udp_sockets.iter() { + need_fixed_ports.remove(p); + have_random_port_count += (!state.settings.fixed_ports.contains(p)) as usize; + } + let desired_random_port_count = state.settings.random_port_count; + + let state = if !need_fixed_ports.is_empty() || have_random_port_count != desired_random_port_count { + drop(state); + let mut state = self.state.write().await; + + for p in need_fixed_ports.iter() { + state.udp_sockets.insert(*p, parking_lot::RwLock::new(BoundUdpPort::new(*p))); + } + + while have_random_port_count > desired_random_port_count { + let mut most_stale_binding_liveness = (usize::MAX, i64::MAX); + let mut most_stale_binding_port = 0; + for (p, s) in state.udp_sockets.iter() { + if !state.settings.fixed_ports.contains(p) { + let (total_smart_ptr_handles, most_recent_receive) = s.read().liveness(); + if total_smart_ptr_handles < most_stale_binding_liveness.0 + || (total_smart_ptr_handles == most_stale_binding_liveness.0 + && most_recent_receive <= most_stale_binding_liveness.1) + { + most_stale_binding_liveness.0 = total_smart_ptr_handles; + most_stale_binding_liveness.1 = most_recent_receive; + most_stale_binding_port = *p; + } + } + } + if most_stale_binding_port != 0 { + have_random_port_count -= state.udp_sockets.remove(&most_stale_binding_port).is_some() as usize; + } else { + break; + } + } + + 'outer_add_port_loop: while have_random_port_count < desired_random_port_count { + let rn = random::xorshift64_random() as usize; + for i in 0..UNASSIGNED_PRIVILEGED_PORTS.len() { + let p = UNASSIGNED_PRIVILEGED_PORTS[rn.wrapping_add(i) % UNASSIGNED_PRIVILEGED_PORTS.len()]; + if !state.udp_sockets.contains_key(&p) && udp_test_bind(p) { + let _ = state.udp_sockets.insert(p, parking_lot::RwLock::new(BoundUdpPort::new(p))); + continue 'outer_add_port_loop; + } + } + + let p = 50000 + ((random::xorshift64_random() as u16) % 15535); + if !state.udp_sockets.contains_key(&p) && udp_test_bind(p) { + let _ = state.udp_sockets.insert(p, 
parking_lot::RwLock::new(BoundUdpPort::new(p))); + } + } + + drop(state); + self.state.read().await + } else { + state + }; + + let num_cores = std::thread::available_parallelism().map_or(1, |c| c.get()); + for (_, binding) in state.udp_sockets.iter() { + let mut binding = binding.write(); + let (_, mut new_sockets) = + binding.update_bindings(&state.settings.interface_prefix_blacklist, &state.settings.cidr_blacklist); + for s in new_sockets.drain(..) { + // Start one async task per system core. This is technically not necessary because tokio + // schedules and multiplexes, but this enables tokio to grab and schedule packets + // concurrently for up to the number of cores available for any given socket and is + // probably faster than other patterns that involve iterating through sockets and creating + // arrays of futures or using channels. + let mut socket_tasks = Vec::with_capacity(num_cores); + for _ in 0..num_cores { + let self_copy = self.clone(); + let s_copy = s.clone(); + let local_socket = LocalSocket::new(&s); + socket_tasks.push(tokio::spawn(async move { + loop { + let mut buf = self_copy.node().get_packet_buffer(); + let now = ms_monotonic(); + if let Ok((bytes, from_sockaddr)) = s_copy.receive(unsafe { buf.entire_buffer_mut() }, now).await { + unsafe { buf.set_size_unchecked(bytes) }; + self_copy + .node() + .handle_incoming_physical_packet( + &*self_copy, + &*self_copy.inner, + &Endpoint::IpUdp(InetAddress::from(from_sockaddr)), + &local_socket, + &s_copy.interface, + buf, + ) + .await; + } + } + })); + } + debug_assert!(s.associated_tasks.lock().is_empty()); + *s.associated_tasks.lock() = socket_tasks; + } + } + + tokio::time::sleep(Duration::from_secs(10)).await; + } + } + + async fn node_background_task_daemon(self: Arc) { + loop { + tokio::time::sleep(self.node().do_background_tasks(self.as_ref()).await).await; + } + } } #[async_trait] @@ -112,11 +238,12 @@ impl, InnerProtocolImpl: } } - let udp_sockets_by_port = 
self.udp_sockets_by_port.read().await; - if !udp_sockets_by_port.is_empty() { + let state = self.state.read().await; + if !state.udp_sockets.is_empty() { if let Some(specific_interface) = local_interface { // Send from a specific interface if that interface is specified. - for (_, p) in udp_sockets_by_port.iter() { + for (_, p) in state.udp_sockets.iter() { + let p = p.read(); if !p.sockets.is_empty() { let mut i = (random::next_u32_secure() as usize) % p.sockets.len(); for _ in 0..p.sockets.len() { @@ -133,7 +260,8 @@ impl, InnerProtocolImpl: } else { // Otherwise send from one socket on every interface. let mut sent_on_interfaces = HashSet::with_capacity(4); - for p in udp_sockets_by_port.values() { + for p in state.udp_sockets.values() { + let p = p.read(); if !p.sockets.is_empty() { let mut i = (random::next_u32_secure() as usize) % p.sockets.len(); for _ in 0..p.sockets.len() { @@ -173,16 +301,12 @@ impl, InnerProtocolImpl: for VL1Service { fn drop(&mut self) { - for d in self.daemons.lock().drain(..) { - d.abort(); - } - - // Drop all bound sockets since these can hold circular Arc<> references to 'internal'. - // This shouldn't have to loop much if at all to acquire the lock, but it might if something - // is still completing somewhere in an aborting task. loop { - if let Ok(mut udp_sockets) = self.udp_sockets_by_port.try_write() { - udp_sockets.clear(); + if let Ok(mut state) = self.state.try_write() { + for d in state.daemons.drain(..) { + d.abort(); + } + state.udp_sockets.clear(); break; } std::thread::sleep(Duration::from_millis(2));