Another absolute ton of work, including getting the service almost to the point where it is ready to say HELLO. Also added event and tracing infrastructure in the core. The common Rust tracing/logging libraries are not used directly in the core, for speed and future portability reasons, but they will likely be used in the service.

Adam Ierymenko 2022-05-25 18:28:07 -04:00
parent 1ad5d623f5
commit ded7c25786
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
27 changed files with 1040 additions and 265 deletions

attic/flatkv.rs Normal file
View file

@ -0,0 +1,261 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use std::borrow::Cow;
/// A flat key/value store implemented as an array of (key, value) tuples.
#[repr(transparent)]
#[derive(Clone, PartialEq, Eq)]
pub struct FlatKV(Vec<(&'static str, Value)>);
/// Value variant for FlatKV.
#[derive(Clone, PartialEq, Eq)]
pub enum Value {
N,
KV(FlatKV),
S(Cow<'static, str>),
I(i64),
UI(u64),
B(bool),
Endpoint(crate::vl1::Endpoint),
Identity(crate::vl1::Identity),
}
impl Into<Value> for FlatKV {
#[inline(always)]
fn into(self) -> Value {
Value::KV(self)
}
}
impl Into<Value> for &'static str {
#[inline(always)]
fn into(self) -> Value {
Value::S(Cow::Borrowed(self))
}
}
impl Into<Value> for String {
#[inline(always)]
fn into(self) -> Value {
Value::S(Cow::Owned(self))
}
}
impl Into<Value> for i64 {
#[inline(always)]
fn into(self) -> Value {
Value::I(self)
}
}
impl Into<Value> for u64 {
#[inline(always)]
fn into(self) -> Value {
Value::UI(self)
}
}
impl Into<Value> for isize {
#[inline(always)]
fn into(self) -> Value {
Value::I(self as i64)
}
}
impl Into<Value> for usize {
#[inline(always)]
fn into(self) -> Value {
Value::UI(self as u64)
}
}
impl Into<Value> for i32 {
#[inline(always)]
fn into(self) -> Value {
Value::I(self as i64)
}
}
impl Into<Value> for u32 {
#[inline(always)]
fn into(self) -> Value {
Value::UI(self as u64)
}
}
impl Into<Value> for i16 {
#[inline(always)]
fn into(self) -> Value {
Value::I(self as i64)
}
}
impl Into<Value> for u16 {
#[inline(always)]
fn into(self) -> Value {
Value::UI(self as u64)
}
}
impl Into<Value> for i8 {
#[inline(always)]
fn into(self) -> Value {
Value::I(self as i64)
}
}
impl Into<Value> for u8 {
#[inline(always)]
fn into(self) -> Value {
Value::UI(self as u64)
}
}
impl Into<Value> for bool {
#[inline(always)]
fn into(self) -> Value {
Value::B(self)
}
}
impl Into<Value> for crate::vl1::Endpoint {
#[inline(always)]
fn into(self) -> Value {
Value::Endpoint(self)
}
}
impl Into<Value> for crate::vl1::InetAddress {
#[inline(always)]
fn into(self) -> Value {
Value::Endpoint(crate::vl1::Endpoint::IpUdp(self))
}
}
impl Into<Value> for crate::vl1::Identity {
#[inline(always)]
fn into(self) -> Value {
Value::Identity(self)
}
}
impl ToString for Value {
fn to_string(&self) -> String {
match self {
Value::N => "(null)".into(),
Value::KV(x) => x.to_string(),
Value::S(x) => x.to_string(),
Value::I(x) => x.to_string(),
Value::UI(x) => x.to_string(),
Value::B(x) => x.to_string(),
Value::Endpoint(x) => x.to_string(),
Value::Identity(x) => x.to_string(),
}
}
}
impl FlatKV {
#[inline(always)]
pub fn add<T: Into<Value>>(&mut self, k: &'static str, v: T) {
self.0.push((k, v.into()))
}
}
fn json_escape(src: &str, escaped: &mut String) {
use std::fmt::Write;
let mut utf16_buf = [0u16; 2];
for c in src.chars() {
match c {
'\x08' => escaped.push_str("\\b"),
'\x0c' => escaped.push_str("\\f"),
'\n' => escaped.push_str("\\n"),
'\r' => escaped.push_str("\\r"),
'\t' => escaped.push_str("\\t"),
'"' => escaped.push_str("\\\""),
'\\' => escaped.push_str("\\\\"),
'/' => escaped.push_str("\\/"),
c if c.is_ascii_graphic() => escaped.push(c),
c => {
let encoded = c.encode_utf16(&mut utf16_buf);
for utf16 in encoded {
write!(escaped, "\\u{:04X}", utf16).unwrap();
}
}
}
}
}
impl Default for FlatKV {
#[inline(always)]
fn default() -> Self {
Self(Vec::new())
}
}
impl FlatKV {
#[inline(always)]
pub fn new() -> Self {
Self(Vec::new())
}
}
impl ToString for FlatKV {
/// Output a JSON formatted map of values or maps.
fn to_string(&self) -> String {
let mut first = true;
let mut tmp = String::new();
tmp.push_str("{ "); //} //"
for (k, v) in self.0.iter() {
if first {
first = false;
} else {
tmp.push_str(", ");
}
tmp.push('"');
json_escape(*k, &mut tmp);
tmp.push_str("\": ");
match v {
Value::S(_) | Value::Endpoint(_) | Value::Identity(_) => {
tmp.push('"');
json_escape(v.to_string().as_str(), &mut tmp);
tmp.push('"');
}
_ => tmp.push_str(v.to_string().as_str()),
}
}
tmp.push_str("} ");
tmp
}
}
#[macro_export]
macro_rules! kv {
($($key:expr => $value:expr,)+) => (kv!($($key => $value),+));
( $($key:expr => $value:expr),* ) => {
{
#[allow(unused_mut)]
let mut _kv = crate::util::flatkv::FlatKV(Vec::new());
$(
_kv.add($key, $value);
)*
_kv
}
};
}
#[cfg(test)]
mod tests {
#[test]
fn kv_macro() {
let kv = kv!(
"foo" => 0_u64,
"bar" => "bar",
"baz" => -1_i64,
"lala" => false,
"lol" => kv!(
"boo" => 1_u16,
"far" => 2_u32,
)
);
}
}
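A usage sketch (illustrative, not part of this diff): building a map with kv! and serializing it with to_string() yields JSON whose key order follows insertion order, since FlatKV is a flat Vec rather than a hash map.
let kv = kv!("foo" => 0_u64, "bar" => "bar");
// Prints something like: { "foo": 0, "bar": "bar"}
println!("{}", kv.to_string());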

View file

@ -1,11 +1,12 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use openssl::rand::rand_bytes;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicU64, Ordering};
pub struct SecureRandom;
use openssl::rand::rand_bytes;
use lazy_static::lazy_static;
#[inline(always)]
pub fn next_u32_secure() -> u32 {
unsafe {
let mut tmp: [u32; 1] = MaybeUninit::uninit().assume_init();
@ -14,7 +15,6 @@ pub fn next_u32_secure() -> u32 {
}
}
#[inline(always)]
pub fn next_u64_secure() -> u64 {
unsafe {
let mut tmp: [u64; 1] = MaybeUninit::uninit().assume_init();
@ -23,6 +23,14 @@ pub fn next_u64_secure() -> u64 {
}
}
pub fn next_u128_secure() -> u128 {
unsafe {
let mut tmp: [u128; 1] = MaybeUninit::uninit().assume_init();
assert!(rand_bytes(&mut *(tmp.as_mut_ptr().cast::<[u8; 16]>())).is_ok());
tmp[0]
}
}
#[inline(always)]
pub fn fill_bytes_secure(dest: &mut [u8]) {
assert!(rand_bytes(dest).is_ok());
@ -35,6 +43,15 @@ pub fn get_bytes_secure<const COUNT: usize>() -> [u8; COUNT] {
tmp
}
pub struct SecureRandom;
impl Default for SecureRandom {
#[inline(always)]
fn default() -> Self {
Self
}
}
impl SecureRandom {
#[inline(always)]
pub fn get() -> Self {
@ -66,19 +83,21 @@ impl rand_core::RngCore for SecureRandom {
impl rand_core::CryptoRng for SecureRandom {}
unsafe impl Sync for SecureRandom {}
unsafe impl Send for SecureRandom {}
static mut XORSHIFT64_STATE: u64 = 0;
lazy_static! {
static ref XORSHIFT64_STATE: AtomicU64 = AtomicU64::new(next_u64_secure());
}
/// Get a non-cryptographic random number.
pub fn xorshift64_random() -> u64 {
let mut x = unsafe { XORSHIFT64_STATE };
while x == 0 {
x = next_u64_secure();
}
let mut x = XORSHIFT64_STATE.load(Ordering::Relaxed);
x = x.wrapping_add((x == 0) as u64);
x ^= x.wrapping_shl(13);
x ^= x.wrapping_shr(7);
x ^= x.wrapping_shl(17);
unsafe { XORSHIFT64_STATE = x };
XORSHIFT64_STATE.store(x, Ordering::Relaxed);
x
}
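A quick behavioral sketch (assuming the lazy_static seeding above, and a single-threaded context): xorshift64 with shifts (13, 7, 17) is a full-period permutation over nonzero 64-bit states, so consecutive calls never return zero or repeat back-to-back.
let a = xorshift64_random();
let b = xorshift64_random();
assert!(a != 0 && a != b);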

View file

@ -11,6 +11,10 @@ opt-level = 3
codegen-units = 1
panic = 'abort'
[features]
default = ["zt_trace"]
zt_trace = []
[dependencies]
zerotier-core-crypto = { path = "../zerotier-core-crypto" }
base64 = "^0"

View file

@ -0,0 +1,49 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use crate::vl1::*;
#[derive(Clone)]
pub enum Event {
// Change in node online status.
Online(bool),
// Tracing: source file, line number, message (if enabled in build).
Trace(&'static str, u32, String),
// An anomalous event has been encountered that could indicate a possible security problem.
SecurityWarning(String),
// A fatal error has occurred.
FatalError(String),
// This node has automatically generated an identity.
IdentityAutoGenerated(Identity),
// This node's identity has automatically been upgraded, contains old and new.
IdentityAutoUpgraded(Identity, Identity),
// The list of roots has been updated, contains old and new.
UpdatedRoots(Vec<Identity>, Vec<Identity>),
}
impl ToString for Event {
fn to_string(&self) -> String {
match self {
Event::Online(online) => format!("[vl1] online == {}", online),
Event::Trace(file, line, m) => format!("[trace] {}:{} {}", file, line, m),
Event::SecurityWarning(w) => format!("[global] security warning: {}", w),
Event::FatalError(e) => format!("[global] FATAL: {}", e),
Event::IdentityAutoGenerated(id) => format!("[vl1] identity auto-generated: {}", id.to_string()),
Event::IdentityAutoUpgraded(oldid, newid) => format!("[vl1] identity upgrade: {} => {}", oldid.to_string(), newid.to_string()),
Event::UpdatedRoots(_, newroots) => {
let mut tmp = String::with_capacity(128);
tmp.push_str("[vl1] updated root set:");
for r in newroots.iter() {
tmp.push(' ');
tmp.push_str(r.address.to_string().as_str());
}
tmp
}
}
}
}
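A sketch of how a service-side SystemInterface implementation might consume these events (illustrative; the println! sink is a placeholder):
fn event(&self, event: Event) {
    // ToString is implemented for every variant above, so logging is one line.
    println!("{}", event.to_string());
}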

View file

@ -9,7 +9,9 @@ pub mod util;
pub mod vl1;
pub mod vl2;
mod event;
mod networkhypervisor;
pub use event::Event;
pub use networkhypervisor::{Interface, NetworkHypervisor};
pub use vl1::protocol::{PacketBuffer, PooledPacketBuffer};

View file

@ -15,9 +15,9 @@ pub struct NetworkHypervisor<I: Interface> {
}
impl<I: Interface> NetworkHypervisor<I> {
pub fn new(ii: &I, auto_generate_identity: bool) -> Result<Self, InvalidParameterError> {
pub fn new(ii: &I, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result<Self, InvalidParameterError> {
Ok(NetworkHypervisor {
vl1: Node::new(ii, auto_generate_identity)?,
vl1: Node::new(ii, auto_generate_identity, auto_upgrade_identity)?,
vl2: Switch::new(),
})
}
@ -43,8 +43,8 @@ impl<I: Interface> NetworkHypervisor<I> {
}
#[inline(always)]
pub fn wire_receive(&self, ii: &I, source_endpoint: &Endpoint, source_local_socket: &I::LocalSocket, source_local_interface: &I::LocalInterface, data: PooledPacketBuffer) {
self.vl1.wire_receive(ii, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data)
pub fn handle_incoming_physical_packet(&self, ii: &I, source_endpoint: &Endpoint, source_local_socket: &I::LocalSocket, source_local_interface: &I::LocalInterface, data: PooledPacketBuffer) {
self.vl1.handle_incoming_physical_packet(ii, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data)
}
#[inline(always)]

View file

@ -4,7 +4,7 @@ use crate::util::buffer::Buffer;
/// Must be larger than any object we want to use with to_bytes() or from_bytes().
/// This hack can go away once Rust allows us to reference trait consts as generics.
const TEMP_BUF_SIZE: usize = 16384;
const TEMP_BUF_SIZE: usize = 8192;
/// A super-lightweight zero-allocation serialization interface.
pub trait Marshalable: Sized {
@ -25,7 +25,6 @@ pub trait Marshalable: Sized {
/// This will return an Err if the buffer is too small or some other error occurs. It's just
/// a shortcut to creating a buffer and marshaling into it.
fn to_buffer<const BL: usize>(&self) -> std::io::Result<Buffer<BL>> {
assert!(BL >= Self::MAX_MARSHAL_SIZE);
let mut tmp = Buffer::new();
self.marshal(&mut tmp)?;
Ok(tmp)
@ -41,9 +40,8 @@ pub trait Marshalable: Sized {
/// Marshal and convert to a Rust vector.
fn to_bytes(&self) -> Vec<u8> {
assert!(Self::MAX_MARSHAL_SIZE <= TEMP_BUF_SIZE);
let mut tmp = Buffer::<TEMP_BUF_SIZE>::new();
assert!(self.marshal(&mut tmp).is_ok());
assert!(self.marshal(&mut tmp).is_ok()); // panics if TEMP_BUF_SIZE is too small
tmp.as_bytes().to_vec()
}

View file

@ -10,71 +10,23 @@ pub use zerotier_core_crypto::varint;
pub(crate) const ZEROES: [u8; 64] = [0_u8; 64];
#[cfg(feature = "zt_trace")]
macro_rules! zt_trace {
($si:expr, $fmt:expr $(, $($arg:tt)*)?) => {
$si.event(crate::Event::Trace(file!(), line!(), format!($fmt, $($($arg)*)?)));
}
}
#[cfg(not(feature = "zt_trace"))]
macro_rules! zt_trace {
($si:expr, $fmt:expr $(, $($arg:tt)*)?) => {};
}
pub(crate) use zt_trace;
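Usage sketch for the macro above (si, n, and addr are hypothetical locals, with si implementing SystemInterface); the call compiles away entirely when the zt_trace feature is disabled:
zt_trace!(si, "incoming packet, {} bytes from {}", n, addr.to_string());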
/// Obtain a reference to a sub-array within an existing byte array.
#[inline(always)]
pub(crate) fn byte_array_range<const A: usize, const START: usize, const LEN: usize>(a: &[u8; A]) -> &[u8; LEN] {
assert!((START + LEN) <= A);
unsafe { &*a.as_ptr().add(START).cast::<[u8; LEN]>() }
}
/// A super-minimal hasher for u64 keys that are already fairly randomly distributed, like addresses and network IDs.
#[derive(Copy, Clone)]
pub(crate) struct U64NoOpHasher(u64);
impl U64NoOpHasher {
#[inline(always)]
pub fn new() -> Self {
Self(0)
}
}
impl std::hash::Hasher for U64NoOpHasher {
#[inline(always)]
fn finish(&self) -> u64 {
self.0.wrapping_add(self.0.wrapping_shr(32))
}
#[inline(always)]
fn write_u64(&mut self, i: u64) {
self.0 = self.0.wrapping_add(i);
}
#[inline(always)]
fn write_i64(&mut self, i: i64) {
self.0 = self.0.wrapping_add(i as u64);
}
#[inline(always)]
fn write_usize(&mut self, i: usize) {
self.0 = self.0.wrapping_add(i as u64);
}
#[inline(always)]
fn write_isize(&mut self, i: isize) {
self.0 = self.0.wrapping_add(i as u64);
}
#[inline(always)]
fn write_u32(&mut self, i: u32) {
self.0 = self.0.wrapping_add(i as u64);
}
#[inline(always)]
fn write_i32(&mut self, i: i32) {
self.0 = self.0.wrapping_add(i as u64);
}
#[inline(always)]
fn write(&mut self, _: &[u8]) {
panic!("U64NoOpHasher should only be used with u64 and i64 types");
}
}
impl std::hash::BuildHasher for U64NoOpHasher {
type Hasher = Self;
#[inline(always)]
fn build_hasher(&self) -> Self::Hasher {
Self(0)
}
}

View file

@ -63,6 +63,10 @@ impl<O, F: PoolFactory<O>> Pooled<O, F> {
}
}
unsafe impl<O, F: PoolFactory<O>> Send for Pooled<O, F> where O: Send {}
unsafe impl<O, F: PoolFactory<O>> Sync for Pooled<O, F> where O: Sync {}
impl<O, F: PoolFactory<O>> std::ops::Deref for Pooled<O, F> {
type Target = O;

View file

@ -7,9 +7,6 @@ use crate::vl1::protocol::*;
/// Performance note: PacketBuffer is Pooled<Buffer> which is NonNull<*mut Buffer>.
/// That means Option<PacketBuffer> is just a pointer, since NonNull permits the
/// compiler to optimize out any additional state in Option.
///
/// This will need to be modified if we ever support more than 8 fragments to increase
/// the size of frags[] and the number of bits in 'have' and 'expecting'.
pub(crate) struct FragmentedPacket {
pub ts_ticks: i64,
pub frags: [Option<PooledPacketBuffer>; packet_constants::FRAGMENT_COUNT_MAX],
@ -18,20 +15,22 @@ pub(crate) struct FragmentedPacket {
}
impl FragmentedPacket {
#[inline]
pub fn new(ts: i64) -> Self {
// 'have' and 'expecting' must be expanded if this is >8
debug_assert!(packet_constants::FRAGMENT_COUNT_MAX <= 8);
Self {
ts_ticks: ts,
frags: [None, None, None, None, None, None, None, None],
frags: Default::default(),
have: 0,
expecting: 0,
}
}
/// Add a fragment to this fragment set and return true if all fragments are present.
#[inline]
#[inline(always)]
pub fn add_fragment(&mut self, frag: PooledPacketBuffer, no: u8, expecting: u8) -> bool {
self.frags.get_mut(no as usize).map_or(false, |entry| {
if let Some(entry) = self.frags.get_mut(no as usize) {
/*
* This works by setting bit N in the 'have' bit mask and then setting X bits
* in 'expecting' if the 'expecting' field is non-zero. Since the packet head
@ -59,6 +58,8 @@ impl FragmentedPacket {
self.have |= 1_u8.wrapping_shl(no as u32);
self.expecting |= 0xff_u8.wrapping_shr(8 - (expecting as u32));
self.have == self.expecting
})
} else {
false
}
}
}
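A worked example of the bitmask scheme above (illustrative): a packet expecting three fragments yields expecting = 0xff >> (8 - 3) = 0b0000_0111, each fragment number no sets bit no in have, and assembly completes when have == expecting.
// add_fragment(frag, no: 0, expecting: 3) -> have = 0b001, expecting = 0b111, returns false
// add_fragment(frag, no: 2, expecting: 3) -> have = 0b101, returns false
// add_fragment(frag, no: 1, expecting: 3) -> have = 0b111 == expecting, returns true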

View file

@ -277,16 +277,16 @@ impl Identity {
/// Sign a message with this identity.
///
/// If legacy_compatibility is true this generates only an ed25519 signature and uses the old
/// If legacy_ed25519_only is true this generates only an ed25519 signature and uses the old
/// format that also includes part of the plaintext hash at the end. The include_algorithms mask
/// will be ignored. Otherwise it will generate a signature for every algorithm with a secret
/// in this identity and that is specified in the include_algorithms bit mask.
///
/// A return of None happens if we don't have our secret key(s) or some other error occurs.
pub fn sign(&self, msg: &[u8], include_algorithms: u8, legacy_compatibility: bool) -> Option<Vec<u8>> {
pub fn sign(&self, msg: &[u8], include_algorithms: u8, legacy_ed25519_only: bool) -> Option<Vec<u8>> {
if self.secret.is_some() {
let secret = self.secret.as_ref().unwrap();
if legacy_compatibility {
if legacy_ed25519_only {
Some(secret.ed25519.sign_zt(msg).to_vec())
} else {
let mut tmp: Vec<u8> = Vec::with_capacity(1 + P384_ECDSA_SIGNATURE_SIZE + ED25519_SIGNATURE_SIZE);

View file

@ -3,7 +3,7 @@
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::mem::{size_of, transmute_copy, zeroed, MaybeUninit};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs};
use std::ptr::{copy_nonoverlapping, null, slice_from_raw_parts, write_bytes};
use std::str::FromStr;
@ -74,9 +74,27 @@ pub union InetAddress {
ss: sockaddr_storage, // some external code may expect the struct to be this full length
}
impl ToSocketAddrs for InetAddress {
type Iter = std::iter::Once<SocketAddr>;
#[inline(always)]
fn to_socket_addrs(&self) -> std::io::Result<Self::Iter> {
self.try_into().map_or_else(|_| Err(std::io::Error::new(std::io::ErrorKind::Other, "not an IP address")), |sa| Ok(std::iter::once(sa)))
}
}
impl TryInto<IpAddr> for InetAddress {
type Error = crate::error::InvalidParameterError;
#[inline(always)]
fn try_into(self) -> Result<IpAddr, Self::Error> {
(&self).try_into()
}
}
impl TryInto<IpAddr> for &InetAddress {
type Error = crate::error::InvalidParameterError;
#[inline(always)]
fn try_into(self) -> Result<IpAddr, Self::Error> {
match unsafe { self.sa.sa_family } {
@ -90,6 +108,15 @@ impl TryInto<IpAddr> for InetAddress {
impl TryInto<Ipv4Addr> for InetAddress {
type Error = crate::error::InvalidParameterError;
#[inline(always)]
fn try_into(self) -> Result<Ipv4Addr, Self::Error> {
(&self).try_into()
}
}
impl TryInto<Ipv4Addr> for &InetAddress {
type Error = crate::error::InvalidParameterError;
#[inline(always)]
fn try_into(self) -> Result<Ipv4Addr, Self::Error> {
match unsafe { self.sa.sa_family } {
@ -102,6 +129,15 @@ impl TryInto<Ipv4Addr> for InetAddress {
impl TryInto<Ipv6Addr> for InetAddress {
type Error = crate::error::InvalidParameterError;
#[inline(always)]
fn try_into(self) -> Result<Ipv6Addr, Self::Error> {
(&self).try_into()
}
}
impl TryInto<Ipv6Addr> for &InetAddress {
type Error = crate::error::InvalidParameterError;
#[inline(always)]
fn try_into(self) -> Result<Ipv6Addr, Self::Error> {
match unsafe { self.sa.sa_family } {
@ -114,6 +150,15 @@ impl TryInto<Ipv6Addr> for InetAddress {
impl TryInto<SocketAddr> for InetAddress {
type Error = crate::error::InvalidParameterError;
#[inline(always)]
fn try_into(self) -> Result<SocketAddr, Self::Error> {
(&self).try_into()
}
}
impl TryInto<SocketAddr> for &InetAddress {
type Error = crate::error::InvalidParameterError;
#[inline(always)]
fn try_into(self) -> Result<SocketAddr, Self::Error> {
unsafe {
@ -129,6 +174,15 @@ impl TryInto<SocketAddr> for InetAddress {
impl TryInto<SocketAddrV4> for InetAddress {
type Error = crate::error::InvalidParameterError;
#[inline(always)]
fn try_into(self) -> Result<SocketAddrV4, Self::Error> {
(&self).try_into()
}
}
impl TryInto<SocketAddrV4> for &InetAddress {
type Error = crate::error::InvalidParameterError;
#[inline(always)]
fn try_into(self) -> Result<SocketAddrV4, Self::Error> {
unsafe {
@ -143,6 +197,15 @@ impl TryInto<SocketAddrV4> for InetAddress {
impl TryInto<SocketAddrV6> for InetAddress {
type Error = crate::error::InvalidParameterError;
#[inline(always)]
fn try_into(self) -> Result<SocketAddrV6, Self::Error> {
(&self).try_into()
}
}
impl TryInto<SocketAddrV6> for &InetAddress {
type Error = crate::error::InvalidParameterError;
#[inline(always)]
fn try_into(self) -> Result<SocketAddrV6, Self::Error> {
unsafe {
@ -328,6 +391,9 @@ impl<'de> Deserialize<'de> for InetAddress {
}
impl InetAddress {
pub const AF_INET: u8 = AF_INET;
pub const AF_INET6: u8 = AF_INET6;
/// Get a new zero/nil InetAddress.
#[inline(always)]
pub fn new() -> InetAddress {

View file

@ -2,7 +2,6 @@
use std::collections::HashMap;
use std::hash::Hash;
use std::num::NonZeroI64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
@ -12,11 +11,13 @@ use parking_lot::{Mutex, RwLock};
use crate::error::InvalidParameterError;
use crate::util::gate::IntervalGate;
use crate::util::zt_trace;
use crate::vl1::path::Path;
use crate::vl1::peer::Peer;
use crate::vl1::protocol::*;
use crate::vl1::whoisqueue::{QueuedPacket, WhoisQueue};
use crate::vl1::{Address, Endpoint, Identity, RootSet};
use crate::Event;
/// Trait implemented by external code to handle events and provide an interface to the system or application.
///
@ -24,25 +25,16 @@ use crate::vl1::{Address, Endpoint, Identity, RootSet};
/// during calls to things like handle_incoming_physical_packet() and do_background_tasks().
pub trait SystemInterface: Sync + Send + 'static {
/// Type for local system sockets.
type LocalSocket: Sync + Send + Sized + Hash + PartialEq + Eq + Clone;
type LocalSocket: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString;
/// Type for local system interfaces.
type LocalInterface: Sync + Send + Sized + Hash + PartialEq + Eq + Clone;
type LocalInterface: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString;
/// Node is up and ready for operation.
fn event_node_is_up(&self);
/// Node is shutting down.
fn event_node_is_down(&self);
/// Node has gone online or offline.
fn event_online_status_change(&self, online: bool);
/// An event occurred.
fn event(&self, event: Event);
/// A USER_MESSAGE packet was received.
fn event_user_message(&self, source: &Identity, message_type: u64, message: &[u8]);
/// VL1 core generated a security warning.
fn event_security_warning(&self, warning: &str);
fn user_message(&self, source: &Identity, message_type: u64, message: &[u8]);
/// Check a local socket for validity.
///
@ -75,7 +67,7 @@ pub trait SystemInterface: Sync + Send + 'static {
fn check_path(&self, id: &Identity, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>) -> bool;
/// Called to look up any statically defined or memorized paths to known nodes.
fn get_path_hints(&self, id: &Identity) -> Option<Vec<(Endpoint, Option<NonZeroI64>, Option<NonZeroI64>)>>;
fn get_path_hints(&self, id: &Identity) -> Option<Vec<(Endpoint, Option<Self::LocalSocket>, Option<Self::LocalInterface>)>>;
/// Called to get the current time in milliseconds from the system monotonically increasing clock.
/// This needs to be accurate to about 250 milliseconds resolution or better.
@ -119,8 +111,8 @@ struct BackgroundTaskIntervals {
whois: IntervalGate<{ crate::vl1::whoisqueue::SERVICE_INTERVAL_MS }>,
paths: IntervalGate<{ crate::vl1::path::SERVICE_INTERVAL_MS }>,
peers: IntervalGate<{ crate::vl1::peer::SERVICE_INTERVAL_MS }>,
root_sync: IntervalGate<ROOT_SYNC_INTERVAL_MS>,
root_hello: IntervalGate<ROOT_HELLO_INTERVAL>,
root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>,
root_hello: IntervalGate<{ ROOT_HELLO_INTERVAL }>,
}
struct RootInfo<SI: SystemInterface> {
@ -132,7 +124,7 @@ struct RootInfo<SI: SystemInterface> {
/// A VL1 global P2P network node.
pub struct Node<SI: SystemInterface> {
/// A random ID generated to identify this particular running instance.
pub instance_id: u64,
pub instance_id: [u8; 16],
/// This node's identity and permanent keys.
pub identity: Identity,
@ -161,7 +153,7 @@ pub struct Node<SI: SystemInterface> {
impl<SI: SystemInterface> Node<SI> {
/// Create a new Node.
pub fn new(si: &SI, auto_generate_identity: bool) -> Result<Self, InvalidParameterError> {
pub fn new(si: &SI, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result<Self, InvalidParameterError> {
let mut id = {
let id = si.load_node_identity();
if id.is_none() {
@ -169,6 +161,7 @@ impl<SI: SystemInterface> Node<SI> {
return Err(InvalidParameterError("no identity found and auto-generate not enabled"));
} else {
let id = Identity::generate();
si.event(Event::IdentityAutoGenerated(id.clone()));
si.save_node_identity(&id);
id
}
@ -177,13 +170,16 @@ impl<SI: SystemInterface> Node<SI> {
}
};
// Automatically upgrade old type identities to add P-384 keys.
if id.upgrade()? {
si.save_node_identity(&id);
if auto_upgrade_identity {
let old = id.clone();
if id.upgrade()? {
si.save_node_identity(&id);
si.event(Event::IdentityAutoUpgraded(old, id.clone()));
}
}
Ok(Self {
instance_id: zerotier_core_crypto::random::next_u64_secure(),
instance_id: zerotier_core_crypto::random::get_bytes_secure(),
identity: id,
intervals: Mutex::new(BackgroundTaskIntervals::default()),
paths: DashMap::new(),
@ -206,15 +202,14 @@ impl<SI: SystemInterface> Node<SI> {
}
/// Get a peer by address.
#[inline(always)]
pub fn peer(&self, a: Address) -> Option<Arc<Peer<SI>>> {
self.peers.get(&a).map(|peer| peer.value().clone())
}
/// Run background tasks and return desired delay until next call in milliseconds.
///
/// This should only be called periodically from a single thread, but that thread can be
/// different each time. Calling it concurrently won't crash but won't accomplish anything.
/// This shouldn't be called concurrently by more than one loop. Doing so would be harmless
/// but would be a waste of compute cycles.
pub fn do_background_tasks(&self, si: &SI) -> Duration {
let mut intervals = self.intervals.lock();
let tt = si.time_ticks();
@ -225,7 +220,11 @@ impl<SI: SystemInterface> Node<SI> {
// Synchronize root info with root sets if the latter have changed.
if *sets_modified {
*sets_modified = false;
roots.clear();
zt_trace!(si, "root sets modified, rescanning...");
let mut old_root_identities: Vec<Identity> = roots.drain().map(|r| r.0.identity.clone()).collect();
let mut new_root_identities = Vec::new();
let mut colliding_root_addresses = Vec::new(); // see security note below
for (_, rc) in sets.iter() {
for m in rc.members.iter() {
@ -252,16 +251,28 @@ impl<SI: SystemInterface> Node<SI> {
Ok(root_peer_entry)
} else {
colliding_root_addresses.push(m.identity.address);
si.event_security_warning(
format!("address/identity collision between root {} (from root cluster definition '{}') and known peer {}", m.identity.address.to_string(), rc.name, rp.identity.to_string()).as_str(),
);
si.event(Event::SecurityWarning(format!(
"address/identity collision between root {} (from root cluster definition '{}') and known peer {}, ignoring this root!",
m.identity.address.to_string(),
rc.name,
rp.identity.to_string()
)));
Err(crate::error::UnexpectedError)
}
})
.map(|r| roots.insert(r.value().clone(), m.endpoints.as_ref().unwrap().iter().map(|e| e.clone()).collect()));
.map(|r| {
new_root_identities.push(r.value().identity.clone());
roots.insert(r.value().clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect());
});
}
}
}
old_root_identities.sort_unstable();
new_root_identities.sort_unstable();
if !old_root_identities.eq(&new_root_identities) {
si.event(Event::UpdatedRoots(old_root_identities, new_root_identities));
}
}
// Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint
@ -272,6 +283,7 @@ impl<SI: SystemInterface> Node<SI> {
if intervals.root_hello.gate(tt) {
for (root, endpoints) in roots.iter() {
for ep in endpoints.iter() {
zt_trace!(si, "sending HELLO to root {}", root.identity.address.to_string());
root.send_hello(si, self, Some(ep));
}
}
@ -300,7 +312,8 @@ impl<SI: SystemInterface> Node<SI> {
}
if intervals.paths.gate(tt) {
// Service all paths, removing expired or invalid ones.
// Service all paths, removing expired or invalid ones. This is done in two passes to
// avoid introducing latency into a flow.
self.paths.retain(|_, pbs| {
let mut expired_paths = Vec::new();
for (ls, path) in pbs.read().iter() {
@ -319,14 +332,16 @@ impl<SI: SystemInterface> Node<SI> {
}
if intervals.whois.gate(tt) {
let _ = self.whois.service(si, self, tt);
self.whois.service(si, self, tt);
}
Duration::from_millis((ROOT_SYNC_INTERVAL_MS.min(crate::vl1::whoisqueue::SERVICE_INTERVAL_MS).min(crate::vl1::path::SERVICE_INTERVAL_MS).min(crate::vl1::peer::SERVICE_INTERVAL_MS) as u64) / 2)
}
/// Called when a packet is received on the physical wire.
pub fn wire_receive<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, source_endpoint: &Endpoint, source_local_socket: &SI::LocalSocket, source_local_interface: &SI::LocalInterface, mut data: PooledPacketBuffer) {
pub fn handle_incoming_physical_packet<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, source_endpoint: &Endpoint, source_local_socket: &SI::LocalSocket, source_local_interface: &SI::LocalInterface, mut data: PooledPacketBuffer) {
zt_trace!(si, "<< incoming packet from {} length {} via socket {}@{}", source_endpoint.to_string(), data.len(), source_local_socket.to_string(), source_local_interface.to_string());
if let Ok(fragment_header) = data.struct_mut_at::<FragmentHeader>(0) {
if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) {
let time_ticks = si.time_ticks();
@ -339,7 +354,7 @@ impl<SI: SystemInterface> Node<SI> {
if fragment_header.is_fragment() {
if let Some(assembled_packet) = path.receive_fragment(u64::from_ne_bytes(fragment_header.id), fragment_header.fragment_no(), fragment_header.total_fragments(), data, time_ticks) {
if let Some(frag0) = assembled_packet.frags[0].as_ref() {
// Fragmented packet is fully assembled.
zt_trace!(si, "fragmented packet fully assembled!");
let packet_header = frag0.struct_at::<PacketHeader>(0);
if packet_header.is_ok() {
@ -356,7 +371,7 @@ impl<SI: SystemInterface> Node<SI> {
}
} else {
if let Ok(packet_header) = data.struct_at::<PacketHeader>(0) {
// Packet is not fragmented.
zt_trace!(si, "parsing unfragmented packet");
if let Some(source) = Address::from_bytes(&packet_header.src) {
if let Some(peer) = self.peer(source) {
@ -389,12 +404,11 @@ impl<SI: SystemInterface> Node<SI> {
peer.forward(si, time_ticks, data.as_ref());
}
}
};
}
}
}
/// Get the current best root peer that we should use for WHOIS, relaying, etc.
#[inline(always)]
pub fn root(&self) -> Option<Arc<Peer<SI>>> {
self.best_root.read().clone()
}

View file

@ -1,6 +1,7 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use std::collections::HashMap;
use std::hash::{BuildHasher, Hasher};
use std::sync::atomic::{AtomicI64, Ordering};
use parking_lot::Mutex;
@ -21,10 +22,10 @@ pub struct Path<SI: SystemInterface> {
pub endpoint: Endpoint,
pub local_socket: SI::LocalSocket,
pub local_interface: SI::LocalInterface,
pub(crate) last_send_time_ticks: AtomicI64,
pub(crate) last_receive_time_ticks: AtomicI64,
pub(crate) create_time_ticks: i64,
fragmented_packets: Mutex<HashMap<u64, FragmentedPacket, U64NoOpHasher>>,
last_send_time_ticks: AtomicI64,
last_receive_time_ticks: AtomicI64,
create_time_ticks: i64,
fragmented_packets: Mutex<HashMap<u64, FragmentedPacket, PacketIdHasher>>,
}
impl<SI: SystemInterface> Path<SI> {
@ -36,7 +37,7 @@ impl<SI: SystemInterface> Path<SI> {
last_send_time_ticks: AtomicI64::new(0),
last_receive_time_ticks: AtomicI64::new(0),
create_time_ticks: time_ticks,
fragmented_packets: Mutex::new(HashMap::with_capacity_and_hasher(4, U64NoOpHasher::new())),
fragmented_packets: Mutex::new(HashMap::with_capacity_and_hasher(4, PacketIdHasher(zerotier_core_crypto::random::xorshift64_random()))),
}
}
@ -92,3 +93,36 @@ impl<SI: SystemInterface> Path<SI> {
}
}
}
#[repr(transparent)]
struct PacketIdHasher(u64);
impl Hasher for PacketIdHasher {
#[inline(always)]
fn finish(&self) -> u64 {
self.0
}
#[inline(always)]
fn write(&mut self, _: &[u8]) {
panic!("u64 only");
}
#[inline(always)]
fn write_u64(&mut self, i: u64) {
let mut x = self.0.wrapping_add(i);
x ^= x.wrapping_shl(13);
x ^= x.wrapping_shr(7);
x ^= x.wrapping_shl(17);
self.0 = x;
}
}
impl BuildHasher for PacketIdHasher {
type Hasher = Self;
#[inline(always)]
fn build_hasher(&self) -> Self::Hasher {
Self(0)
}
}

View file

@ -46,10 +46,10 @@ pub struct Peer<SI: SystemInterface> {
paths: Mutex<Vec<PeerPath<SI>>>,
// Statistics and times of events.
pub(crate) last_send_time_ticks: AtomicI64,
pub(crate) last_receive_time_ticks: AtomicI64,
last_send_time_ticks: AtomicI64,
last_receive_time_ticks: AtomicI64,
pub(crate) last_hello_reply_time_ticks: AtomicI64,
pub(crate) last_forward_time_ticks: AtomicI64,
last_forward_time_ticks: AtomicI64,
// Counter for assigning sequential message IDs.
message_id_counter: AtomicU64,
@ -442,7 +442,7 @@ impl<SI: SystemInterface> Peer<SI> {
// because the whole packet is authenticated. Data in the session is not technically secret in a
// cryptographic sense but we encrypt it for privacy and as a defense in depth.
let mut fields = Dictionary::new();
fields.set_u64(session_metadata::INSTANCE_ID, node.instance_id);
fields.set_bytes(session_metadata::INSTANCE_ID, node.instance_id.to_vec());
fields.set_u64(session_metadata::CLOCK, si.time_clock() as u64);
fields.set_bytes(session_metadata::SENT_TO, destination.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec());
let fields = fields.to_bytes();

View file

@ -1,6 +1,6 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::AtomicUsize;
use zerotier_core_crypto::aes_gmac_siv::AesGmacSiv;
use zerotier_core_crypto::kbkdf::*;
@ -22,9 +22,6 @@ pub(crate) struct SymmetricSecret {
/// Key used for HMAC extended validation on packets like HELLO.
pub packet_hmac_key: Secret<64>,
/// Key used with ephemeral keying/re-keying.
pub ephemeral_ratchet_key: Secret<64>,
/// Pool of keyed AES-GMAC-SIV engines (pooled to avoid AES re-init every time).
pub aes_gmac_siv: Pool<AesGmacSiv, AesGmacSivPoolFactory>,
}
@ -34,14 +31,12 @@ impl SymmetricSecret {
pub fn new(key: Secret<64>) -> SymmetricSecret {
let hello_private_section_key = zt_kbkdf_hmac_sha384(&key.0, security_constants::KBKDF_KEY_USAGE_LABEL_HELLO_PRIVATE_SECTION);
let packet_hmac_key = zt_kbkdf_hmac_sha512(&key.0, security_constants::KBKDF_KEY_USAGE_LABEL_PACKET_HMAC);
let ephemeral_ratchet_key = zt_kbkdf_hmac_sha512(&key.0, security_constants::KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET_KEY);
let aes_factory =
AesGmacSivPoolFactory(zt_kbkdf_hmac_sha384(&key.0[..48], security_constants::KBKDF_KEY_USAGE_LABEL_AES_GMAC_SIV_K0).first_n(), zt_kbkdf_hmac_sha384(&key.0[..48], security_constants::KBKDF_KEY_USAGE_LABEL_AES_GMAC_SIV_K1).first_n());
SymmetricSecret {
key,
hello_private_section_key,
packet_hmac_key,
ephemeral_ratchet_key,
aes_gmac_siv: Pool::new(2, aes_factory),
}
}
@ -50,24 +45,7 @@ impl SymmetricSecret {
/// An ephemeral symmetric secret with usage timers and counters.
pub(crate) struct EphemeralSymmetricSecret {
pub secret: SymmetricSecret,
pub rekey_time_ticks: i64,
pub expire_time_ticks: i64,
pub ratchet_count: u64,
pub encrypt_uses: AtomicUsize,
pub decrypt_uses: AtomicUsize,
pub fips_compliant_exchange: bool,
}
impl EphemeralSymmetricSecret {
#[inline(always)]
pub fn should_rekey(&self, time_ticks: i64) -> bool {
time_ticks >= self.rekey_time_ticks || self.encrypt_uses.load(Ordering::Relaxed).max(self.decrypt_uses.load(Ordering::Relaxed)) >= security_constants::EPHEMERAL_SECRET_REKEY_AFTER_USES
}
#[inline(always)]
pub fn is_expired(&self, time_ticks: i64) -> bool {
time_ticks >= self.expire_time_ticks || self.encrypt_uses.load(Ordering::Relaxed).max(self.decrypt_uses.load(Ordering::Relaxed)) >= security_constants::EPHEMERAL_SECRET_REJECT_AFTER_USES
}
}
pub(crate) struct AesGmacSivPoolFactory(Secret<32>, Secret<32>);

View file

@ -62,7 +62,7 @@ impl WhoisQueue {
todo!()
}
pub(crate) fn service<SI: SystemInterface>(&self, si: &SI, node: &Node<SI>, time_ticks: i64) -> bool {
pub(crate) fn service<SI: SystemInterface>(&self, si: &SI, node: &Node<SI>, time_ticks: i64) {
let mut targets: Vec<Address> = Vec::new();
self.0.lock().retain(|target, qi| {
if qi.retry_count < WHOIS_RETRY_MAX {
@ -78,6 +78,5 @@ impl WhoisQueue {
if !targets.is_empty() {
self.send_whois(node, si, targets.as_slice());
}
true
}
}

View file

@ -9,6 +9,7 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::error::InvalidFormatError;
use crate::util::buffer::Buffer;
use crate::util::hex::HEX_CHARS;
use crate::util::marshalable::Marshalable;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(transparent)]
@ -43,15 +44,19 @@ impl NetworkId {
pub fn to_u64(&self) -> u64 {
self.0.get()
}
}
impl Marshalable for NetworkId {
const MAX_MARSHAL_SIZE: usize = 8;
#[inline(always)]
pub(crate) fn marshal<const BL: usize>(&self, buf: &mut Buffer<BL>) -> std::io::Result<()> {
fn marshal<const BL: usize>(&self, buf: &mut Buffer<BL>) -> std::io::Result<()> {
buf.append_u64(self.0.get())
}
#[inline(always)]
pub(crate) fn unmarshal<const BL: usize>(buf: &Buffer<BL>, cursor: &mut usize) -> std::io::Result<Option<Self>> {
Ok(Self::from_u64(buf.read_u64(cursor)?))
fn unmarshal<const BL: usize>(buf: &Buffer<BL>, cursor: &mut usize) -> std::io::Result<Self> {
Self::from_u64(buf.read_u64(cursor)?).map_or_else(|| Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "cannot be zero")), |a| Ok(a))
}
}
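A round-trip sketch using the trait implementation above (assumes NetworkId::from_u64 returns Option<NetworkId>, as its use in unmarshal suggests):
let nwid = NetworkId::from_u64(0x8056c2e21c000001).unwrap();
let buf = nwid.to_buffer::<8>().unwrap();
let mut cursor = 0;
assert!(NetworkId::unmarshal(&buf, &mut cursor).unwrap() == nwid);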

View file

@ -1065,6 +1065,7 @@ dependencies = [
"clap",
"lazy_static",
"libc",
"log",
"num-traits",
"parking_lot",
"serde",

View file

@ -21,6 +21,7 @@ serde_json = { version = "^1", features = ["std"], default-features = false }
parking_lot = "^0"
lazy_static = "^1"
clap = { version = "^3", features = ["std", "suggestions"], default-features = false }
log = "^0"
[target."cfg(windows)".dependencies]
winapi = { version = "^0", features = ["handleapi", "ws2ipdef", "ws2tcpip"] }

View file

@ -86,6 +86,7 @@ impl DataDir {
tmp.push(AUTH_TOKEN_POSSIBLE_CHARS.as_bytes()[(next_u32_secure() as usize) % AUTH_TOKEN_POSSIBLE_CHARS.len()] as char);
}
tokio::fs::write(&authtoken_path, tmp.as_bytes()).await?;
assert!(crate::utils::fs_restrict_permissions(&authtoken_path));
*authtoken = tmp;
} else {
*authtoken = String::from_utf8_lossy(authtoken_bytes.unwrap().as_slice()).into();
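fs_restrict_permissions is this repo's own helper; a minimal sketch of a unix equivalent (an assumption about its behavior, not the actual implementation):
#[cfg(unix)]
fn restrict_permissions(path: &std::path::Path) -> bool {
    use std::os::unix::fs::PermissionsExt;
    // Owner read/write only, since the auth token is a secret.
    std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600)).is_ok()
}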

View file

@ -5,6 +5,8 @@ use std::ptr::{copy_nonoverlapping, null_mut};
use zerotier_network_hypervisor::vl1::InetAddress;
use crate::localinterface::LocalInterface;
#[allow(unused)]
#[inline(always)]
fn s6_addr_as_ptr<A>(a: &A) -> *const A {
@ -13,7 +15,7 @@ fn s6_addr_as_ptr<A>(a: &A) -> *const A {
/// Call supplied function or closure for each physical IP address in the system.
#[cfg(unix)]
pub fn for_each_address<F: FnMut(&InetAddress, &str)>(mut f: F) {
pub fn for_each_address<F: FnMut(&InetAddress, &LocalInterface)>(mut f: F) {
unsafe {
let mut ifa_name = [0_u8; libc::IFNAMSIZ as usize];
let mut ifap: *mut libc::ifaddrs = null_mut();
@ -66,7 +68,7 @@ pub fn for_each_address<F: FnMut(&InetAddress, &str)>(mut f: F) {
if namlen > 0 {
let dev = String::from_utf8_lossy(&ifa_name[0..namlen]);
if dev.len() > 0 {
f(&a, dev.as_ref());
f(&a, &LocalInterface::from_unix_interface_name(dev.as_ref()));
}
}
}
@ -79,12 +81,13 @@ pub fn for_each_address<F: FnMut(&InetAddress, &str)>(mut f: F) {
#[cfg(test)]
mod tests {
use crate::localinterface::LocalInterface;
use zerotier_network_hypervisor::vl1::InetAddress;
#[test]
fn test_getifaddrs() {
println!("starting getifaddrs...");
crate::vnic::getifaddrs::for_each_address(|a: &InetAddress, dev: &str| println!(" {} {}", dev, a.to_string()));
crate::getifaddrs::for_each_address(|a: &InetAddress, interface: &LocalInterface| println!(" {} {}", interface.to_string(), a.to_string()));
println!("done.")
}
}

View file

@ -75,14 +75,21 @@ impl Default for NetworkSettings {
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct GlobalSettings {
/// Primary ZeroTier port that is always bound, default is 9993.
#[serde(rename = "primaryPort")]
pub primary_port: u16,
/// Enable uPnP, NAT-PMP, and other router port mapping technologies?
#[serde(rename = "portMapping")]
pub port_mapping: bool,
/// Interface name prefix blacklist for local bindings (not remote IPs).
#[serde(rename = "interfacePrefixBlacklist")]
pub interface_prefix_blacklist: Vec<String>,
#[serde(rename = "explicitAddresses")]
pub explicit_addresses: Vec<InetAddress>,
/// IP/bits CIDR blacklist for local bindings (not remote IPs).
#[serde(rename = "cidrBlacklist")]
pub cidr_blacklist: Vec<InetAddress>,
}
impl Default for GlobalSettings {
@ -97,7 +104,7 @@ impl Default for GlobalSettings {
primary_port: DEFAULT_PORT,
port_mapping: true,
interface_prefix_blacklist: bl,
explicit_addresses: Vec::new(),
cidr_blacklist: Vec::new(),
}
}
}
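For illustration (a sketch; assumes DEFAULT_PORT is 9993 per the comment above), serializing the defaults shows the renamed JSON keys:
let s = GlobalSettings::default();
// e.g. {"primaryPort":9993,"portMapping":true,"interfacePrefixBlacklist":[...],"cidrBlacklist":[]}
println!("{}", serde_json::to_string(&s).unwrap());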

View file

@ -0,0 +1,41 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use std::hash::Hash;
/// Lightweight container for local system network interface names/IDs.
///
/// On *nix systems this will be an interface like 'eth0' stuffed into a u128. On Windows
/// this will be a network device GUID, which is also 128-bit. This will need to be revised
/// if there are OSes out there that use interface names or IDs longer than 16 bytes. The
/// point here is to have something tiny and cheap for the core to store internally.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct LocalInterface(u128);
impl LocalInterface {
#[cfg(unix)]
pub fn from_unix_interface_name(name: &str) -> Self {
let mut tmp = [0_u8; 16];
let nb = name.as_bytes();
let l = nb.len();
assert!(l <= 16); // do any *nix OSes have device names longer than 16 bytes?
tmp[..l].copy_from_slice(&nb[..l]);
Self(u128::from_le_bytes(tmp))
}
}
impl ToString for LocalInterface {
#[cfg(unix)]
fn to_string(&self) -> String {
let b = self.0.to_le_bytes();
let mut l = 0;
for _ in 0..16 {
if b[l] > 0 {
l += 1;
} else {
break;
}
}
String::from_utf8_lossy(&b[..l]).to_string()
}
}
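A round-trip sketch for the unix case above:
let li = LocalInterface::from_unix_interface_name("eth0");
assert!(li.to_string() == "eth0");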

View file

@ -6,6 +6,7 @@ pub mod exitcode;
pub mod getifaddrs;
pub mod jsonformatter;
pub mod localconfig;
pub mod localinterface;
pub mod service;
pub mod udp;
pub mod utils;
@ -122,7 +123,7 @@ pub struct Flags {
async fn async_main(flags: Flags, global_args: Box<ArgMatches>) -> i32 {
#[allow(unused)]
return match global_args.subcommand() {
match global_args.subcommand() {
Some(("help", _)) => {
print_help();
exitcode::OK
@ -138,8 +139,9 @@ async fn async_main(flags: Flags, global_args: Box<ArgMatches>) -> i32 {
Some(("join", cmd_args)) => todo!(),
Some(("leave", cmd_args)) => todo!(),
Some(("service", _)) => {
drop(global_args); // free unnecessary heap
assert!(service::Service::new(flags.base_path.as_str()).await.is_ok());
drop(global_args); // free unnecessary heap before starting service as we're done with CLI args
assert!(service::Service::new(tokio::runtime::Handle::current(), &flags.base_path, true).await.is_ok());
exitcode::OK
}
Some(("identity", cmd_args)) => todo!(),
Some(("rootset", cmd_args)) => cli::rootset::cmd(flags, cmd_args).await,
@ -147,7 +149,7 @@ async fn async_main(flags: Flags, global_args: Box<ArgMatches>) -> i32 {
eprintln!("Invalid command line. Use 'help' for help.");
exitcode::ERR_USAGE
}
};
}
}
fn main() {

View file

@ -1,72 +1,153 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::hash::Hash;
use std::num::NonZeroI64;
use std::sync::atomic::AtomicUsize;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use tokio::net::UdpSocket;
use zerotier_network_hypervisor::vl1::*;
use zerotier_network_hypervisor::vl2::*;
use zerotier_network_hypervisor::*;
use zerotier_core_crypto::random;
use tokio::time::Duration;
use crate::datadir::DataDir;
use crate::udp::BoundUdpSocket;
use crate::localinterface::LocalInterface;
use crate::udp::{BoundUdpPort, BoundUdpSocket};
use crate::utils::{ms_monotonic, ms_since_epoch};
pub type DynamicError = Box<dyn Error>;
const UDP_UPDATE_BINDINGS_INTERVAL_MS: Duration = Duration::from_millis(2500);
/// ZeroTier system service, which presents virtual networks as VPN connections.
pub struct Service {
udp_binding_task: tokio::task::JoinHandle<()>,
core_background_service_task: tokio::task::JoinHandle<()>,
internal: Arc<ServiceImpl>,
}
struct ServiceImpl {
pub rt: tokio::runtime::Handle,
pub data: DataDir,
pub local_socket_unique_id_counter: AtomicUsize,
pub udp_sockets: parking_lot::RwLock<HashMap<u16, Vec<Arc<BoundUdpSocket>>>>,
pub core: Option<NetworkHypervisor<Self>>,
pub udp_sockets: tokio::sync::RwLock<HashMap<u16, BoundUdpPort>>,
pub num_listeners_per_socket: usize,
_core: Option<NetworkHypervisor<Self>>,
}
impl Drop for Service {
fn drop(&mut self) {
self.internal.rt.block_on(async {
// Kill all background tasks associated with this service.
self.udp_binding_task.abort();
self.core_background_service_task.abort();
// Wait for all tasks to actually stop.
let _ = self.udp_binding_task.await;
let _ = self.core_background_service_task.await;
// Drop all bound sockets since these can hold circular Arc<> references to 'internal'.
self.internal.udp_sockets.write().await.clear();
});
}
}
impl Service {
pub async fn new(base_path: &str) -> Result<Self, DynamicError> {
let mut svc = Self {
rt: tokio::runtime::Handle::current(),
pub async fn new<P: AsRef<Path>>(rt: tokio::runtime::Handle, base_path: P, auto_upgrade_identity: bool) -> Result<Self, Box<dyn Error>> {
let mut si = ServiceImpl {
rt,
data: DataDir::open(base_path).await.map_err(|e| Box::new(e))?,
local_socket_unique_id_counter: AtomicUsize::new(1),
udp_sockets: parking_lot::RwLock::new(HashMap::with_capacity(4)),
core: None,
udp_sockets: tokio::sync::RwLock::new(HashMap::with_capacity(4)),
num_listeners_per_socket: std::thread::available_parallelism().unwrap().get(),
_core: None,
};
let _ = svc.core.insert(NetworkHypervisor::new(&svc, true).map_err(|e| Box::new(e))?);
let _ = si._core.insert(NetworkHypervisor::new(&si, true, auto_upgrade_identity)?);
let si = Arc::new(si);
let config = svc.data.config().await;
Ok(svc)
let (si1, si2) = (si.clone(), si.clone());
Ok(Self {
udp_binding_task: si.rt.spawn(si1.udp_binding_task_main()),
core_background_service_task: si.rt.spawn(si2.core_background_service_task_main()),
internal: si,
})
}
}
/// Local socket wrapper implementing equality and hash in terms of an arbitrary unique ID.
#[derive(Clone)]
struct LocalSocket(Weak<BoundUdpSocket>, usize);
impl PartialEq for LocalSocket {
impl ServiceImpl {
#[inline(always)]
fn eq(&self, other: &Self) -> bool {
self.1 == other.1
fn core(&self) -> &NetworkHypervisor<ServiceImpl> {
debug_assert!(self._core.is_some());
unsafe { self._core.as_ref().unwrap_unchecked() }
}
async fn update_bindings_for_port(self: &Arc<Self>, port: u16, interface_prefix_blacklist: &Vec<String>, cidr_blacklist: &Vec<InetAddress>) -> Option<Vec<(LocalInterface, InetAddress, std::io::Error)>> {
let mut udp_sockets = self.udp_sockets.write().await;
let bp = udp_sockets.entry(port).or_insert_with(|| BoundUdpPort::new(port));
let (errors, new_sockets) = bp.update_bindings(interface_prefix_blacklist, cidr_blacklist);
if bp.sockets.is_empty() {
return Some(errors);
}
drop(udp_sockets); // release lock
for ns in new_sockets.iter() {
// We start a task for each CPU core. Tokio multiplexes but since each packet takes a bit of CPU
// to parse, decrypt, etc. we want to be able to saturate the CPU for any given socket to virtual
// network path. The alternative would be to use MPMC channels but that would almost certainly be
// a lot slower as it would involve more sync/atomic bottlenecks and probably extra malloc/free.
let mut kill_on_drop = ns.kill_on_drop.lock();
for _ in 0..self.num_listeners_per_socket {
let self2 = self.clone();
let socket = ns.socket.clone();
let interface = ns.interface.clone();
let local_socket = LocalSocket(Arc::downgrade(ns), self.local_socket_unique_id_counter.fetch_add(1, Ordering::SeqCst));
kill_on_drop.push(self.rt.spawn(async move {
let core = self2.core();
loop {
let mut buf = core.get_packet_buffer();
if let Ok((bytes, source)) = socket.recv_from(unsafe { buf.entire_buffer_mut() }).await {
unsafe { buf.set_size_unchecked(bytes) };
core.handle_incoming_physical_packet(&self2, &Endpoint::IpUdp(InetAddress::from(source)), &local_socket, &interface, buf);
} else {
break;
}
}
}));
}
}
return None;
}
async fn udp_binding_task_main(self: Arc<Self>) {
loop {
let config = self.data.config().await;
if let Some(errors) = self.update_bindings_for_port(config.settings.primary_port, &config.settings.interface_prefix_blacklist, &config.settings.cidr_blacklist).await {
for e in errors.iter() {
println!("BIND ERROR: {} {} {}", e.0.to_string(), e.1.to_string(), e.2.to_string());
}
// TODO: report errors properly
}
tokio::time::sleep(UDP_UPDATE_BINDINGS_INTERVAL_MS).await;
}
}
async fn core_background_service_task_main(self: Arc<Self>) {
tokio::time::sleep(Duration::from_secs(1)).await;
loop {
tokio::time::sleep(self.core().do_background_tasks(&self)).await;
}
}
}
impl Eq for LocalSocket {}
impl Hash for LocalSocket {
#[inline(always)]
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.1.hash(state)
}
}
impl SystemInterface for Service {
impl SystemInterface for ServiceImpl {
type LocalSocket = crate::service::LocalSocket;
type LocalInterface = String;
type LocalInterface = crate::localinterface::LocalInterface;
fn event_node_is_up(&self) {}
@ -91,7 +172,55 @@ impl SystemInterface for Service {
}
fn wire_send(&self, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>, data: &[&[u8]], packet_ttl: u8) -> bool {
todo!()
match endpoint {
Endpoint::IpUdp(address) => {
// This is the fast path -- the socket is known to the core so just send it.
if let Some(s) = local_socket {
if let Some(s) = s.0.upgrade() {
return s.send_sync_nonblock(&self.rt, address, data, packet_ttl);
} else {
return false;
}
}
// Otherwise we try to send from one socket on every interface or from the specified interface.
// This path only happens when the core is trying new endpoints. The fast path is for most packets.
return self.rt.block_on(async {
let sockets = self.udp_sockets.read().await;
if !sockets.is_empty() {
if let Some(specific_interface) = local_interface {
for (_, p) in sockets.iter() {
for s in p.sockets.iter() {
if s.interface.eq(specific_interface) {
if s.send_async(&self.rt, address, data, packet_ttl).await {
return true;
}
}
}
}
} else {
let bound_ports: Vec<&u16> = sockets.keys().collect();
let mut sent_on_interfaces = HashSet::with_capacity(4);
let rn = random::xorshift64_random() as usize;
for i in 0..bound_ports.len() {
let p = sockets.get(*bound_ports.get(rn.wrapping_add(i) % bound_ports.len()).unwrap()).unwrap();
for s in p.sockets.iter() {
if !sent_on_interfaces.contains(&s.interface) {
if s.send_async(&self.rt, address, data, packet_ttl).await {
sent_on_interfaces.insert(s.interface.clone());
}
}
}
}
return !sent_on_interfaces.is_empty();
}
}
return false;
});
}
_ => {}
}
return false;
}
fn check_path(&self, id: &Identity, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>) -> bool {
@ -113,6 +242,54 @@ impl SystemInterface for Service {
}
}
impl SwitchInterface for Service {}
impl SwitchInterface for ServiceImpl {}
impl Interface for Service {}
impl Interface for ServiceImpl {}
/// Local socket wrapper to provide to the core.
///
/// This implements very fast hash and equality in terms of an arbitrary unique ID assigned at
/// construction and holds a weak reference to the bound socket so dead sockets will silently
/// cease to exist or work. This also means that this code can check the weak count to determine
/// if the core is currently holding/using a socket for any reason.
#[derive(Clone)]
pub struct LocalSocket(Weak<BoundUdpSocket>, usize);
impl LocalSocket {
/// Returns true if the wrapped socket appears to be in use by the core.
#[inline(always)]
pub fn in_use(&self) -> bool {
self.0.weak_count() > 0
}
#[inline(always)]
pub fn socket(&self) -> Option<Arc<BoundUdpSocket>> {
self.0.upgrade()
}
}
impl PartialEq for LocalSocket {
#[inline(always)]
fn eq(&self, other: &Self) -> bool {
self.1 == other.1
}
}
impl Eq for LocalSocket {}
impl Hash for LocalSocket {
#[inline(always)]
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.1.hash(state)
}
}
impl ToString for LocalSocket {
fn to_string(&self) -> String {
if let Some(s) = self.0.upgrade() {
s.address.to_string()
} else {
"(closed socket)".into()
}
}
}

View file

@ -1,83 +1,239 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::num::NonZeroI64;
use std::collections::HashMap;
#[allow(unused_imports)]
use std::mem::{size_of, transmute, MaybeUninit};
#[allow(unused_imports)]
use std::net::SocketAddr;
#[allow(unused_imports)]
use std::os::raw::*;
#[allow(unused_imports)]
use std::ptr::{null, null_mut};
use std::sync::Arc;
#[cfg(unix)]
use std::os::unix::io::{FromRawFd, RawFd};
use lazy_static::lazy_static;
use crate::getifaddrs;
use crate::localinterface::LocalInterface;
#[allow(unused_imports)]
use num_traits::AsPrimitive;
use crate::getifaddrs;
use zerotier_network_hypervisor::vl1::{InetAddress, IpScope};
use zerotier_network_hypervisor::vl1::inetaddress::{InetAddress, IpScope};
/// A local port to which one or more UDP sockets is bound.
///
/// To bind a port we must bind sockets to each interface/IP pair directly. Sockets must
/// be "hard" bound to the interface so default route override can work.
pub struct BoundUdpPort {
pub sockets: Vec<Arc<BoundUdpSocket>>,
pub port: u16,
}
/// A locally bound UDP socket.
/// A socket bound to a specific interface and IP.
pub struct BoundUdpSocket {
/// Locally bound address.
/// Local IP address to which this socket is bound.
pub address: InetAddress,
/// Locally bound (to device) socket.
pub socket: tokio::net::UdpSocket,
/// Local interface device name or other unique identifier (OS-specific).
pub interface: String,
/// Raw socket FD, which only remains valid as long as 'socket' exists.
pub fd: RawFd,
/// Monotonic time of last activity.
pub last_activity_time_ticks: i64,
/// High-level async socket, but UDP also supports non-blocking sync send.
pub socket: Arc<tokio::net::UdpSocket>,
/// Local interface on which socket appears.
pub interface: LocalInterface,
/// Add tasks here that should be aborted when this socket is closed.
pub kill_on_drop: parking_lot::Mutex<Vec<tokio::task::JoinHandle<()>>>,
fd: RawFd,
}
impl Drop for BoundUdpSocket {
fn drop(&mut self) {
for t in self.kill_on_drop.lock().drain(..) {
t.abort();
}
}
}
impl BoundUdpSocket {
/// Update 'sockets' by adding any missing local bindings and removing any that are no longer valid.
#[cfg(unix)]
#[inline(always)]
fn set_ttl(&self, packet_ttl: u8) {
let ttl = packet_ttl as c_int;
unsafe { libc::setsockopt(self.fd.as_(), libc::IPPROTO_IP.as_(), libc::IP_TTL.as_(), (&ttl as *const c_int).cast(), std::mem::size_of::<c_int>().as_()) };
}
#[cfg(any(target_os = "macos", target_os = "freebsd"))]
pub fn send_sync_nonblock(&self, _: &tokio::runtime::Handle, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool {
let mut ok = false;
if dest.family() == self.address.family() {
if packet_ttl > 0 && dest.is_ipv4() {
self.set_ttl(packet_ttl);
}
unsafe {
if b.len() == 1 {
let bb = *b.get_unchecked(0);
ok = libc::sendto(self.fd.as_(), bb.as_ptr().cast(), bb.len().as_(), 0, transmute(dest as *const InetAddress), size_of::<InetAddress>().as_()) > 0;
} else {
let mut iov: [libc::iovec; 16] = MaybeUninit::uninit().assume_init();
assert!(b.len() <= iov.len());
for i in 0..b.len() {
let bb = *b.get_unchecked(i);
let ii = iov.get_unchecked_mut(i);
ii.iov_base = transmute(bb.as_ptr());
ii.iov_len = bb.len().as_();
}
let msghdr = libc::msghdr {
msg_name: transmute(dest as *const InetAddress),
msg_namelen: size_of::<InetAddress>().as_(),
msg_iov: iov.as_mut_ptr(),
msg_iovlen: b.len().as_(),
msg_control: null_mut(),
msg_controllen: 0,
msg_flags: 0,
};
ok = libc::sendmsg(self.fd.as_(), &msghdr, 0) > 0;
}
}
if packet_ttl > 0 && dest.is_ipv4() {
self.set_ttl(0xff);
}
}
ok
}
#[cfg(not(any(target_os = "macos", target_os = "freebsd")))]
pub fn send_sync_nonblock(&self, _: &tokio::runtime::Handle, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool {
let mut ok = false;
if dest.family() == self.address.family() {
let mut tmp: [u8; 16384] = unsafe { std::mem::MaybeUninit::uninit().assume_init() };
let data = if b.len() == 1 {
*unsafe { b.get_unchecked(0) }
} else {
let mut p = 0;
for bb in b.iter() {
let pp = p + bb.len();
if pp < 16384 {
tmp[p..pp].copy_from_slice(*bb);
p = pp;
} else {
return false;
}
}
&tmp[..p]
};
if packet_ttl > 0 && dest.is_ipv4() {
self.set_ttl(packet_ttl);
}
ok = self.socket.try_send_to(data, dest.try_into().unwrap()).is_ok();
if packet_ttl > 0 && dest.is_ipv4() {
self.set_ttl(0xff);
}
}
ok
}
pub async fn send_async(&self, _: &tokio::runtime::Handle, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool {
let mut ok = false;
if dest.family() == self.address.family() {
let mut tmp: [u8; 16384] = unsafe { std::mem::MaybeUninit::uninit().assume_init() };
let data = if b.len() == 1 {
*unsafe { b.get_unchecked(0) }
} else {
let mut p = 0;
for bb in b.iter() {
let pp = p + bb.len();
if pp < 16384 {
tmp[p..pp].copy_from_slice(*bb);
p = pp;
} else {
return false;
}
}
&tmp[..p]
};
if packet_ttl > 0 && dest.is_ipv4() {
self.set_ttl(packet_ttl);
}
let sa: SocketAddr = dest.try_into().unwrap();
ok = self.socket.send_to(data, sa).await.is_ok();
if packet_ttl > 0 && dest.is_ipv4() {
self.set_ttl(0xff);
}
}
ok
}
}
impl BoundUdpPort {
/// Create a new port binding.
///
/// You must call update_bindings() after this to actually bind to system interfaces.
pub fn new(port: u16) -> Self {
Self { sockets: Vec::new(), port }
}
/// Synchronize bindings with devices and IPs in system.
///
/// Any device or local IP within any of the supplied blacklists is ignored. Multicast or loopback addresses are
/// also ignored. All errors encountered are returned.
/// also ignored.
///
/// This should always be called on the same port for the same socket collection. Calling on the same 'sockets'
/// with different ports will lead to redundant or missed bindings.
///
/// We must bind directly to each device/address pair for each port so default route override can work.
fn update_bindings_for_port(sockets: &mut Vec<Arc<BoundUdpSocket>>, port: u16, device_prefix_blacklist: &Vec<String>, cidr_blacklist: &Vec<InetAddress>) -> Vec<std::io::Error> {
/// The caller can check the 'sockets' member variable after calling to determine which if any bindings were
/// successful. Any errors that occurred are returned as tuples of (interface, address, error). The second vector
/// returned contains newly bound sockets.
pub fn update_bindings(&mut self, interface_prefix_blacklist: &Vec<String>, cidr_blacklist: &Vec<InetAddress>) -> (Vec<(LocalInterface, InetAddress, std::io::Error)>, Vec<Arc<BoundUdpSocket>>) {
let mut existing_bindings: HashMap<LocalInterface, HashMap<InetAddress, Arc<BoundUdpSocket>>> = HashMap::with_capacity(4);
for s in self.sockets.drain(..) {
existing_bindings.entry(s.interface.clone()).or_insert_with(|| HashMap::with_capacity(4)).insert(s.address.clone(), s);
}
let mut errors = Vec::new();
let mut existing_bind_points: HashMap<String, Vec<InetAddress>> = HashMap::with_capacity(id_assignment_state.devices.len() + 1);
let now = crate::utils::ms_monotonic();
getifaddrs::for_each_address(|address, device| {
let mut new_sockets = Vec::new();
getifaddrs::for_each_address(|address, interface| {
let interface_str = interface.to_string();
if address.is_ip()
&& matches!(address.scope(), IpScope::Global | IpScope::PseudoPrivate | IpScope::Private | IpScope::Shared)
&& !device_prefix_blacklist.iter().any(|pfx| device.starts_with(pfx.as_str()))
&& !interface_prefix_blacklist.iter().any(|pfx| interface_str.starts_with(pfx.as_str()))
&& !cidr_blacklist.iter().any(|r| address.is_within(r))
{
existing_bind_points.entry(device.to_string()).or_default().push(address.clone());
if !sockets.iter().any(|_, s| s.address == address || s.local_device_id == did) {
let s = unsafe { bind_udp_to_device(device, address) };
let mut found = false;
if let Some(byaddr) = existing_bindings.get(interface) {
if let Some(socket) = byaddr.get(address) {
found = true;
self.sockets.push(socket.clone());
}
}
if !found {
let mut addr_with_port = address.clone();
addr_with_port.set_port(self.port);
let s = unsafe { bind_udp_to_device(interface_str.as_str(), &addr_with_port) };
if s.is_ok() {
let fd = s.unwrap();
let s = tokio::net::UdpSocket::from_std(unsafe { std::net::UdpSocket::from_raw_fd(fd) });
if s.is_ok() {
id_assignment_state.socket_id_counter += 1;
let lsid = NonZeroI64::new(id_assignment_state.socket_id_counter).unwrap();
sockets.push(Arc::new(BoundUdpSocket {
address: address.clone(),
socket: s.unwrap(),
interface: device.to_string(),
let s = Arc::new(BoundUdpSocket {
address: addr_with_port,
socket: Arc::new(s.unwrap()),
kill_on_drop: parking_lot::Mutex::new(Vec::new()),
interface: interface.clone(),
fd,
last_activity_time_ticks: now,
}));
});
self.sockets.push(s.clone());
new_sockets.push(s);
} else {
errors.push(s.err().unwrap());
errors.push((interface.clone(), addr_with_port, s.err().unwrap()));
}
} else {
errors.push(std::io::Error::new(std::io::ErrorKind::AddrInUse, s.err().unwrap()));
errors.push((interface.clone(), addr_with_port, std::io::Error::new(std::io::ErrorKind::Other, s.err().unwrap())));
}
}
}
});
sockets.retain(|s| existing_bind_points.get(&s.local_interface).map_or(false, |addr_list| addr_list.contains(&s.address)));
errors
(errors, new_sockets)
}
}
@ -85,8 +241,8 @@ impl BoundUdpSocket {
#[cfg(unix)]
unsafe fn bind_udp_to_device(device_name: &str, address: &InetAddress) -> Result<RawFd, &'static str> {
let (af, sa_len) = match address.family() {
InetAddressFamily::IPv4 => (libc::AF_INET, std::mem::size_of::<libc::sockaddr_in>().as_()),
InetAddressFamily::IPv6 => (libc::AF_INET6, std::mem::size_of::<libc::sockaddr_in6>().as_()),
InetAddress::AF_INET => (libc::AF_INET, std::mem::size_of::<libc::sockaddr_in>().as_()),
InetAddress::AF_INET6 => (libc::AF_INET6, std::mem::size_of::<libc::sockaddr_in6>().as_()),
_ => {
return Err("unrecognized address family");
}
@ -128,7 +284,7 @@ unsafe fn bind_udp_to_device(device_name: &str, address: &InetAddress) -> Result
#[cfg(not(target_os = "linux"))]
{
fl = 0;
libc::setsockopt(s, libc::IPPROTO_IP.as_(), libc::IP_DF.as_(), (&mut fl as *mut c_int).cast(), std::mem::size_of::<c_int>().as_());
libc::setsockopt(s, libc::IPPROTO_IP.as_(), libc::IP_DONTFRAG.as_(), (&mut fl as *mut c_int).cast(), std::mem::size_of::<c_int>().as_());
}
#[cfg(target_os = "linux")]
{
@ -143,14 +299,14 @@ unsafe fn bind_udp_to_device(device_name: &str, address: &InetAddress) -> Result
}
fl = 1048576;
while fl >= 131072 {
while fl >= 65536 {
if libc::setsockopt(s, libc::SOL_SOCKET.as_(), libc::SO_RCVBUF.as_(), (&mut fl as *mut c_int).cast(), std::mem::size_of::<c_int>().as_()) == 0 {
break;
}
fl -= 65536;
}
fl = 1048576;
while fl >= 131072 {
while fl >= 65536 {
if libc::setsockopt(s, libc::SOL_SOCKET.as_(), libc::SO_SNDBUF.as_(), (&mut fl as *mut c_int).cast(), std::mem::size_of::<c_int>().as_()) == 0 {
break;
}