More work on service itself.

This commit is contained in:
Adam Ierymenko 2021-01-20 00:02:39 -05:00
parent 346bb7cf99
commit 896d75fe86
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
12 changed files with 1323 additions and 58 deletions

View file

@ -836,4 +836,9 @@ enum ZT_InetAddress_IpScope ZT_InetAddress_ipScope(const ZT_InetAddress *ia)
/********************************************************************************************************************/
uint64_t ZT_random()
{
return ZeroTier::Utils::random();
}
} // extern "C"

View file

@ -3023,6 +3023,7 @@ ZT_SDK_API const int ZT_AF_INET,ZT_AF_INET6;
/* ---------------------------------------------------------------------------------------------------------------- */
ZT_SDK_API uint64_t ZT_random();
#ifdef __cplusplus
}

View file

@ -4,6 +4,7 @@ project(zt_osdep)
set(src
Arp.cpp
EthernetTap.cpp
ManagedRoute.cpp
NeighborDiscovery.cpp
OSUtils.cpp
)
@ -12,6 +13,7 @@ set(headers
Arp.hpp
BlockingQueue.hpp
EthernetTap.hpp
ManagedRoute.hpp
OSUtils.hpp
Thread.hpp
)

View file

@ -11,7 +11,7 @@
*/
/****/
#include "../node/Constants.hpp"
#include "../core/Constants.hpp"
#include <stdint.h>
#include <stdio.h>

View file

