Finish WHOIS, blacklist bridge on macOS because the OS has a ton of weird bridge devices, and fix a mutex problem.

This commit is contained in:
Adam Ierymenko 2022-09-23 16:43:00 -04:00
parent 8592cd59e2
commit eaacb26187
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
6 changed files with 91 additions and 26 deletions

View file

@ -4,7 +4,7 @@ use std::collections::HashMap;
use std::hash::Hash;
use std::io::Write;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::Duration;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
@ -41,6 +41,9 @@ pub trait HostSystem: Sync + Send + 'static {
/// A VL1 level event occurred.
fn event(&self, event: Event);
/// Get a pooled packet buffer for internal use.
fn get_buffer(&self) -> PooledPacketBuffer;
/// Check a local socket for validity.
///
/// This could return false if the socket's interface no longer exists, its port has been
@ -197,9 +200,8 @@ struct BackgroundTaskIntervals {
whois_queue_retry: IntervalGate<{ WHOIS_RETRY_INTERVAL }>,
}
#[derive(Default)]
struct WhoisQueueItem {
waiting_packets: RingBuffer<PooledPacketBuffer, WHOIS_MAX_WAITING_PACKETS>,
struct WhoisQueueItem<HostSystemImpl: HostSystem> {
waiting_packets: RingBuffer<(Weak<Path<HostSystemImpl>>, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>,
retry_count: u16,
}
@ -230,7 +232,7 @@ pub struct Node<HostSystemImpl: HostSystem> {
best_root: RwLock<Option<Arc<Peer<HostSystemImpl>>>>,
/// Queue of identities being looked up.
whois_queue: Mutex<HashMap<Address, WhoisQueueItem>>,
whois_queue: Mutex<HashMap<Address, WhoisQueueItem<HostSystemImpl>>>,
}
impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
@ -644,6 +646,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
if let Ok(fragment_header) = data.struct_mut_at::<v1::FragmentHeader>(0) {
if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) {
if dest == self.identity.address {
let fragment_header = &*fragment_header; // discard mut
let path = self.canonical_path(source_endpoint, source_local_socket, source_local_interface, time_ticks);
path.log_receive_anything(time_ticks);
@ -683,7 +686,9 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
&assembled_packet.frags[1..(assembled_packet.have as usize)],
);
} else {
let mut combined_packet = PooledPacketBuffer::naked(PacketBuffer::new());
// If WHOIS is needed we need to go ahead and combine the packet so it can be cached
// for later processing when a WHOIS reply comes back.
let mut combined_packet = host_system.get_buffer();
let mut ok = combined_packet.append_bytes(frag0.as_bytes()).is_ok();
for i in 1..assembled_packet.have {
if let Some(f) = assembled_packet.frags[i as usize].as_ref() {
@ -694,7 +699,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
}
}
if ok {
self.whois(host_system, source, Some(combined_packet), time_ticks);
self.whois(host_system, source, Some((Arc::downgrade(&path), combined_packet)), time_ticks);
}
}
}
@ -702,7 +707,6 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
}
}
} else {
#[cfg(debug_assertions)]
if let Ok(packet_header) = data.struct_at::<v1::PacketHeader>(0) {
debug_event!(
host_system,
@ -714,7 +718,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
if let Some(peer) = self.peer(source) {
peer.receive(self, host_system, inner, time_ticks, &path, packet_header, data.as_ref(), &[]);
} else {
self.whois(host_system, source, Some(PooledPacketBuffer::naked(data.clone())), time_ticks);
self.whois(host_system, source, Some((Arc::downgrade(&path), data)), time_ticks);
}
}
}
@ -777,11 +781,19 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
}
/// Enqueue and send a WHOIS query for a given address, adding the supplied packet (if any) to the list to be processed on reply.
fn whois(&self, host_system: &HostSystemImpl, address: Address, waiting_packet: Option<PooledPacketBuffer>, time_ticks: i64) {
fn whois(
&self,
host_system: &HostSystemImpl,
address: Address,
waiting_packet: Option<(Weak<Path<HostSystemImpl>>, PooledPacketBuffer)>,
time_ticks: i64,
) {
debug_event!(host_system, "[vl1] [v1] WHOIS {}", address.to_string());
{
let mut whois_queue = self.whois_queue.lock();
let qi = whois_queue.entry(address).or_default();
let qi = whois_queue
.entry(address)
.or_insert_with(|| WhoisQueueItem::<HostSystemImpl> { waiting_packets: RingBuffer::new(), retry_count: 0 });
if let Some(p) = waiting_packet {
qi.waiting_packets.add(p);
}
@ -817,6 +829,43 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
}
}
/// Called by Peer when an identity is received from another node, e.g. via OK(WHOIS).
pub(crate) fn handle_incoming_identity<InnerProtocolImpl: InnerProtocol>(
&self,
host_system: &HostSystemImpl,
inner: &InnerProtocolImpl,
received_identity: Identity,
time_ticks: i64,
authoritative: bool,
) {
if authoritative {
if received_identity.validate_identity() {
let mut whois_queue = self.whois_queue.lock();
if let Some(qi) = whois_queue.get_mut(&received_identity.address) {
let address = received_identity.address;
if inner.should_communicate_with(&received_identity) {
let mut peers = self.peers.write();
if let Some(peer) = peers.get(&address).cloned().or_else(|| {
Peer::new(&self.identity, received_identity, time_ticks)
.map(|p| Arc::new(p))
.and_then(|peer| Some(peers.entry(address).or_insert(peer).clone()))
}) {
drop(peers);
for p in qi.waiting_packets.iter() {
if let Some(path) = p.0.upgrade() {
if let Ok(packet_header) = p.1.struct_at::<v1::PacketHeader>(0) {
peer.receive(self, host_system, inner, time_ticks, &path, packet_header, &p.1, &[]);
}
}
}
}
}
whois_queue.remove(&address);
}
}
}
}
/// Get the current "best" root from among this node's trusted roots.
pub fn best_root(&self) -> Option<Arc<Peer<HostSystemImpl>>> {
self.best_root.read().clone()
@ -891,9 +940,11 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
local_interface: &HostSystemImpl::LocalInterface,
time_ticks: i64,
) -> Arc<Path<HostSystemImpl>> {
if let Some(path) = self.paths.read().get(&PathKey::Ref(ep, local_socket)) {
let paths = self.paths.read();
if let Some(path) = paths.get(&PathKey::Ref(ep, local_socket)) {
path.clone()
} else {
drop(paths);
self.paths
.write()
.entry(PathKey::Copied(ep.clone(), local_socket.clone()))

View file

@ -135,7 +135,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
return None;
}
pub(crate) fn learn_path(&self, host_system: &HostSystemImpl, new_path: &Arc<Path<HostSystemImpl>>, time_ticks: i64) {
fn learn_path(&self, host_system: &HostSystemImpl, new_path: &Arc<Path<HostSystemImpl>>, time_ticks: i64) {
let mut paths = self.paths.lock();
match &new_path.endpoint {
@ -259,8 +259,6 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
///
/// This will go directly if there is an active path, or otherwise indirectly
/// via a root or some other route.
///
/// It encrypts and sets the MAC and cipher fields and packet ID and other things.
pub(crate) fn send(
&self,
host_system: &HostSystemImpl,
@ -663,10 +661,11 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
if self.message_id_counter.load(Ordering::Relaxed).wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX {
match ok_header.in_re_verb {
verbs::VL1_HELLO => {
if let Ok(ok_hello_fixed_header_fields) =
if let Ok(_ok_hello_fixed_header_fields) =
payload.read_struct::<v1::message_component_structs::OkHelloFixedHeaderFields>(&mut cursor)
{
if hops == 0 {
debug_event!(host_system, "[vl1] {} OK(HELLO)", self.identity.address.to_string(),);
if let Ok(reported_endpoint) = Endpoint::unmarshal(&payload, &mut cursor) {
#[cfg(debug_assertions)]
let reported_endpoint2 = reported_endpoint.clone();
@ -699,8 +698,14 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
verbs::VL1_WHOIS => {
if node.is_peer_root(self) {
while cursor < payload.len() {
if let Ok(_whois_response) = Identity::read_bytes(&mut BufferReader::new(payload, &mut cursor)) {
// TODO
if let Ok(received_identity) = Identity::read_bytes(&mut BufferReader::new(payload, &mut cursor)) {
debug_event!(
host_system,
"[vl1] {} OK(WHOIS): {}",
self.identity.address.to_string(),
received_identity.to_string()
);
node.handle_incoming_identity(host_system, inner, received_identity, time_ticks, true);
} else {
break;
}

View file

@ -52,7 +52,7 @@ impl Hash for LocalSocket {
impl ToString for LocalSocket {
fn to_string(&self) -> String {
if let Some(s) = self.0.upgrade() {
s.address.to_string()
s.bind_address.to_string()
} else {
"(closed socket)".into()
}

View file

@ -24,7 +24,8 @@ pub struct VL1Settings {
impl VL1Settings {
#[cfg(target_os = "macos")]
pub const DEFAULT_PREFIX_BLACKLIST: [&'static str; 10] = ["lo", "utun", "gif", "stf", "iptap", "pktap", "feth", "zt", "llw", "anpi"];
pub const DEFAULT_PREFIX_BLACKLIST: [&'static str; 11] =
["lo", "utun", "gif", "stf", "iptap", "pktap", "feth", "zt", "llw", "anpi", "bridge"];
#[cfg(target_os = "linux")]
pub const DEFAULT_PREFIX_BLACKLIST: [&'static str; 5] = ["lo", "tun", "tap", "ipsec", "zt"];

View file

@ -21,6 +21,9 @@ use zerotier_utils::ms_monotonic;
use crate::sys::{getifaddrs, ipv6};
/// UDP socket receive timeout to allow sockets to close properly on some systems (seconds).
const SOCKET_RECV_TIMEOUT_SECONDS: i64 = 2;
fn socket_read_concurrency() -> usize {
const MAX_PER_SOCKET_CONCURRENCY: usize = 8;
@ -60,7 +63,7 @@ pub struct BoundUdpPort {
/// A socket bound to a specific interface and IP.
pub struct BoundUdpSocket {
pub address: InetAddress,
pub bind_address: InetAddress,
pub interface: LocalInterface,
last_receive_time: AtomicI64,
fd: i32,
@ -92,7 +95,7 @@ impl BoundUdpSocket {
#[cfg(unix)]
pub fn send(&self, dest: &InetAddress, data: &[u8], packet_ttl: u8) -> bool {
if dest.family() == self.address.family() {
if dest.family() == self.bind_address.family() {
let (c_sockaddr, c_addrlen) = dest.c_sockaddr();
if packet_ttl == 0 || !dest.is_ipv4() {
unsafe {
@ -183,7 +186,7 @@ impl BoundUdpPort {
existing_bindings
.entry(s.interface)
.or_insert_with(|| HashMap::with_capacity(4))
.insert(s.address.clone(), s);
.insert(s.bind_address.clone(), s);
}
let mut errors: Vec<(LocalInterface, InetAddress, std::io::Error)> = Vec::new();
@ -215,7 +218,7 @@ impl BoundUdpPort {
let fd = s.unwrap();
if s.is_ok() {
let s = Arc::new(BoundUdpSocket {
address: addr_with_port,
bind_address: addr_with_port,
interface: interface.clone(),
last_receive_time: AtomicI64::new(i64::MIN),
fd,
@ -324,7 +327,7 @@ unsafe fn bind_udp_to_device(device_name: &str, address: &InetAddress) -> Result
//assert_ne!(libc::fcntl(s, libc::F_SETFL, libc::O_NONBLOCK), -1);
let mut timeo: libc::timeval = std::mem::zeroed();
timeo.tv_sec = 1;
timeo.tv_sec = SOCKET_RECV_TIMEOUT_SECONDS.as_();
timeo.tv_usec = 0;
setsockopt_results |= libc::setsockopt(
s,

View file

@ -226,11 +226,16 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
}
}
#[inline(always)]
#[inline]
fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool {
socket.is_valid()
}
#[inline]
fn get_buffer(&self) -> zerotier_network_hypervisor::protocol::PooledPacketBuffer {
self.buffer_pool.get()
}
fn wire_send(
&self,
endpoint: &Endpoint,