Bunch more Rust work, and it builds again.

This commit is contained in:
Adam Ierymenko 2021-02-04 23:20:30 -05:00
parent f9deec7872
commit a321ee6c28
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
14 changed files with 176 additions and 128 deletions

View file

@ -25,7 +25,7 @@ central-controller-docker: FORCE
docker build -t registry.zerotier.com/zerotier-central/ztcentral-controller:${TIMESTAMP} -f controller/central-docker/Dockerfile .
clean: FORCE
rm -rf ${BUILDDIR}
rm -rf ${BUILDDIR} rust-zerotier-core/target rust-zerotier-service/target
distclean: FORCE
rm -rf ${BUILDDIR}

View file

@ -1519,25 +1519,13 @@ typedef struct
*/
ZT_InetAddress ia;
/* When compiling the ZeroTier core, we want to explicitly define these
* in the union. Otherwise we don't because that would require these
* structures to be included. */
#ifdef ZT_CORE
/**
* Socket address generic buffer
*/
struct sockaddr_storage ss;
/**
* Socket address header, for all ZT_ENDPOINT_TYPE_IP types
*/
struct sockaddr sa;
/**
* IPv4 address, for all ZT_ENDPOINT_TYPE_IP types if family is AF_INET
*/
struct sockaddr_in sa_in;
/**
* IPv6 address, for all ZT_ENDPOINT_TYPE_IP types if family is AF_INET6
*/
struct sockaddr_in6 sa_in6;
#endif

View file