@ -12,7 +12,6 @@
/****/
use std::os::raw::c_void;
use std::ptr::{slice_from_raw_parts, slice_from_raw_parts_mut};
use crate::bindings::capi as ztcore;
@ -23,12 +22,12 @@ use crate::bindings::capi as ztcore;
/// packet data is passed into and out of the core.
pub struct Buffer {
pub(crate) zt_core_buf: *mut u8,
pub(crate) data_size: u32
pub(crate) data_size: usize,
}
impl Buffer {
/// Maximum capacity of a ZeroTier reusable buffer.
pub const CAPACITY: u32 = ztcore::ZT_BUF_SIZE as u32;
pub const CAPACITY: usize = ztcore::ZT_BUF_SIZE as usize;
/// Obtain a new buffer from the core and set the size of its data to CAPACITY.
/// The contents of the buffer are not defined.
@ -38,16 +37,16 @@ impl Buffer {
if b.is_null() {
panic!("out of memory calling ZT_getBuffer()");
}
return Buffer {
Buffer {
zt_core_buf: b,
data_size: ztcore::ZT_BUF_SIZE
};
data_size: ztcore::ZT_BUF_SIZE as usize
}
}
/// Get the current size of the data held by this buffer.
/// Initially this is equal to CAPACITY.
#[inline(always)]
pub fn len(&self) -> u32 {
pub fn len(&self) -> usize {
self.data_size
}
@ -65,29 +64,9 @@ impl Buffer {
/// setting it to a value larger than CAPACITY will place the buffer into
/// an invalid state.
#[inline(always)]
pub unsafe fn set_len(&mut self, s: u32) {
pub unsafe fn set_len(&mut self, s: usize) {
self.data_size = s;
}
/// Get a slice that points to this buffer's data. This is unsafe because
/// the returned slice will be invalid if set_len() has been called with a
/// value higher than CAPACITY or if this has been consumed by the ZeroTier
/// core. The latter case is handled automatically in node.rs though, so it
/// is not something you generally have to worry about.
#[inline(always)]
pub unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
return &mut *slice_from_raw_parts_mut(self.zt_core_buf, self.data_size as usize);
}
/// Get a slice that points to this buffer's data. This is unsafe because
/// the returned slice will be invalid if set_len() has been called with a
/// value higher than CAPACITY or if this has been consumed by the ZeroTier
/// core. The latter case is handled automatically in node.rs though, so it
/// is not something you generally have to worry about.
#[inline(always)]
pub unsafe fn as_slice(&mut self) -> &[u8] {
return &*slice_from_raw_parts(self.zt_core_buf, self.data_size as usize);
}
}
impl Drop for Buffer {

View file

@ -165,6 +165,14 @@ pub fn now() -> i64 {
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as i64
}
/// Get a random 64-bit integer using the non-cryptographic PRNG in the ZeroTier core.
#[inline(always)]
pub fn random() -> u64 {
unsafe {
return ztcore::ZT_random();
}
}
/// The CStr stuff is cumbersome, so this is an easier to use function to turn a C string into a String.
/// This returns an empty string on a null pointer or invalid UTF-8. It's unsafe because it can crash if
/// the string is not zero-terminated. A size limit can be passed in if available to reduce this risk, or

View file

@ -206,6 +206,8 @@ pub struct VirtualNetworkRoute {
pub metric: u16
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Serialize, Deserialize, PartialEq, Eq)]
pub struct VirtualNetworkConfig {
pub nwid: NetworkId,

File diff suppressed because it is too large Load diff

View file

@ -9,6 +9,10 @@ edition = "2018"
[dependencies]
zerotier-core = { path = "../rust-zerotier-core" }
num_cpus = "1.13"
tokio = { version = "1", features = ["full"] }
warp = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
[target."cfg(unix)".dependencies]
libc = "0.2.82"

View file

@ -2,13 +2,26 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use zerotier_core::{Buffer, InetAddress, InetAddressFamily};
//
// A very low-level fast UDP socket that uses thread-per-core semantics to
// achieve maximum possible throughput. This will spawn a lot of threads but
// these threads will be inactive unless packets are being received with them.
//
// On most OSes this is by far the fastest way to handle incoming UDP except
// for bypassing the kernel's TCP/IP stack entirely.
//
#[cfg(windows)]
pub type FastUDPRawOsSocket = winapi::um::winsock2::SOCKET;
use winapi::um::winsock2 as winsock2;
#[cfg(windows)]
pub type FastUDPRawOsSocket = winsock2::SOCKET;
#[cfg(unix)]
pub type FastUDPRawOsSocket = libc::c_int;
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// bind_udp_socket() implementations for each platform
#[cfg(target_os = "macos")]
fn bind_udp_socket(_: &str, address: &InetAddress) -> Result<FastUDPRawOsSocket, &'static str> {
@ -84,20 +97,6 @@ fn bind_udp_socket(_: &str, address: &InetAddress) -> Result<FastUDPRawOsSocket,
fl -= 65536;
}
/*
// Bind socket directly to device to allow ZeroTier to work if it overrides the default route.
if device_name.as_bytes().len() > 0 {
let namidx = libc::if_nametoindex(device_name.as_ptr()) as libc::c_int;
if namidx != 0 {
if libc::setsockopt(s, libc::IPPROTO_IP, 25 /* IP_BOUND_IF */, (&namidx as *const libc::c_int).cast(), std::mem::size_of_val(&namidx) as libc::socklen_t) != 0 {
//libc::perror(std::ptr::null());
libc::close(s);
return Err("bind to interface failed");
}
}
}
*/
if libc::bind(s, (address as *const InetAddress).cast(), sa_len) != 0 {
//libc::perror(std::ptr::null());
libc::close(s);
@ -110,6 +109,8 @@ fn bind_udp_socket(_: &str, address: &InetAddress) -> Result<FastUDPRawOsSocket,
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Handler for incoming packets received by a FastUDPSocket.
/// Note that this may be called concurrently from any number of threads.
pub trait FastUDPSocketPacketHandler {
fn incoming_udp_packet(&self, raw_socket: &FastUDPRawOsSocket, from_adddress: &InetAddress, data: Buffer);
}
@ -125,14 +126,14 @@ pub struct FastUDPSocket<H: FastUDPSocketPacketHandler + Send + Sync + 'static>
#[cfg(unix)]
#[inline(always)]
pub fn fast_udp_socket_sendto(socket: &FastUDPRawOsSocket, to_address: &InetAddress, data: &[u8], packet_ttl: i32) {
pub fn fast_udp_socket_sendto(socket: &FastUDPRawOsSocket, to_address: &InetAddress, data: *const u8, len: usize, packet_ttl: i32) {
unsafe {
if packet_ttl <= 0 {
libc::sendto(*socket, data.as_ptr().cast(), data.len() as libc::size_t, 0, (to_address as *const InetAddress).cast(), std::mem::size_of::<InetAddress>() as libc::socklen_t);
libc::sendto(*socket, data.cast(), len as libc::size_t, 0, (to_address as *const InetAddress).cast(), std::mem::size_of::<InetAddress>() as libc::socklen_t);
} else {
let mut ttl = packet_ttl as libc::c_int;
libc::setsockopt(*socket, libc::IPPROTO_IP, libc::IP_TTL, (&mut ttl as *mut libc::c_int).cast(), std::mem::size_of::<libc::c_int>() as libc::socklen_t);
libc::sendto(*socket, data.as_ptr().cast(), data.len() as libc::size_t, 0, (to_address as *const InetAddress).cast(), std::mem::size_of::<InetAddress>() as libc::socklen_t);
libc::sendto(*socket, data.cast(), len as libc::size_t, 0, (to_address as *const InetAddress).cast(), std::mem::size_of::<InetAddress>() as libc::socklen_t);
ttl = 255;
libc::setsockopt(*socket, libc::IPPROTO_IP, libc::IP_TTL, (&mut ttl as *mut libc::c_int).cast(), std::mem::size_of::<libc::c_int>() as libc::socklen_t);
}
@ -160,7 +161,7 @@ fn fast_udp_socket_recvfrom(socket: &FastUDPRawOsSocket, buf: &mut Buffer, from_
static mut SOCKET_SPIN_INT: usize = 0;
impl<H: FastUDPSocketPacketHandler + Send + Sync + 'static> FastUDPSocket<H> {
pub fn new(device_name: &str, address: &InetAddress, handler: &Arc<H>) -> Result<FastUDPSocket<H>, &'static str> {
pub fn new(device_name: &str, address: &InetAddress, handler: &Arc<H>) -> Result<FastUDPSocket<H>, String> {
let thread_count = num_cpus::get();
let mut s = FastUDPSocket{
@ -171,6 +172,7 @@ impl<H: FastUDPSocketPacketHandler + Send + Sync + 'static> FastUDPSocket<H> {
bind_address: address.clone()
};
let mut bind_failed_reason: &'static str = "";
for _ in 0..thread_count {
let thread_socket = bind_udp_socket(device_name, address);
if thread_socket.is_ok() {
@ -197,11 +199,13 @@ impl<H: FastUDPSocketPacketHandler + Send + Sync + 'static> FastUDPSocket<H> {
}
}
}));
} else {
bind_failed_reason = thread_socket.err().unwrap();
}
}
if s.sockets.is_empty() {
return Err("unable to bind to address for IPv4 or IPv6");
return Err(format!("unable to bind to address for IPv4 or IPv6 ({})", bind_failed_reason));
}
Ok(s)
@ -211,15 +215,15 @@ impl<H: FastUDPSocketPacketHandler + Send + Sync + 'static> FastUDPSocket<H> {
/// This actually picks a thread's socket and sends from it. Since all
/// are bound to the same IP:port which one is chosen doesn't matter.
/// Sockets are thread safe.
pub fn send(&self, to_address: &InetAddress, data: &[u8], packet_ttl: i32) {
#[inline(always)]
pub fn send(&self, to_address: &InetAddress, data: *const u8, len: usize, packet_ttl: i32) {
let mut i;
unsafe {
i = SOCKET_SPIN_INT;
SOCKET_SPIN_INT = i + 1;
i %= self.sockets.len();
}
let s = self.sockets.get(i).unwrap();
fast_udp_socket_sendto(s, to_address, data, packet_ttl);
i %= self.sockets.len();
fast_udp_socket_sendto(self.sockets.get(i).unwrap(), to_address, data, len, packet_ttl);
}
/// Get the number of threads this socket is currently running.
@ -245,17 +249,17 @@ impl<H: FastUDPSocketPacketHandler + Send + Sync + 'static> Drop for FastUDPSock
self.thread_run.store(false, Ordering::Relaxed);
for s in self.sockets.iter() {
unsafe {
libc::sendto(*s as libc::c_int, tmp.as_ptr().cast(), 0, 0, (&self.bind_address as *const InetAddress).cast(), std::mem::size_of::<InetAddress>() as libc::socklen_t);
libc::sendto(*s, tmp.as_ptr().cast(), 0, 0, (&self.bind_address as *const InetAddress).cast(), std::mem::size_of::<InetAddress>() as libc::socklen_t);
}
}
for s in self.sockets.iter() {
unsafe {
libc::shutdown(*s as libc::c_int, libc::SHUT_RDWR);
libc::shutdown(*s, libc::SHUT_RDWR);
}
}
for s in self.sockets.iter() {
unsafe {
libc::close(*s as libc::c_int);
libc::close(*s);
}
}
while !self.threads.is_empty() {
@ -312,8 +316,8 @@ mod tests {
let data_bytes = [0_u8; 1024];
loop {
s1.send(&ba2, &data_bytes, 0);
s2.send(&ba1, &data_bytes, 0);
s1.send(&ba2, data_bytes.as_ptr(), data_bytes.len(), 0);
s2.send(&ba1, data_bytes.as_ptr(), data_bytes.len(), 0);
if h1.cnt.load(Ordering::Relaxed) > 10000 && h2.cnt.load(Ordering::Relaxed) > 10000 {
break;
}

View file

@ -0,0 +1,173 @@
use std::collections::BTreeMap;
use zerotier_core::{InetAddress, Address, NetworkId};
use serde::{Deserialize, Serialize};
pub const UNASSIGNED_PRIVILEGED_PORTS: [u16; 299] = [
4,
6,
8,
10,
12,
14,
15,
16,
26,
28,
30,
32,
34,
36,
40,
60,
269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279,
285,
288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 303, 304, 305, 306, 307,
323, 324, 325, 326, 327, 328, 329, 330, 331, 332,
334, 335, 336, 337, 338, 339, 340, 341, 342, 343,
703,
708,
713, 714, 715, 716, 717, 718, 719, 720, 721, 722, 723, 724, 725, 726, 727, 728,
732, 733, 734, 735, 736, 737, 738, 739, 740,
743,
745, 746,
755, 756,
766,
768,
778, 779,
781, 782, 783, 784, 785, 786, 787, 788, 789, 790, 791, 792, 793, 794, 795, 796, 797, 798, 799,
802, 803, 804, 805, 806, 807, 808, 809,
811, 812, 813, 814, 815, 816, 817, 818, 819, 820, 821, 822, 823, 824, 825, 826, 827,
834, 835, 836, 837, 838, 839, 840, 841, 842, 843, 844, 845, 846,
849, 850, 851, 852, 853, 854, 855, 856, 857, 858, 859,
862, 863, 864, 865, 866, 867, 868, 869, 870, 871, 872,
874, 875, 876, 877, 878, 879, 880, 881, 882, 883, 884, 885,
889, 890, 891, 892, 893, 894, 895, 896, 897, 898, 899,
904, 905, 906, 907, 908, 909, 910, 911,
914, 915, 916, 917, 918, 919, 920, 921, 922, 923, 924, 925, 926, 927, 928, 929, 930, 931, 932, 933, 934, 935, 936, 937, 938, 939, 940, 941, 942, 943, 944, 945, 946, 947, 948, 949, 950, 951, 952, 953, 954, 955, 956, 957, 958, 959, 960, 961, 962, 963, 964, 965, 966, 967, 968, 969, 970, 971, 972, 973, 974, 975, 976, 977, 978, 979, 980, 981, 982, 983, 984, 985, 986, 987, 988,
1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009,
1023,
];
#[derive(Serialize, Deserialize)]
#[serde(default)]
pub struct LocalConfigPhysicalPathConfig {
blacklist: bool
}
#[derive(Serialize, Deserialize)]
#[serde(default)]
pub struct LocalConfigVirtualConfig {
#[serde(rename = "try")]
try_: Vec<InetAddress>
}
#[derive(Serialize, Deserialize)]
#[serde(default)]
pub struct LocalConfigNetworkSettings {
#[serde(rename = "allowManagedIPs")]
allow_managed_ips: bool,
#[serde(rename = "allowGlobalIPs")]
allow_global_ips: bool,
#[serde(rename = "allowManagedRoutes")]
allow_managed_routes: bool,
#[serde(rename = "allowGlobalRoutes")]
allow_global_routes: bool,
#[serde(rename = "allowDefaultRouteOverride")]
allow_default_route_override: bool,
}
#[derive(Serialize, Deserialize)]
#[serde(default)]
pub struct LocalConfigSettings {
#[serde(rename = "primaryPort")]
primary_port: u16,
#[serde(rename = "secondaryPort")]
secondary_port: Option<u16>,
#[serde(rename = "autoPortSearch")]
auto_port_search: bool,
#[serde(rename = "portMapping")]
port_mapping: bool,
#[serde(rename = "logSizeMax")]
log_size_max: usize,
#[serde(rename = "interfacePrefixBlacklist")]
interface_prefix_blacklist: Vec<String>,
#[serde(rename = "explicitAddresses")]
explicit_addresses: Vec<InetAddress>,
}
#[derive(Serialize, Deserialize)]
#[serde(default)]
pub struct LocalConfig {
physical: BTreeMap<InetAddress, LocalConfigPhysicalPathConfig>,
#[serde(rename = "virtual")]
virtual_: BTreeMap<Address, LocalConfigVirtualConfig>,
network: BTreeMap<NetworkId, LocalConfigNetworkSettings>,
settings: LocalConfigSettings,
}
impl Default for LocalConfigPhysicalPathConfig {
fn default() -> Self {
LocalConfigPhysicalPathConfig {
blacklist: false
}
}
}
impl Default for LocalConfigVirtualConfig {
fn default() -> Self {
LocalConfigVirtualConfig {
try_: Vec::new()
}
}
}
impl Default for LocalConfigNetworkSettings {
fn default() -> Self {
LocalConfigNetworkSettings {
allow_managed_ips: true,
allow_global_ips: false,
allow_managed_routes: true,
allow_global_routes: false,
allow_default_route_override: false
}
}
}
impl LocalConfigSettings {
#[cfg(target_os = "macos")]
const DEFAULT_PREFIX_BLACKLIST: [&str; 7] = ["lo", "utun", "gif", "stf", "iptap", "pktap", "feth"];
#[cfg(target_os = "linux")]
const DEFAULT_PREFIX_BLACKLIST: [&str; 5] = ["lo", "tun", "tap", "ipsec", "zt"];
}
impl Default for LocalConfigSettings {
fn default() -> Self {
let mut bl: Vec<String> = Vec::new();
bl.reserve(LocalConfigSettings::DEFAULT_PREFIX_BLACKLIST.len());
for n in LocalConfigSettings::DEFAULT_PREFIX_BLACKLIST.iter() {
bl.push(String::from(*n));
}
LocalConfigSettings {
primary_port: zerotier_core::DEFAULT_PORT,
secondary_port: Some(293), // this is one of UNASSIGNED_PRIVILEGED_PORTS that we will default to
auto_port_search: true,
port_mapping: true,
log_size_max: 16777216,
interface_prefix_blacklist: bl,
explicit_addresses: Vec::new()
}
}
}
impl Default for LocalConfig {
fn default() -> Self {
LocalConfig {
physical: BTreeMap::new(),
virtual_: BTreeMap::new(),
network: BTreeMap::new(),
settings: LocalConfigSettings::default()
}
}
}

View file

@ -1,5 +1,11 @@
mod fastudp;
mod localconfig;
fn main() {
println!("Hello, world!");
let tokio_rt = tokio::runtime::Runtime::new().unwrap();
tokio_rt.block_on(async {
// TODO: init warp http server and anything else using tokio
});
}