Various cleanup.

This commit is contained in:
Adam Ierymenko 2021-08-05 18:25:04 -04:00
parent 94305ae779
commit 005f76cd9b
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
9 changed files with 60 additions and 33 deletions

View file

@ -9,7 +9,7 @@ lto = true
codegen-units = 1 codegen-units = 1
panic = 'abort' panic = 'abort'
[dependencies] [target."cfg(all(not(any(target_os = \"macos\", target_os = \"ios\")), target_arch = \"s390x\"))".dependencies]
openssl = "^0" openssl = "^0"
[target."cfg(not(any(target_os = \"macos\", target_os = \"ios\")))".dependencies] [target."cfg(not(any(target_os = \"macos\", target_os = \"ios\")))".dependencies]

View file

@ -106,6 +106,9 @@ pub const WHOIS_RETRY_INTERVAL: i64 = 1000;
/// Maximum number of WHOIS retries /// Maximum number of WHOIS retries
pub const WHOIS_RETRY_MAX: u16 = 3; pub const WHOIS_RETRY_MAX: u16 = 3;
/// Maximum number of packets to queue up behind a WHOIS.
pub const WHOIS_MAX_WAITING_PACKETS: usize = 64;
/// Maximum number of endpoints allowed in a Locator. /// Maximum number of endpoints allowed in a Locator.
pub const LOCATOR_MAX_ENDPOINTS: usize = 32; pub const LOCATOR_MAX_ENDPOINTS: usize = 32;

View file

