Controller stuff and implementing WHOIS

This commit is contained in:
Adam Ierymenko 2022-09-23 09:56:38 -04:00
parent 373adb028d
commit 8592cd59e2
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
9 changed files with 120 additions and 55 deletions

View file

@ -15,4 +15,6 @@ pub trait Database: Sync + Send + Sized + 'static {
async fn list_members(&self, network_id: NetworkId) -> Result<Vec<Address>, Self::Error>;
async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result<Option<Member>, Self::Error>;
async fn save_member(&self, obj: &Member) -> Result<(), Self::Error>;
async fn log_request(&self, obj: &RequestLogItem) -> Result<(), Self::Error>;
}

View file

@ -1,5 +1,6 @@
use std::error::Error;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_trait::async_trait;
@ -29,11 +30,11 @@ fn member_path(base: &PathBuf, network_id: NetworkId, member_id: Address) -> Pat
}
impl FileDatabase {
pub async fn new<P: AsRef<Path>>(base_path: P) -> Self {
pub async fn new<P: AsRef<Path>>(base_path: P) -> Arc<Self> {
let base: PathBuf = base_path.as_ref().into();
let live: PathBuf = base_path.as_ref().join("live");
let _ = fs::create_dir_all(&live).await;
Self { base, live }
Arc::new(Self { base, live })
}
async fn merge_with_live<O: Serialize + DeserializeOwned>(&self, live_path: PathBuf, changes: O) -> O {
@ -153,4 +154,9 @@ impl Database for FileDatabase {
.await?;
Ok(())
}
async fn log_request(&self, obj: &RequestLogItem) -> Result<(), Self::Error> {
println!("{}", obj.to_string());
Ok(())
}
}

View file

@ -164,8 +164,18 @@ pub enum AuthorizationResult {
ApprovedViaToken = 18,
}
impl ToString for AuthorizationResult {
fn to_string(&self) -> String {
match self {
Self::Rejected => "rejected",
_ => "",
}
.to_string()
}
}
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QueryLogItem {
pub struct RequestLogItem {
#[serde(rename = "nwid")]
pub network_id: NetworkId,
#[serde(rename = "nid")]
@ -185,3 +195,22 @@ pub struct QueryLogItem {
#[serde(rename = "r")]
pub result: AuthorizationResult,
}
impl ToString for RequestLogItem {
fn to_string(&self) -> String {
format!(
"{} {} {} ts={} v={}.{}.{},{} s={},{} {}",
self.controller_node_id.to_string(),
self.network_id.to_string(),
self.node_id.to_string(),
self.timestamp,
self.version.0,
self.version.1,
self.version.2,
self.version.3,
self.source_remote_endpoint.to_string(),
self.source_hops,
self.result.to_string()
)
}
}

View file

@ -8,11 +8,8 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::error::InvalidFormatError;
use crate::protocol::{ADDRESS_RESERVED_PREFIX, ADDRESS_SIZE};
use crate::util::marshalable::*;
use zerotier_utils::buffer::Buffer;
use zerotier_utils::hex;
use zerotier_utils::hex::HEX_CHARS;
/// A unique address on the global ZeroTier VL1 network.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
@ -68,27 +65,12 @@ impl From<&Address> for u64 {
}
}
impl Marshalable for Address {
const MAX_MARSHAL_SIZE: usize = ADDRESS_SIZE;
#[inline(always)]
fn marshal<const BL: usize>(&self, buf: &mut Buffer<BL>) -> Result<(), MarshalUnmarshalError> {
buf.append_bytes(&self.0.get().to_be_bytes()[8 - ADDRESS_SIZE..])
.map_err(|_| MarshalUnmarshalError::OutOfBounds)
}
#[inline(always)]
fn unmarshal<const BL: usize>(buf: &Buffer<BL>, cursor: &mut usize) -> Result<Self, MarshalUnmarshalError> {
Self::from_bytes_fixed(buf.read_bytes_fixed(cursor)?).ok_or(MarshalUnmarshalError::InvalidData)
}
}
impl ToString for Address {
fn to_string(&self) -> String {
let mut v = self.0.get() << 24;
let mut s = String::with_capacity(ADDRESS_SIZE * 2);
for _ in 0..(ADDRESS_SIZE * 2) {
s.push(HEX_CHARS[(v >> 60) as usize] as char);
s.push(hex::HEX_CHARS[(v >> 60) as usize] as char);
v <<= 4;
}
s

View file

@ -211,7 +211,7 @@ impl Marshalable for Endpoint {
match type_byte - 16 {
TYPE_NIL => Ok(Endpoint::Nil),
TYPE_ZEROTIER => {
let zt = Address::unmarshal(buf, cursor)?;
let zt = Address::from_bytes_fixed(buf.read_bytes_fixed(cursor)?).ok_or(MarshalUnmarshalError::InvalidData)?;
Ok(Endpoint::ZeroTier(
zt,
buf.read_bytes_fixed::<IDENTITY_FINGERPRINT_SIZE>(cursor)?.clone(),
@ -230,7 +230,7 @@ impl Marshalable for Endpoint {
buf.read_bytes(buf.read_varint(cursor)? as usize, cursor)?.to_vec(),
)),
TYPE_ZEROTIER_ENCAP => {
let zt = Address::unmarshal(buf, cursor)?;
let zt = Address::from_bytes_fixed(buf.read_bytes_fixed(cursor)?).ok_or(MarshalUnmarshalError::InvalidData)?;
Ok(Endpoint::ZeroTierEncap(zt, buf.read_bytes_fixed(cursor)?.clone()))
}
_ => Err(MarshalUnmarshalError::InvalidData),
@ -448,11 +448,8 @@ impl<'de> Deserialize<'de> for Endpoint {
#[cfg(test)]
mod tests {
use super::{Endpoint, MAX_MARSHAL_SIZE};
use super::*;
use crate::protocol::*;
use crate::util::marshalable::*;
use crate::vl1::address::Address;
use zerotier_utils::buffer::*;
fn randstring(len: u8) -> String {
(0..len)

View file

@ -600,7 +600,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
need_whois
};
if !need_whois.is_empty() {
self.send_whois(host_system, need_whois.as_slice());
self.send_whois(host_system, need_whois.as_slice(), time_ticks);
}
}
@ -615,6 +615,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
source_endpoint: &Endpoint,
source_local_socket: &HostSystemImpl::LocalSocket,
source_local_interface: &HostSystemImpl::LocalInterface,
time_ticks: i64,
mut data: PooledPacketBuffer,
) {
debug_event!(
@ -642,7 +643,6 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
// Legacy ZeroTier V1 packet handling
if let Ok(fragment_header) = data.struct_mut_at::<v1::FragmentHeader>(0) {
if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) {
let time_ticks = host_system.time_ticks();
if dest == self.identity.address {
let path = self.canonical_path(source_endpoint, source_local_socket, source_local_interface, time_ticks);
path.log_receive_anything(time_ticks);
@ -683,14 +683,19 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
&assembled_packet.frags[1..(assembled_packet.have as usize)],
);
} else {
/*
self.whois_lookup_queue.query(
self,
host_system,
source,
Some(QueuedPacket::Fragmented(assembled_packet)),
);
*/
let mut combined_packet = PooledPacketBuffer::naked(PacketBuffer::new());
let mut ok = combined_packet.append_bytes(frag0.as_bytes()).is_ok();
for i in 1..assembled_packet.have {
if let Some(f) = assembled_packet.frags[i as usize].as_ref() {
if f.len() > v1::FRAGMENT_HEADER_SIZE {
ok |=
combined_packet.append_bytes(&f.as_bytes()[v1::FRAGMENT_HEADER_SIZE..]).is_ok();
}
}
}
if ok {
self.whois(host_system, source, Some(combined_packet), time_ticks);
}
}
}
}
@ -709,7 +714,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
if let Some(peer) = self.peer(source) {
peer.receive(self, host_system, inner, time_ticks, &path, packet_header, data.as_ref(), &[]);
} else {
self.whois(host_system, source, Some(data));
self.whois(host_system, source, Some(PooledPacketBuffer::naked(data.clone())), time_ticks);
}
}
}
@ -771,7 +776,9 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
}
}
fn whois(&self, host_system: &HostSystemImpl, address: Address, waiting_packet: Option<PooledPacketBuffer>) {
/// Enqueue and send a WHOIS query for a given address, adding the supplied packet (if any) to the list to be processed on reply.
fn whois(&self, host_system: &HostSystemImpl, address: Address, waiting_packet: Option<PooledPacketBuffer>, time_ticks: i64) {
debug_event!(host_system, "[vl1] [v1] WHOIS {}", address.to_string());
{
let mut whois_queue = self.whois_queue.lock();
let qi = whois_queue.entry(address).or_default();
@ -784,11 +791,30 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
qi.retry_count += 1;
}
}
self.send_whois(host_system, &[address]);
self.send_whois(host_system, &[address], time_ticks);
}
fn send_whois(&self, host_system: &HostSystemImpl, addresses: &[Address]) {
if let Some(root) = self.best_root() {}
/// Send a WHOIS query to the current best root.
fn send_whois(&self, host_system: &HostSystemImpl, addresses: &[Address], time_ticks: i64) {
debug_assert!(!addresses.is_empty());
if !addresses.is_empty() {
if let Some(root) = self.best_root() {
let mut packet = PacketBuffer::new();
packet.set_size(v1::HEADER_SIZE);
let _ = packet.append_u8(verbs::VL1_WHOIS);
for a in addresses.iter() {
if (packet.len() + ADDRESS_SIZE) > UDP_DEFAULT_MTU {
root.send(host_system, None, self, time_ticks, &mut packet);
packet.clear();
packet.set_size(v1::HEADER_SIZE);
let _ = packet.append_u8(verbs::VL1_WHOIS);
} else {
let _ = packet.append_bytes_fixed(&a.to_bytes());
}
}
root.send(host_system, None, self, time_ticks, &mut packet);
}
}
}
/// Get the current "best" root from among this node's trusted roots.

View file

@ -729,24 +729,35 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
payload: &PacketBuffer,
) -> PacketHandlerResult {
if node.this_node_is_root() || inner.should_communicate_with(&self.identity) {
let mut packet = PacketBuffer::new();
packet.set_size(v1::HEADER_SIZE);
{
let init_packet = |packet: &mut PacketBuffer| {
packet.set_size(v1::HEADER_SIZE);
let mut f: &mut v1::message_component_structs::OkHeader = packet.append_struct_get_mut().unwrap();
f.verb = verbs::VL1_OK;
f.in_re_verb = verbs::VL1_WHOIS;
f.in_re_message_id = message_id.to_ne_bytes();
}
};
let mut cursor = 0;
while cursor < payload.len() {
if let Ok(zt_address) = Address::unmarshal(payload, &mut cursor) {
if let Some(peer) = node.peer(zt_address) {
if !packet.append_bytes((&peer.identity.to_public_bytes()).into()).is_ok() {
debug_event!(host_system, "unexpected error serializing an identity into a WHOIS packet response");
return PacketHandlerResult::Error;
let mut packet = PacketBuffer::new();
init_packet(&mut packet);
let mut addresses = payload.as_bytes();
loop {
if addresses.len() >= ADDRESS_SIZE {
if let Some(zt_address) = Address::from_bytes(&addresses[..ADDRESS_SIZE]) {
if let Some(peer) = node.peer(zt_address) {
let id_bytes_tmp = peer.identity.to_public_bytes();
let id_bytes = id_bytes_tmp.as_bytes();
if (packet.capacity() - packet.len()) < id_bytes.len() {
self.send(host_system, None, node, time_ticks, &mut packet);
packet.clear();
init_packet(&mut packet);
}
let _ = packet.append_bytes(id_bytes);
}
}
addresses = &addresses[ADDRESS_SIZE..];
} else {
break;
}
}

View file

@ -32,6 +32,17 @@ struct PoolEntry<O, F: PoolFactory<O>> {
}
impl<O, F: PoolFactory<O>> Pooled<O, F> {
/// Create a pooled object wrapper around an object but with no pool to return it to.
/// The object will be freed when this pooled container is dropped.
pub fn naked(o: O) -> Self {
unsafe {
Self(NonNull::new_unchecked(Box::into_raw(Box::new(PoolEntry::<O, F> {
obj: o,
return_pool: Weak::new(),
}))))
}
}
/// Get a raw pointer to the object wrapped by this pooled object container.
/// The returned raw pointer MUST be restored into a Pooled instance with
/// from_raw() or memory will leak.

View file

@ -196,7 +196,7 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
#[inline(always)]
fn incoming_udp_packet(
self: &Arc<Self>,
_time_ticks: i64,
time_ticks: i64,
socket: &Arc<crate::sys::udp::BoundUdpSocket>,
source_address: &InetAddress,
packet: zerotier_network_hypervisor::protocol::PooledPacketBuffer,
@ -207,6 +207,7 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
&Endpoint::IpUdp(source_address.clone()),
&LocalSocket::new(socket),
&socket.interface,
time_ticks,
packet,
);
}