@ -48,6 +48,9 @@ pub enum InetAddressFamily {
IPv6
}
pub const IPV4_INADDR_ANY: [u8; 4] = [0; 4];
pub const IPV6_INADDR_ANY: [u8; 16] = [0; 16];
/// Opaque structure that can hold an IPv4 or IPv6 address.
pub struct InetAddress {
// This must be the same size as ZT_InetAddress in zerotier.h. This is
@ -63,6 +66,24 @@ impl InetAddress {
}
}
/// Create an IPv4 0.0.0.0 InetAddress
pub fn new_ipv4_any(port: u16) -> InetAddress {
let mut ia = InetAddress::new();
unsafe {
ztcore::ZT_InetAddress_setIpBytes(ia.as_capi_mut_ptr(), IPV4_INADDR_ANY.as_ptr().cast(), 4, port as c_uint);
}
ia
}
/// Create an IPv6 ::0 InetAddress
pub fn new_ipv6_any(port: u16) -> InetAddress {
let mut ia = InetAddress::new();
unsafe {
ztcore::ZT_InetAddress_setIpBytes(ia.as_capi_mut_ptr(), IPV6_INADDR_ANY.as_ptr().cast(), 16, port as c_uint);
}
ia
}
/// Create from a 4-byte IPv4 IP or a 16-byte IPv6 IP.
/// Returns None if ip is not 4 or 16 bytes.
pub fn new_from_ip_bytes(ip: &[u8], port: u16) -> Option<InetAddress> {

View file

@ -15,7 +15,7 @@ use std::os::raw::{c_char, c_int};
use num_derive::{FromPrimitive, ToPrimitive};
#[allow(non_snake_case,non_upper_case_globals,non_camel_case_types,dead_code,improper_ctypes)]
mod capi;
mod capi; // bindgen generated
mod identity;
mod address;

View file

View file

@ -15,25 +15,29 @@ use std::collections::BTreeMap;
use std::net::IpAddr;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use futures::stream::{self, StreamExt};
use warp::Filter;
use warp::hyper::{HeaderMap, Method};
use warp::{Filter, Rejection, Reply};
use warp::http::{HeaderMap, Method, StatusCode};
use warp::hyper::body::Bytes;
use zerotier_core::*;
use crate::fastudpsocket::*;
use crate::localconfig::*;
use crate::network::Network;
use crate::getifaddrs;
use crate::localconfig::*;
use crate::log::Log;
use crate::network::Network;
struct ServiceEventHandler {}
struct Service {
local_config: Mutex<LocalConfig>,
run: AtomicBool,
}
impl NodeEventHandler<Network> for ServiceEventHandler {
impl NodeEventHandler<Network> for Service {
fn virtual_network_config(&self, network_id: NetworkId, network_obj: &Arc<Network>, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>) {
}
@ -56,7 +60,6 @@ impl NodeEventHandler<Network> for ServiceEventHandler {
0
}
#[inline(always)]
fn path_check(&self, address: Address, id: &Identity, local_socket: i64, sock_addr: &InetAddress) -> bool {
true
}
@ -66,69 +69,56 @@ impl NodeEventHandler<Network> for ServiceEventHandler {
}
}
impl Service {
#[inline(always)]
fn web_api_status(&self, method: Method, headers: HeaderMap, post_data: Bytes) -> Box<dyn Reply> {
Box::new(warp::http::StatusCode::BAD_REQUEST)
}
#[inline(always)]
fn web_api_network(&self, network_str: String, method: Method, headers: HeaderMap, post_data: Bytes) -> Box<dyn Reply> {
Box::new(warp::http::StatusCode::BAD_REQUEST)
}
#[inline(always)]
fn web_api_peer(&self, peer_str: String, method: Method, headers: HeaderMap, post_data: Bytes) -> Box<dyn Reply> {
Box::new(warp::http::StatusCode::BAD_REQUEST)
}
}
pub(crate) fn run() -> i32 {
let inaddr_v6_any = IpAddr::from_str("::0").unwrap();
let mut process_exit_value: i32 = 0;
// Current active local configuration for this node.
let mut local_config: Box<LocalConfig> = Box::new(LocalConfig::default());
// Event handler for Node.
let handler: Arc<ServiceEventHandler> = Arc::new(ServiceEventHandler{});
// From this point on we are in Tokio async land...
let tokio_rt = tokio::runtime::Builder::new_multi_thread().thread_stack_size(zerotier_core::RECOMMENDED_THREAD_STACK_SIZE).build().unwrap();
let tokio_rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
tokio_rt.block_on(async {
// Keeps track of FastUDPSocket instances by bound address.
let mut udp_sockets: BTreeMap<InetAddress, FastUDPSocket> = BTreeMap::new();
// Send something to interrupt_tx to interrupt the inner loop and force it to
// detect a change or exit if run has been set to false.
let (mut interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<u8>(2);
// Setting this to false terminates the service. It's atomic since this is multithreaded.
let run = AtomicBool::new(true);
let service: Arc<Service> = Arc::new(Service {
local_config: Mutex::new(LocalConfig::default()),
run: AtomicBool::new(true),
});
let mut primary_port_bind_failure = false;
loop {
let mut warp_server_port = local_config.settings.primary_port;
let root = warp::path::end().map(|| {
warp::reply::with_status("404", warp::hyper::StatusCode::NOT_FOUND)
});
let status = warp::path("status")
.and(warp::method())
.and(warp::header::headers_cloned())
.and(warp::body::bytes())
.map(|method: Method, headers: HeaderMap, post_data: Bytes| {
"status"
});
let network = warp::path!("network" / String)
.and(warp::method())
.and(warp::header::headers_cloned())
.and(warp::body::bytes())
.map(|nwid_str: String, method: Method, headers: HeaderMap, post_data: Bytes| {
"network"
});
let peer = warp::path!("peer" / String)
.and(warp::method())
.and(warp::header::headers_cloned())
.and(warp::body::bytes())
.map(|peer_str: String, method: Method, headers: HeaderMap, post_data: Bytes| {
"peer"
});
let current_local_config_settings = service.local_config.lock().unwrap().settings.clone();
let (mut shutdown_tx, mut shutdown_rx) = futures::channel::oneshot::channel();
let warp_server = warp::serve(warp::any().and(root
.or(status)
.or(network)
.or(peer)
)).try_bind_with_graceful_shutdown((inaddr_v6_any, warp_server_port), async { let _ = shutdown_rx.await; });
let s0 = service.clone();
let s1 = service.clone();
let s2 = service.clone();
let warp_server = warp::serve(warp::any().and(warp::path::end().map(|| { warp::reply::with_status("404", warp::hyper::StatusCode::NOT_FOUND) })
.or(warp::path("status").and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes())
.map(move |method: Method, headers: HeaderMap, post_data: Bytes| { s0.web_api_status(method, headers, post_data) }))
.or(warp::path!("network" / String).and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes())
.map(move |network_str: String, method: Method, headers: HeaderMap, post_data: Bytes| { s1.web_api_network(network_str, method, headers, post_data) }))
.or(warp::path!("peer" / String).and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes())
.map(move |peer_str: String, method: Method, headers: HeaderMap, post_data: Bytes| { s2.web_api_peer(peer_str, method, headers, post_data) }))
)).try_bind_with_graceful_shutdown((IpAddr::from([127_u8, 0_u8, 0_u8, 1_u8]), current_local_config_settings.primary_port), async { let _ = shutdown_rx.await; });
if warp_server.is_err() {
// TODO: log unable to bind to primary port
run.store(false, Ordering::Relaxed);
primary_port_bind_failure = true;
break;
}
let warp_server = tokio_rt.spawn(warp_server.unwrap().1);
let mut loop_delay = 10;
loop {
@ -137,29 +127,31 @@ pub(crate) fn run() -> i32 {
_ = interrupt_rx.next() => {},
_ = tokio::signal::ctrl_c() => {
// TODO: log CTRL+C received
run.store(false, Ordering::Relaxed);
service.run.store(false, Ordering::Relaxed);
let _ = shutdown_tx.send(());
break;
}
}
// Enumerate physical addresses on the system, creating a map with an entry for
// the primary_port and another for the secondary_port if bound.
let mut system_addrs: BTreeMap<InetAddress, String> = BTreeMap::new();
getifaddrs::for_each_address(|addr: &InetAddress, dev: &str| {
if !local_config.settings.is_interface_blacklisted(dev) {
let mut a = addr.clone();
a.set_port(local_config.settings.primary_port);
system_addrs.insert(a, String::from(dev));
if local_config.settings.secondary_port.is_some() {
let mut a = addr.clone();
a.set_port(local_config.settings.secondary_port.unwrap());
system_addrs.insert(a, String::from(dev));
}
match addr.ip_scope() {
IpScope::Global | IpScope::Private | IpScope::PseudoPrivate | IpScope::Shared => {
if !current_local_config_settings.is_interface_blacklisted(dev) {
let mut a = addr.clone();
a.set_port(current_local_config_settings.primary_port);
system_addrs.insert(a, String::from(dev));
if current_local_config_settings.secondary_port.is_some() {
let mut a = addr.clone();
a.set_port(current_local_config_settings.secondary_port.unwrap());
system_addrs.insert(a, String::from(dev));
}
}
},
_ => {}
}
});
// Close UDP bindings that no longer apply.
let mut udp_sockets_to_close: Vec<InetAddress> = Vec::new();
for sock in udp_sockets.iter() {
if !system_addrs.contains_key(sock.0) {
@ -170,7 +162,6 @@ pub(crate) fn run() -> i32 {
udp_sockets.remove(k);
}
// Bind addresses that are not already bound.
for addr in system_addrs.iter() {
if !udp_sockets.contains_key(addr.0) {
let s = FastUDPSocket::new(addr.1.as_str(), addr.0, |raw_socket: &FastUDPRawOsSocket, from_address: &InetAddress, data: Buffer| {
@ -182,23 +173,39 @@ pub(crate) fn run() -> i32 {
}
}
// TODO: check that ports are bound, implement port hunting or exit.
primary_port_bind_failure = true;
for s in udp_sockets.iter() {
if s.0.port() == current_local_config_settings.primary_port {
primary_port_bind_failure = false;
break;
}
}
if primary_port_bind_failure {
break;
}
if local_config.settings.primary_port != warp_server_port || !run.load(Ordering::Relaxed) {
if !service.run.load(Ordering::Relaxed) || current_local_config_settings.primary_port != service.local_config.lock().unwrap().settings.primary_port {
let _ = shutdown_tx.send(());
break;
}
}
let _ = warp_server.await;
let _ = warp_server.unwrap().1.await;
if !run.load(Ordering::Relaxed) {
if !service.run.load(Ordering::Relaxed) {
break;
}
tokio::time::sleep(Duration::from_millis(250)).await;
if !run.load(Ordering::Relaxed) {
if !service.run.load(Ordering::Relaxed) {
break;
}
if primary_port_bind_failure {
let local_config = service.local_config.lock().unwrap();
if local_config.settings.auto_port_search {
// TODO: port hunting if enabled
}
}
}
});

View file

@ -28,10 +28,10 @@ use crate::osdep as osdep;
use winapi::um::winsock2 as winsock2;
#[cfg(windows)]
pub type FastUDPRawOsSocket = winsock2::SOCKET;
pub(crate) type FastUDPRawOsSocket = winsock2::SOCKET;
#[cfg(unix)]
pub type FastUDPRawOsSocket = c_int;
pub(crate) type FastUDPRawOsSocket = c_int;
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// bind_udp_socket() implementations for each platform
@ -122,20 +122,51 @@ fn bind_udp_socket(_: &str, address: &InetAddress) -> Result<FastUDPRawOsSocket,
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Test bind UDP to a port at 0.0.0.0 and ::0, returning whether IPv4 and/or IPv6 succeeded (respectively).
pub(crate) fn test_bind_udp(port: u16) -> (bool, bool) {
let v4 = InetAddress::new_ipv4_any(port);
let v6 = InetAddress::new_ipv6_any(port);
let v4b = bind_udp_socket("", &v4);
if v4b.is_ok() {
fast_udp_socket_close(v4b.as_ref().unwrap());
}
let v6b = bind_udp_socket("", &v6);
if v6b.is_ok() {
fast_udp_socket_close(v6b.as_ref().unwrap());
}
(v4b.is_ok(), v6b.is_ok())
}
/// A multi-threaded (or otherwise fast) UDP socket that binds to both IPv4 and IPv6 addresses.
pub struct FastUDPSocket {
pub(crate) struct FastUDPSocket {
threads: Vec<std::thread::JoinHandle<()>>,
thread_run: Arc<AtomicBool>,
sockets: Vec<FastUDPRawOsSocket>,
pub bind_address: InetAddress,
}
#[cfg(unix)]
#[inline(always)]
fn fast_udp_socket_close(socket: &FastUDPRawOsSocket) {
unsafe {
osdep::close(*socket);
}
}
#[cfg(windows)]
#[inline(always)]
fn fast_udp_socket_close(socket: &FastUDPRawOsSocket) {
unsafe {
osdep::close(*socket);
}
}
/// Send to a raw UDP socket with optional packet TTL.
/// If the packet_ttl option is <=0, packet is sent with the default TTL. TTL setting is only used
/// in ZeroTier right now to do escalating TTL probes for IPv4 NAT traversal.
#[cfg(unix)]
#[inline(always)]
pub fn fast_udp_socket_sendto(socket: &FastUDPRawOsSocket, to_address: &InetAddress, data: *const u8, len: usize, packet_ttl: i32) {
pub(crate) fn fast_udp_socket_sendto(socket: &FastUDPRawOsSocket, to_address: &InetAddress, data: *const u8, len: usize, packet_ttl: i32) {
unsafe {
if packet_ttl <= 0 {
osdep::sendto(*socket, data.cast(), len.as_(), 0, (to_address as *const InetAddress).cast(), std::mem::size_of::<InetAddress>().as_());
@ -151,7 +182,7 @@ pub fn fast_udp_socket_sendto(socket: &FastUDPRawOsSocket, to_address: &InetAddr
#[cfg(windows)]
#[inline(always)]
pub fn fast_udp_socket_sendto(socket: &FastUDPRawOsSocket, to_address: &InetAddress, data: &[u8], packet_ttl: i32) {
pub(crate) fn fast_udp_socket_sendto(socket: &FastUDPRawOsSocket, to_address: &InetAddress, data: &[u8], packet_ttl: i32) {
}
#[cfg(unix)]

View file

@ -100,6 +100,8 @@ pub struct LocalConfigSettings {
pub auto_port_search: bool,
#[serde(rename = "portMapping")]
pub port_mapping: bool,
#[serde(rename = "logPath")]
pub log_path: Option<String>,
#[serde(rename = "logSizeMax")]
pub log_size_max: usize,
#[serde(rename = "logVL1Events")]
@ -189,6 +191,7 @@ impl Default for LocalConfigSettings {
secondary_port: Some(zerotier_core::DEFAULT_SECONDARY_PORT),
auto_port_search: true,
port_mapping: true,
log_path: None,
log_size_max: 1048576,
log_vl1_events: false,
log_vl2_events: false,

View file

@ -11,16 +11,18 @@
*/
/****/
use std::fs::{File, OpenOptions};
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::io::{Write, Seek, SeekFrom};
use std::cell::Cell;
use zerotier_core::PortableAtomicI64;
use chrono::Datelike;
use std::fmt::Display;
use std::fs::{File, OpenOptions};
use std::io::{Seek, SeekFrom, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
pub struct Log {
use chrono::Datelike;
use zerotier_core::PortableAtomicI64;
pub(crate) struct Log {
prefix: String,
path: String,
file: Mutex<Cell<Option<File>>>,
@ -29,7 +31,7 @@ pub struct Log {
}
impl Log {
const MIN_MAX_SIZE: usize = 4096;
const MIN_MAX_SIZE: usize = 1024;
pub fn new(path: &str, max_size: usize, prefix: &str) -> Log {
let mut p = String::from(prefix);

View file

@ -11,6 +11,7 @@
*/
/****/
mod api;
mod cli;
mod commands;
mod fastudpsocket;
@ -22,7 +23,7 @@ mod network;
mod vnic;
#[allow(non_snake_case,non_upper_case_globals,non_camel_case_types,dead_code,improper_ctypes)]
mod osdep;
mod osdep; // bindgen generated
use std::boxed::Box;
use std::ffi::CStr;
@ -30,19 +31,14 @@ use std::path::Path;
fn main() {
let mut process_exit_value: i32 = 0;
let mut zerotier_path;
unsafe {
zerotier_path = zerotier_core::cstr_to_string(osdep::platformDefaultHomePath(), 256);
}
let mut zerotier_path = unsafe { zerotier_core::cstr_to_string(osdep::platformDefaultHomePath(), 256) };
let mut cli_args = Some(Box::new(cli::parse_cli_args()));
let json_output;
let mut token: Option<String> = None;
let mut token_path = Path::new(&zerotier_path).join("authtoken.secret");
{
let a = cli_args.unwrap();
let a = cli_args.as_ref().unwrap();
json_output = a.is_present("json");
let v = a.value_of("path");
if v.is_some() {
@ -54,11 +50,11 @@ fn main() {
}
let v = a.value_of("token_path");
if v.is_some() {
token_path = Path::new(v.unwrap().trim()).into_path_buf();
token_path = Path::new(v.unwrap().trim()).to_path_buf();
}
}
match cli_args.unwrap().subcommand_name().unwrap() {
match cli_args.as_ref().unwrap().subcommand_name().unwrap() {
"version" => {
let ver = zerotier_core::version();
println!("{}.{}.{}", ver.0, ver.1, ver.2);

View file

@ -16,7 +16,7 @@ use std::path::{Path, PathBuf};
use zerotier_core::StateObjectType;
use std::io::{Read, Write};
pub struct Store {
pub(crate) struct Store {
pub base_path: Box<Path>,
pub peers_path: Box<Path>,
pub controller_path: Box<Path>,

View file

@ -54,6 +54,7 @@ use zerotier_core::{InetAddress, MAC, MulticastGroup, NetworkId};
use crate::osdep as osdep;
use crate::getifaddrs;
use crate::vnic::VNIC;
use crate::osdep::getifmaddrs;
const BPF_BUFFER_SIZE: usize = 131072;
const IFCONFIG: &str = "/sbin/ifconfig";
@ -79,7 +80,7 @@ impl Drop for MacFethDevice {
}
}
pub struct MacFethTap {
pub(crate) struct MacFethTap {
network_id: u64,
device: MacFethDevice,
ndrv_fd: c_int,
@ -106,7 +107,6 @@ fn device_ipv6_set_params(device: &String, perform_nud: bool, accept_ra: bool) -
let dev = device.as_bytes();
let mut ok = true;
unsafe {
let s = osdep::socket(osdep::AF_INET6 as c_int, osdep::SOCK_DGRAM as c_int, 0);
if s < 0 {
return false;
@ -148,7 +148,7 @@ impl MacFethTap {
/// given will not remain valid after it returns. Also note that F will be called
/// from another thread that is spawned here, so all its bound references must
/// be "Send" and "Sync" e.g. Arc<>.
pub fn new<F: Fn(&[u8]) + Send + Sync + 'static>(nwid: &NetworkId, mac: &MAC, mtu: i32, metric: i32, eth_frame_func: F) -> Result<MacFethTap, String> {
pub(crate) fn new<F: Fn(&[u8]) + Send + Sync + 'static>(nwid: &NetworkId, mac: &MAC, mtu: i32, metric: i32, eth_frame_func: F) -> Result<MacFethTap, String> {
// This tracks BPF devices we are using so we don't try to reopen them, and also
// doubles as a global lock to ensure that only one feth tap is created at once per
// ZeroTier process per system.
@ -357,8 +357,8 @@ impl MacFethTap {
fn have_ip(&self, ip: &InetAddress) -> bool {
let mut have_ip = false;
PhysicalLink::map(|link: PhysicalLink| {
if link.device.eq(&self.device.name) && link.address.eq(ip) {
getifaddrs::for_each_address(|addr: &InetAddress, device_name: &str| {
if device_name.eq(&self.device.name) && addr.eq(ip) {
have_ip = true;
}
});
@ -393,9 +393,9 @@ impl VNIC for MacFethTap {
let mut ipv: Vec<InetAddress> = Vec::new();
ipv.reserve(8);
let dev = self.device.name.as_str();
PhysicalLink::map(|link: PhysicalLink| {
if link.device.eq(dev) {
ipv.push(link.address.clone());
getifaddrs::for_each_address(|addr: &InetAddress, device_name: &str| {
if device_name.eq(dev) {
ipv.push(addr.clone());
}
});
ipv.sort();

View file

@ -17,4 +17,4 @@ mod common;
#[cfg(target_os = "macos")]
mod mac_feth_tap;
pub use vnic::VNIC;
pub(crate) use vnic::VNIC;

View file

@ -11,7 +11,7 @@
*/
/****/
pub trait VNIC {
pub(crate) trait VNIC {
fn add_ip(&self, ip: &zerotier_core::InetAddress) -> bool;
fn remove_ip(&self, ip: &zerotier_core::InetAddress) -> bool;
fn ips(&self) -> Vec<zerotier_core::InetAddress>;