@ -183,6 +183,7 @@ impl Dictionary {
impl ToString for Dictionary { impl ToString for Dictionary {
/// Get the dictionary in an always readable format with non-printable characters replaced by '\xXX'. /// Get the dictionary in an always readable format with non-printable characters replaced by '\xXX'.
/// This is not a serializable output that can be re-imported. Use write_to() for that.
fn to_string(&self) -> String { fn to_string(&self) -> String {
let mut s = String::new(); let mut s = String::new();
for kv in self.0.iter() { for kv in self.0.iter() {

View file

@ -1,9 +1,9 @@
use std::cmp::Ordering;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use crate::vl1::{Address, MAC}; use crate::vl1::{Address, MAC};
use crate::vl1::inetaddress::InetAddress;
use crate::vl1::buffer::Buffer; use crate::vl1::buffer::Buffer;
use std::cmp::Ordering; use crate::vl1::inetaddress::InetAddress;
const TYPE_NIL: u8 = 0; const TYPE_NIL: u8 = 0;
const TYPE_ZEROTIER: u8 = 1; const TYPE_ZEROTIER: u8 = 1;
@ -82,26 +82,26 @@ impl Endpoint {
pub fn marshal<const BL: usize>(&self, buf: &mut Buffer<BL>) -> std::io::Result<()> { pub fn marshal<const BL: usize>(&self, buf: &mut Buffer<BL>) -> std::io::Result<()> {
match self { match self {
Endpoint::Nil => { Endpoint::Nil => {
buf.append_u8(Type::Nil as u8) buf.append_u8(TYPE_NIL)
} }
Endpoint::ZeroTier(a) => { Endpoint::ZeroTier(a) => {
buf.append_u8(16 + (Type::ZeroTier as u8))?; buf.append_u8(16 + TYPE_ZEROTIER)?;
buf.append_bytes_fixed(&a.to_bytes()) buf.append_bytes_fixed(&a.to_bytes())
} }
Endpoint::Ethernet(m) => { Endpoint::Ethernet(m) => {
buf.append_u8(16 + (Type::Ethernet as u8))?; buf.append_u8(16 + TYPE_ETHERNET)?;
buf.append_bytes_fixed(&m.to_bytes()) buf.append_bytes_fixed(&m.to_bytes())
} }
Endpoint::WifiDirect(m) => { Endpoint::WifiDirect(m) => {
buf.append_u8(16 + (Type::WifiDirect as u8))?; buf.append_u8(16 + TYPE_WIFIDIRECT)?;
buf.append_bytes_fixed(&m.to_bytes()) buf.append_bytes_fixed(&m.to_bytes())
} }
Endpoint::Bluetooth(m) => { Endpoint::Bluetooth(m) => {
buf.append_u8(16 + (Type::Bluetooth as u8))?; buf.append_u8(16 + TYPE_BLUETOOTH)?;
buf.append_bytes_fixed(&m.to_bytes()) buf.append_bytes_fixed(&m.to_bytes())
} }
Endpoint::Ip(ip) => { Endpoint::Ip(ip) => {
buf.append_u8(16 + (Type::Ip as u8))?; buf.append_u8(16 + TYPE_IP)?;
ip.marshal(buf) ip.marshal(buf)
} }
Endpoint::IpUdp(ip) => { Endpoint::IpUdp(ip) => {
@ -111,17 +111,17 @@ impl Endpoint {
ip.marshal(buf) ip.marshal(buf)
} }
Endpoint::IpTcp(ip) => { Endpoint::IpTcp(ip) => {
buf.append_u8(16 + (Type::IpTcp as u8))?; buf.append_u8(16 + TYPE_IPTCP)?;
ip.marshal(buf) ip.marshal(buf)
} }
Endpoint::Http(url) => { Endpoint::Http(url) => {
buf.append_u8(16 + (Type::Http as u8))?; buf.append_u8(16 + TYPE_HTTP)?;
let b = url.as_bytes(); let b = url.as_bytes();
buf.append_u16(b.len() as u16)?; buf.append_u16(b.len() as u16)?;
buf.append_bytes(b) buf.append_bytes(b)
} }
Endpoint::WebRTC(offer) => { Endpoint::WebRTC(offer) => {
buf.append_u8(16 + (Type::WebRTC as u8))?; buf.append_u8(16 + TYPE_WEBRTC)?;
let b = offer.as_slice(); let b = offer.as_slice();
buf.append_u16(b.len() as u16)?; buf.append_u16(b.len() as u16)?;
buf.append_bytes(b) buf.append_bytes(b)
@ -166,42 +166,42 @@ impl Hash for Endpoint {
fn hash<H: Hasher>(&self, state: &mut H) { fn hash<H: Hasher>(&self, state: &mut H) {
match self { match self {
Endpoint::Nil => { Endpoint::Nil => {
state.write_u8(Type::Nil as u8); state.write_u8(TYPE_NIL);
} }
Endpoint::ZeroTier(a) => { Endpoint::ZeroTier(a) => {
state.write_u8(Type::ZeroTier as u8); state.write_u8(TYPE_ZEROTIER);
state.write_u64(a.to_u64()) state.write_u64(a.to_u64())
} }
Endpoint::Ethernet(m) => { Endpoint::Ethernet(m) => {
state.write_u8(Type::Ethernet as u8); state.write_u8(TYPE_ETHERNET);
state.write_u64(m.to_u64()) state.write_u64(m.to_u64())
} }
Endpoint::WifiDirect(m) => { Endpoint::WifiDirect(m) => {
state.write_u8(Type::WifiDirect as u8); state.write_u8(TYPE_WIFIDIRECT);
state.write_u64(m.to_u64()) state.write_u64(m.to_u64())
} }
Endpoint::Bluetooth(m) => { Endpoint::Bluetooth(m) => {
state.write_u8(Type::Bluetooth as u8); state.write_u8(TYPE_BLUETOOTH);
state.write_u64(m.to_u64()) state.write_u64(m.to_u64())
} }
Endpoint::Ip(ip) => { Endpoint::Ip(ip) => {
state.write_u8(Type::Ip as u8); state.write_u8(TYPE_IP);
ip.hash(state); ip.hash(state);
} }
Endpoint::IpUdp(ip) => { Endpoint::IpUdp(ip) => {
state.write_u8(Type::IpUdp as u8); state.write_u8(TYPE_IPUDP);
ip.hash(state); ip.hash(state);
} }
Endpoint::IpTcp(ip) => { Endpoint::IpTcp(ip) => {
state.write_u8(Type::IpTcp as u8); state.write_u8(TYPE_IPTCP);
ip.hash(state); ip.hash(state);
} }
Endpoint::Http(url) => { Endpoint::Http(url) => {
state.write_u8(Type::Http as u8); state.write_u8(TYPE_HTTP);
url.hash(state); url.hash(state);
} }
Endpoint::WebRTC(offer) => { Endpoint::WebRTC(offer) => {
state.write_u8(Type::WebRTC as u8); state.write_u8(TYPE_WEBRTC);
offer.hash(state); offer.hash(state);
} }
} }

View file

@ -210,7 +210,7 @@ impl InetAddress {
match self.sa.sa_family as u8 { match self.sa.sa_family as u8 {
AF_INET => &*(&self.sin.sin_addr.s_addr as *const u32).cast::<[u8; 4]>(), AF_INET => &*(&self.sin.sin_addr.s_addr as *const u32).cast::<[u8; 4]>(),
AF_INET6 => &*(&(self.sin6.sin6_addr) as *const in6_addr).cast::<[u8; 16]>(), AF_INET6 => &*(&(self.sin6.sin6_addr) as *const in6_addr).cast::<[u8; 16]>(),
_ => &[] _ => &[],
} }
} }
} }

View file

@ -125,6 +125,7 @@ pub struct Node {
locator: Mutex<Option<Locator>>, locator: Mutex<Option<Locator>>,
paths: DashMap<Endpoint, Arc<Path>>, paths: DashMap<Endpoint, Arc<Path>>,
peers: DashMap<Address, Arc<Peer>>, peers: DashMap<Address, Arc<Peer>>,
root: Mutex<Option<Arc<Peer>>>,
whois: WhoisQueue, whois: WhoisQueue,
buffer_pool: Pool<Buffer<{ PACKET_SIZE_MAX }>, PooledBufferFactory<{ PACKET_SIZE_MAX }>>, buffer_pool: Pool<Buffer<{ PACKET_SIZE_MAX }>, PooledBufferFactory<{ PACKET_SIZE_MAX }>>,
secure_prng: SecureRandom, secure_prng: SecureRandom,
@ -162,6 +163,7 @@ impl Node {
locator: Mutex::new(None), locator: Mutex::new(None),
paths: DashMap::new(), paths: DashMap::new(),
peers: DashMap::new(), peers: DashMap::new(),
root: Mutex::new(None),
whois: WhoisQueue::new(), whois: WhoisQueue::new(),
buffer_pool: Pool::new(64, PooledBufferFactory), buffer_pool: Pool::new(64, PooledBufferFactory),
secure_prng: SecureRandom::get(), secure_prng: SecureRandom::get(),
@ -245,6 +247,12 @@ impl Node {
*/ */
} }
/// Get the current best root peer that we should use for WHOIS, relaying, etc.
#[inline(always)]
pub(crate) fn root(&self) -> Option<Arc<Peer>> {
self.root.lock().clone()
}
/// Get the canonical Path object for a given endpoint and local socket information. /// Get the canonical Path object for a given endpoint and local socket information.
/// This is a canonicalizing function that returns a unique path object for every tuple /// This is a canonicalizing function that returns a unique path object for every tuple
/// of endpoint, local socket, and local interface. /// of endpoint, local socket, and local interface.

View file

@ -91,6 +91,7 @@ impl Path {
self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed); self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed);
} }
/// Called every INTERVAL during background tasks.
#[inline(always)] #[inline(always)]
pub fn on_interval<CI: VL1CallerInterface>(&self, ct: &CI, time_ticks: i64) { pub fn on_interval<CI: VL1CallerInterface>(&self, ct: &CI, time_ticks: i64) {
self.fragmented_packets.lock().retain(|packet_id, frag| (time_ticks - frag.ts_ticks) < FRAGMENT_EXPIRATION); self.fragmented_packets.lock().retain(|packet_id, frag| (time_ticks - frag.ts_ticks) < FRAGMENT_EXPIRATION);

View file

@ -12,7 +12,7 @@ use crate::crypto::random::next_u64_secure;
use crate::crypto::salsa::Salsa; use crate::crypto::salsa::Salsa;
use crate::crypto::secret::Secret; use crate::crypto::secret::Secret;
use crate::util::pool::{Pool, PoolFactory}; use crate::util::pool::{Pool, PoolFactory};
use crate::vl1::{Identity, Path}; use crate::vl1::{Identity, Path, Endpoint};
use crate::vl1::buffer::Buffer; use crate::vl1::buffer::Buffer;
use crate::vl1::constants::*; use crate::vl1::constants::*;
use crate::vl1::node::*; use crate::vl1::node::*;
@ -167,9 +167,8 @@ impl Peer {
pub(crate) fn receive<CI: VL1CallerInterface, PH: VL1PacketHandler>(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_path: &Arc<Path>, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option<PacketBuffer>]) { pub(crate) fn receive<CI: VL1CallerInterface, PH: VL1PacketHandler>(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_path: &Arc<Path>, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option<PacketBuffer>]) {
let _ = packet.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| { let _ = packet.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| {
let mut payload: Buffer<{ PACKET_SIZE_MAX }> = Buffer::new(); let mut payload: Buffer<{ PACKET_SIZE_MAX }> = Buffer::new();
let mut forward_secrecy = true;
let cipher = header.cipher(); let cipher = header.cipher();
let mut forward_secrecy = true;
let ephemeral_secret = self.ephemeral_secret.lock().clone(); let ephemeral_secret = self.ephemeral_secret.lock().clone();
for secret in [ephemeral_secret.as_ref().map_or(&self.static_secret, |s| s.as_ref()), &self.static_secret] { for secret in [ephemeral_secret.as_ref().map_or(&self.static_secret, |s| s.as_ref()), &self.static_secret] {
match cipher { match cipher {
@ -234,7 +233,10 @@ impl Peer {
} }
} }
_ => {} _ => {
// Unrecognized or unsupported cipher type.
return;
}
} }
if (secret as *const PeerSecret) == (&self.static_secret as *const PeerSecret) { if (secret as *const PeerSecret) == (&self.static_secret as *const PeerSecret) {
@ -247,7 +249,6 @@ impl Peer {
payload.clear(); payload.clear();
} }
} }
drop(ephemeral_secret);
// If decryption and authentication succeeded, the code above will break out of the // If decryption and authentication succeeded, the code above will break out of the
// for loop and end up here. Otherwise it returns from the whole function. // for loop and end up here. Otherwise it returns from the whole function.
@ -277,6 +278,9 @@ impl Peer {
}); });
} }
pub(crate) fn send_hello<CI: VL1CallerInterface>(&self, ci: &CI, to_endpoint: &Endpoint) {
}
/// Get the remote version of this peer: major, minor, revision, and build. /// Get the remote version of this peer: major, minor, revision, and build.
/// Returns None if it's not yet known. /// Returns None if it's not yet known.
pub fn version(&self) -> Option<[u16; 4]> { pub fn version(&self) -> Option<[u16; 4]> {

View file

@ -1,4 +1,4 @@
use std::collections::HashMap; use std::collections::{HashMap, LinkedList};
use parking_lot::Mutex; use parking_lot::Mutex;
@ -14,7 +14,7 @@ pub(crate) enum QueuedPacket {
} }
struct WhoisQueueItem { struct WhoisQueueItem {
packet_queue: Vec<QueuedPacket>, packet_queue: LinkedList<QueuedPacket>,
retry_gate: IntervalGate<{ WHOIS_RETRY_INTERVAL }>, retry_gate: IntervalGate<{ WHOIS_RETRY_INTERVAL }>,
retry_count: u16, retry_count: u16,
} }
@ -24,7 +24,7 @@ pub(crate) struct WhoisQueue {
} }
impl WhoisQueue { impl WhoisQueue {
pub const INTERVAL: i64 = WHOIS_RETRY_INTERVAL; pub(crate) const INTERVAL: i64 = WHOIS_RETRY_INTERVAL;
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
@ -36,7 +36,7 @@ impl WhoisQueue {
let mut q = self.queue.lock(); let mut q = self.queue.lock();
let qi = q.entry(target).or_insert_with(|| WhoisQueueItem { let qi = q.entry(target).or_insert_with(|| WhoisQueueItem {
packet_queue: Vec::new(), packet_queue: LinkedList::new(),
retry_gate: IntervalGate::new(0), retry_gate: IntervalGate::new(0),
retry_count: 0, retry_count: 0,
}); });
@ -44,12 +44,22 @@ impl WhoisQueue {
if qi.retry_gate.gate(ci.time_ticks()) { if qi.retry_gate.gate(ci.time_ticks()) {
qi.retry_count += 1; qi.retry_count += 1;
if packet.is_some() { if packet.is_some() {
qi.packet_queue.push(packet.unwrap()); while qi.packet_queue.len() >= WHOIS_MAX_WAITING_PACKETS {
let _ = qi.packet_queue.pop_front();
}
qi.packet_queue.push_back(packet.unwrap());
} }
self.send_whois(node, ci, &[target]); self.send_whois(node, ci, &[target]);
} }
} }
/// Remove a WHOIS request from the queue and call the supplied function for all queued packets.
pub fn response_received_get_packets<F: FnMut(&mut QueuedPacket)>(&self, address: Address, packet_handler: F) {
let mut qi = self.queue.lock().remove(&address);
let _ = qi.map(|mut qi| qi.packet_queue.iter_mut().for_each(packet_handler));
}
/// Called every INTERVAL during background tasks.
pub fn on_interval<CI: VL1CallerInterface>(&self, node: &Node, ci: &CI, time_ticks: i64) { pub fn on_interval<CI: VL1CallerInterface>(&self, node: &Node, ci: &CI, time_ticks: i64) {
let mut targets: Vec<Address> = Vec::new(); let mut targets: Vec<Address> = Vec::new();
self.queue.lock().retain(|target, qi| { self.queue.lock().retain(|target, qi| {