diff --git a/controller/src/database.rs b/controller/src/database.rs index ca4cff1af..7a9a2ff4f 100644 --- a/controller/src/database.rs +++ b/controller/src/database.rs @@ -15,4 +15,6 @@ pub trait Database: Sync + Send + Sized + 'static { async fn list_members(&self, network_id: NetworkId) -> Result, Self::Error>; async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result, Self::Error>; async fn save_member(&self, obj: &Member) -> Result<(), Self::Error>; + + async fn log_request(&self, obj: &RequestLogItem) -> Result<(), Self::Error>; } diff --git a/controller/src/filedatabase.rs b/controller/src/filedatabase.rs index cf2c9b80a..87ec73eec 100644 --- a/controller/src/filedatabase.rs +++ b/controller/src/filedatabase.rs @@ -1,5 +1,6 @@ use std::error::Error; use std::path::{Path, PathBuf}; +use std::sync::Arc; use async_trait::async_trait; @@ -29,11 +30,11 @@ fn member_path(base: &PathBuf, network_id: NetworkId, member_id: Address) -> Pat } impl FileDatabase { - pub async fn new>(base_path: P) -> Self { + pub async fn new>(base_path: P) -> Arc { let base: PathBuf = base_path.as_ref().into(); let live: PathBuf = base_path.as_ref().join("live"); let _ = fs::create_dir_all(&live).await; - Self { base, live } + Arc::new(Self { base, live }) } async fn merge_with_live(&self, live_path: PathBuf, changes: O) -> O { @@ -153,4 +154,9 @@ impl Database for FileDatabase { .await?; Ok(()) } + + async fn log_request(&self, obj: &RequestLogItem) -> Result<(), Self::Error> { + println!("{}", obj.to_string()); + Ok(()) + } } diff --git a/controller/src/model.rs b/controller/src/model.rs index 73c6e8a4c..fafc8a3e1 100644 --- a/controller/src/model.rs +++ b/controller/src/model.rs @@ -164,8 +164,18 @@ pub enum AuthorizationResult { ApprovedViaToken = 18, } +impl ToString for AuthorizationResult { + fn to_string(&self) -> String { + match self { + Self::Rejected => "rejected", + _ => "", + } + .to_string() + } +} + #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct QueryLogItem { +pub struct RequestLogItem { #[serde(rename = "nwid")] pub network_id: NetworkId, #[serde(rename = "nid")] @@ -185,3 +195,22 @@ pub struct QueryLogItem { #[serde(rename = "r")] pub result: AuthorizationResult, } + +impl ToString for RequestLogItem { + fn to_string(&self) -> String { + format!( + "{} {} {} ts={} v={}.{}.{},{} s={},{} {}", + self.controller_node_id.to_string(), + self.network_id.to_string(), + self.node_id.to_string(), + self.timestamp, + self.version.0, + self.version.1, + self.version.2, + self.version.3, + self.source_remote_endpoint.to_string(), + self.source_hops, + self.result.to_string() + ) + } +} diff --git a/network-hypervisor/src/vl1/address.rs b/network-hypervisor/src/vl1/address.rs index da84c51e0..df6f3f7a8 100644 --- a/network-hypervisor/src/vl1/address.rs +++ b/network-hypervisor/src/vl1/address.rs @@ -8,11 +8,8 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::error::InvalidFormatError; use crate::protocol::{ADDRESS_RESERVED_PREFIX, ADDRESS_SIZE}; -use crate::util::marshalable::*; -use zerotier_utils::buffer::Buffer; use zerotier_utils::hex; -use zerotier_utils::hex::HEX_CHARS; /// A unique address on the global ZeroTier VL1 network. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] @@ -68,27 +65,12 @@ impl From<&Address> for u64 { } } -impl Marshalable for Address { - const MAX_MARSHAL_SIZE: usize = ADDRESS_SIZE; - - #[inline(always)] - fn marshal(&self, buf: &mut Buffer) -> Result<(), MarshalUnmarshalError> { - buf.append_bytes(&self.0.get().to_be_bytes()[8 - ADDRESS_SIZE..]) - .map_err(|_| MarshalUnmarshalError::OutOfBounds) - } - - #[inline(always)] - fn unmarshal(buf: &Buffer, cursor: &mut usize) -> Result { - Self::from_bytes_fixed(buf.read_bytes_fixed(cursor)?).ok_or(MarshalUnmarshalError::InvalidData) - } -} - impl ToString for Address { fn to_string(&self) -> String { let mut v = self.0.get() << 24; let mut s = String::with_capacity(ADDRESS_SIZE * 2); for _ in 0..(ADDRESS_SIZE * 2) { - s.push(HEX_CHARS[(v >> 60) as usize] as char); + s.push(hex::HEX_CHARS[(v >> 60) as usize] as char); v <<= 4; } s diff --git a/network-hypervisor/src/vl1/endpoint.rs b/network-hypervisor/src/vl1/endpoint.rs index c7050c4d9..cdd664157 100644 --- a/network-hypervisor/src/vl1/endpoint.rs +++ b/network-hypervisor/src/vl1/endpoint.rs @@ -211,7 +211,7 @@ impl Marshalable for Endpoint { match type_byte - 16 { TYPE_NIL => Ok(Endpoint::Nil), TYPE_ZEROTIER => { - let zt = Address::unmarshal(buf, cursor)?; + let zt = Address::from_bytes_fixed(buf.read_bytes_fixed(cursor)?).ok_or(MarshalUnmarshalError::InvalidData)?; Ok(Endpoint::ZeroTier( zt, buf.read_bytes_fixed::(cursor)?.clone(), @@ -230,7 +230,7 @@ impl Marshalable for Endpoint { buf.read_bytes(buf.read_varint(cursor)? as usize, cursor)?.to_vec(), )), TYPE_ZEROTIER_ENCAP => { - let zt = Address::unmarshal(buf, cursor)?; + let zt = Address::from_bytes_fixed(buf.read_bytes_fixed(cursor)?).ok_or(MarshalUnmarshalError::InvalidData)?; Ok(Endpoint::ZeroTierEncap(zt, buf.read_bytes_fixed(cursor)?.clone())) } _ => Err(MarshalUnmarshalError::InvalidData), @@ -448,11 +448,8 @@ impl<'de> Deserialize<'de> for Endpoint { #[cfg(test)] mod tests { - use super::{Endpoint, MAX_MARSHAL_SIZE}; + use super::*; use crate::protocol::*; - use crate::util::marshalable::*; - use crate::vl1::address::Address; - use zerotier_utils::buffer::*; fn randstring(len: u8) -> String { (0..len) diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index b6e0192f3..b598b9d0d 100644 --- a/network-hypervisor/src/vl1/node.rs +++ b/network-hypervisor/src/vl1/node.rs @@ -600,7 +600,7 @@ impl Node { need_whois }; if !need_whois.is_empty() { - self.send_whois(host_system, need_whois.as_slice()); + self.send_whois(host_system, need_whois.as_slice(), time_ticks); } } @@ -615,6 +615,7 @@ impl Node { source_endpoint: &Endpoint, source_local_socket: &HostSystemImpl::LocalSocket, source_local_interface: &HostSystemImpl::LocalInterface, + time_ticks: i64, mut data: PooledPacketBuffer, ) { debug_event!( @@ -642,7 +643,6 @@ impl Node { // Legacy ZeroTier V1 packet handling if let Ok(fragment_header) = data.struct_mut_at::(0) { if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) { - let time_ticks = host_system.time_ticks(); if dest == self.identity.address { let path = self.canonical_path(source_endpoint, source_local_socket, source_local_interface, time_ticks); path.log_receive_anything(time_ticks); @@ -683,14 +683,19 @@ impl Node { &assembled_packet.frags[1..(assembled_packet.have as usize)], ); } else { - /* - self.whois_lookup_queue.query( - self, - host_system, - source, - Some(QueuedPacket::Fragmented(assembled_packet)), - ); - */ + let mut combined_packet = PooledPacketBuffer::naked(PacketBuffer::new()); + let mut ok = combined_packet.append_bytes(frag0.as_bytes()).is_ok(); + for i in 1..assembled_packet.have { + if let Some(f) = assembled_packet.frags[i as usize].as_ref() { + if f.len() > v1::FRAGMENT_HEADER_SIZE { + ok |= + combined_packet.append_bytes(&f.as_bytes()[v1::FRAGMENT_HEADER_SIZE..]).is_ok(); + } + } + } + if ok { + self.whois(host_system, source, Some(combined_packet), time_ticks); + } } } } @@ -709,7 +714,7 @@ impl Node { if let Some(peer) = self.peer(source) { peer.receive(self, host_system, inner, time_ticks, &path, packet_header, data.as_ref(), &[]); } else { - self.whois(host_system, source, Some(data)); + self.whois(host_system, source, Some(PooledPacketBuffer::naked(data.clone())), time_ticks); } } } @@ -771,7 +776,9 @@ impl Node { } } - fn whois(&self, host_system: &HostSystemImpl, address: Address, waiting_packet: Option) { + /// Enqueue and send a WHOIS query for a given address, adding the supplied packet (if any) to the list to be processed on reply. + fn whois(&self, host_system: &HostSystemImpl, address: Address, waiting_packet: Option, time_ticks: i64) { + debug_event!(host_system, "[vl1] [v1] WHOIS {}", address.to_string()); { let mut whois_queue = self.whois_queue.lock(); let qi = whois_queue.entry(address).or_default(); @@ -784,11 +791,30 @@ impl Node { qi.retry_count += 1; } } - self.send_whois(host_system, &[address]); + self.send_whois(host_system, &[address], time_ticks); } - fn send_whois(&self, host_system: &HostSystemImpl, addresses: &[Address]) { - if let Some(root) = self.best_root() {} + /// Send a WHOIS query to the current best root. + fn send_whois(&self, host_system: &HostSystemImpl, addresses: &[Address], time_ticks: i64) { + debug_assert!(!addresses.is_empty()); + if !addresses.is_empty() { + if let Some(root) = self.best_root() { + let mut packet = PacketBuffer::new(); + packet.set_size(v1::HEADER_SIZE); + let _ = packet.append_u8(verbs::VL1_WHOIS); + for a in addresses.iter() { + if (packet.len() + ADDRESS_SIZE) > UDP_DEFAULT_MTU { + root.send(host_system, None, self, time_ticks, &mut packet); + packet.clear(); + packet.set_size(v1::HEADER_SIZE); + let _ = packet.append_u8(verbs::VL1_WHOIS); + } else { + let _ = packet.append_bytes_fixed(&a.to_bytes()); + } + } + root.send(host_system, None, self, time_ticks, &mut packet); + } + } } /// Get the current "best" root from among this node's trusted roots. diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index a88a47f80..ca87ead77 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -729,24 +729,35 @@ impl Peer { payload: &PacketBuffer, ) -> PacketHandlerResult { if node.this_node_is_root() || inner.should_communicate_with(&self.identity) { - let mut packet = PacketBuffer::new(); - packet.set_size(v1::HEADER_SIZE); - { + let init_packet = |packet: &mut PacketBuffer| { + packet.set_size(v1::HEADER_SIZE); let mut f: &mut v1::message_component_structs::OkHeader = packet.append_struct_get_mut().unwrap(); f.verb = verbs::VL1_OK; f.in_re_verb = verbs::VL1_WHOIS; f.in_re_message_id = message_id.to_ne_bytes(); - } + }; - let mut cursor = 0; - while cursor < payload.len() { - if let Ok(zt_address) = Address::unmarshal(payload, &mut cursor) { - if let Some(peer) = node.peer(zt_address) { - if !packet.append_bytes((&peer.identity.to_public_bytes()).into()).is_ok() { - debug_event!(host_system, "unexpected error serializing an identity into a WHOIS packet response"); - return PacketHandlerResult::Error; + let mut packet = PacketBuffer::new(); + init_packet(&mut packet); + + let mut addresses = payload.as_bytes(); + loop { + if addresses.len() >= ADDRESS_SIZE { + if let Some(zt_address) = Address::from_bytes(&addresses[..ADDRESS_SIZE]) { + if let Some(peer) = node.peer(zt_address) { + let id_bytes_tmp = peer.identity.to_public_bytes(); + let id_bytes = id_bytes_tmp.as_bytes(); + if (packet.capacity() - packet.len()) < id_bytes.len() { + self.send(host_system, None, node, time_ticks, &mut packet); + packet.clear(); + init_packet(&mut packet); + } + let _ = packet.append_bytes(id_bytes); } } + addresses = &addresses[ADDRESS_SIZE..]; + } else { + break; } } diff --git a/utils/src/pool.rs b/utils/src/pool.rs index 6cb85f03f..5fb5acf5c 100644 --- a/utils/src/pool.rs +++ b/utils/src/pool.rs @@ -32,6 +32,17 @@ struct PoolEntry> { } impl> Pooled { + /// Create a pooled object wrapper around an object but with no pool to return it to. + /// The object will be freed when this pooled container is dropped. + pub fn naked(o: O) -> Self { + unsafe { + Self(NonNull::new_unchecked(Box::into_raw(Box::new(PoolEntry:: { + obj: o, + return_pool: Weak::new(), + })))) + } + } + /// Get a raw pointer to the object wrapped by this pooled object container. /// The returned raw pointer MUST be restored into a Pooled instance with /// from_raw() or memory will leak. diff --git a/vl1-service/src/vl1service.rs b/vl1-service/src/vl1service.rs index e60700792..61ba4d5f7 100644 --- a/vl1-service/src/vl1service.rs +++ b/vl1-service/src/vl1service.rs @@ -196,7 +196,7 @@ impl, - _time_ticks: i64, + time_ticks: i64, socket: &Arc, source_address: &InetAddress, packet: zerotier_network_hypervisor::protocol::PooledPacketBuffer, @@ -207,6 +207,7 @@ impl