Starting on Rust service, implementing fast UDP I/O in Rust.

This commit is contained in:
Adam Ierymenko 2021-01-14 22:18:29 -05:00
parent 98d6e05b2d
commit a6c39a1952
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
8 changed files with 425 additions and 14 deletions

View file

@ -56,17 +56,6 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rust-zerotier-core"
version = "0.1.0"
dependencies = [
"hex",
"num-derive",
"num-traits",
"serde",
"serde_json",
]
[[package]]
name = "ryu"
version = "1.0.5"
@ -120,3 +109,14 @@ name = "unicode-xid"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "zerotier-core"
version = "0.1.0"
dependencies = [
"hex",
"num-derive",
"num-traits",
"serde",
"serde_json",
]

View file

@ -1,5 +1,5 @@
[package]
name = "rust-zerotier-core"
name = "zerotier-core"
version = "0.1.0"
authors = ["Adam Ierymenko <adam.ierymenko@zerotier.com>"]
edition = "2018"

View file

@ -92,6 +92,15 @@ impl InetAddress {
Some(a)
}
/// Unsafely transmute a raw sockaddr_storage structure into an InetAddress.
/// The type S MUST have a size equal to the size of this type and the
/// OS's sockaddr_storage. If not, this may crash.
pub unsafe fn transmute_raw_sockaddr_storage<S>(ss: &S) -> &InetAddress {
unsafe {
transmute(ss)
}
}
/// Transmute a ZT_InetAddress from the core into a reference to a Rust
/// InetAddress containing exactly the same data. The returned reference
/// of course only remains valid so long as the ZT_InetAddress remains
@ -184,6 +193,15 @@ impl From<&str> for InetAddress {
}
}
impl Clone for InetAddress {
#[inline(always)]
fn clone(&self) -> Self {
InetAddress{
bits: self.bits
}
}
}
impl PartialEq for InetAddress {
#[inline(always)]
fn eq(&self, other: &Self) -> bool {

View file

@ -1,5 +1,179 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "rust-zerotier-service"
name = "autocfg"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "hermit-abi"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aca5565f760fb5b220e499d72710ed156fdb74e631659e99377d9ebfbd13ae8"
dependencies = [
"libc",
]
[[package]]
name = "hex"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35"
[[package]]
name = "itoa"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
[[package]]
name = "libc"
version = "0.2.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89203f3fba0a3795506acaad8ebce3c80c0af93f994d5a1d7a0b1eeb23271929"
[[package]]
name = "num-derive"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "proc-macro2"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71"
dependencies = [
"unicode-xid",
]
[[package]]
name = "quote"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "991431c3519a3f36861882da93630ce66b52918dcf1b8e2fd66b397fc96f28df"
dependencies = [
"proc-macro2",
]
[[package]]
name = "ryu"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
name = "serde"
version = "1.0.119"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bdd36f49e35b61d49efd8aa7fc068fd295961fd2286d0b2ee9a4c7a14e99cc3"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.119"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "552954ce79a059ddd5fd68c271592374bd15cab2274970380c000118aeffe1cd"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fceb2595057b6891a4ee808f70054bd2d12f0e97f1cbb78689b59f676df325a"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]]
name = "syn"
version = "1.0.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc60a3d73ea6594cd712d830cc1f0390fd71542d8c8cd24e70cc54cdfd5e05d5"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
]
[[package]]
name = "unicode-xid"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "zerotier-core"
version = "0.1.0"
dependencies = [
"hex",
"num-derive",
"num-traits",
"serde",
"serde_json",
]
[[package]]
name = "zerotier-service"
version = "0.1.0"
dependencies = [
"libc",
"num_cpus",
"winapi",
"zerotier-core",
]

View file

@ -1,5 +1,5 @@
[package]
name = "rust-zerotier-service"
name = "zerotier-service"
version = "0.1.0"
authors = ["Adam Ierymenko <adam.ierymenko@zerotier.com>"]
edition = "2018"
@ -7,3 +7,11 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
zerotier-core = { path = "../rust-zerotier-core" }
num_cpus = "1.13"
[target."cfg(unix)".dependencies]
libc = "0.2.82"
[target."cfg(windows)".dependencies]
winapi = { version = "0.3.9", features = ["handleapi", "ws2ipdef", "ws2tcpip"] }

View file

@ -0,0 +1,208 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use zerotier_core::{Buffer, InetAddress, InetAddressFamily};
use std::ffi::CString;
#[cfg(windows)]
pub type RawOsSocket = winapi::um::winsock2::SOCKET;
#[cfg(windows)]
type AfInet = winapi::um::winsock2::AF_INET;
#[cfg(windows)]
type AfInet6 = winapi::um::winsock2::AF_INET6;
#[cfg(unix)]
pub type RawOsSocket = std::os::raw::c_int;
#[cfg(unix)]
type AfInet = libc::AF_INET;
#[cfg(unix)]
type AfInet6 = libc::AF_INET6;
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg(target_os = "macos")]
unsafe fn bind_udp_socket(device_name: &CString, address: &InetAddress, af: libc::c_int) -> Option<RawOsSocket> {
let s = libc::socket(af, libc::SOCK_DGRAM, 0);
if s < 0 {
return None;
}
let mut fl: libc::c_int;
let fl_size = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
let mut setsockopt_results: libc::c_int = 0;
fl = 1;
setsockopt_results |= libc::setsockopt(s, libc::SOL_SOCKET, libc::SO_REUSEPORT, &mut fl, fl_size);
fl = 1;
setsockopt_results |= libc::setsockopt(s, libc::SOL_SOCKET, libc::SO_REUSEADDR, &mut fl, fl_size);
fl = 1;
setsockopt_results |= libc::setsockopt(s, libc::SOL_SOCKET, libc::SO_BROADCAST, &mut fl, fl_size);
if setsockopt_results != 0 {
libc::close(s);
return None;
}
fl = 1;
libc::setsockopt(s, libc::SOL_SOCKET, libc::SO_NOSIGPIPE, &mut fl, fl_size);
if af == libc::AF_INET {
fl = 1;
libc::setsockopt(s, libc::IPPROTO_IP, 0x4000 /* IP_DF */, &mut fl, fl_size);
}
if af == libc::AF_INET6 {
fl = 1;
libc::setsockopt(s, libc::IPPROTO_IPV6, 62 /* IPV6_DONTFRAG */, &mut fl, fl_size);
fl = 1;
libc::setsockopt(s, libc::IPPROTO_IPV6, libc::IPV6_V6ONLY, &mut fl, fl_size);
}
fl = 1048576;
while fl >= 131072 {
if libc::setsockopt(s, libc::SOL_SOCKET, libc::SO_RCVBUF, &mut fl, fl_size) == 0 {
break;
}
fl -= 65536;
}
fl = 1048576;
while fl >= 131072 {
if libc::setsockopt(s, libc::SOL_SOCKET, libc::SO_SNDBUF, &mut fl, fl_size) == 0 {
break;
}
fl -= 65536;
}
let namidx = libc::if_nametoindex(device_name.as_ptr()) as libc::c_int;
if namidx != 0 {
libc::setsockopt(s, libc::IPPROTO_IP, 25 /* IP_BOUND_IF */, &namidx, fl_size);
}
if libc::bind(s, (address as *const InetAddress).cast::<libc::sockaddr>(), std::mem::size_of::<libc::sockaddr_in6>() as libc::socklen_t) != 0 {
libc::close(s);
return None;
}
Some(s)
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
pub trait FastUDPSocketPacketHandler {
fn incoming_udp_packet(socket: &RawOsSocket, from_adddress: &InetAddress, mut data: Buffer);
}
/// A multi-threaded (or otherwise fast) UDP socket that binds to both IPv4 and IPv6 addresses.
pub struct FastUDPSocket<H: FastUDPSocketPacketHandler + 'static> {
handler: Arc<H>,
threads: Vec<std::thread::JoinHandle<()>>,
thread_run: Arc<AtomicBool>,
sockets: Vec<RawOsSocket>,
bind_address: InetAddress,
}
#[cfg(unix)]
#[inline(always)]
pub fn fast_udp_socket_send_buffer(socket: &RawOsSocket, to_address: &InetAddress, data: &[u8], packet_ttl: i32) {
unsafe {
if packet_ttl <= 0 {
libc::sendto(*socket, data.as_ptr(), data.len() as libc::size_t, 0, (to_address as *const InetAddress).cast::<libc::sockaddr>(), std::mem::size_of::<InetAddress>() as libc::socklen_t);
} else {
let mut ttl = packet_ttl as libc::c_int;
libc::setsockopt(*socket, libc::IPPROTO_IP, libc::IP_TTL, &mut ttl, std::mem::size_of::<libc::c_int>() as libc::socklen_t);
libc::sendto(*socket, data.as_ptr(), data.len() as libc::size_t, 0, (to_address as *const InetAddress).cast::<libc::sockaddr>(), std::mem::size_of::<InetAddress>() as libc::socklen_t);
ttl = 255;
libc::setsockopt(*socket, libc::IPPROTO_IP, libc::IP_TTL, &mut ttl, std::mem::size_of::<libc::c_int>() as libc::socklen_t);
}
}
}
#[cfg(windows)]
#[inline(always)]
pub fn fast_udp_socket_send_buffer(socket: &RawOsSocket, to_address: &InetAddress, data: &[u8], packet_ttl: i32) {
}
impl<H: FastUDPSocketPacketHandler + 'static> FastUDPSocket<H> {
pub fn new(device_name: &str, address: &InetAddress, handler: &Arc<H>) -> Result<FastUDPSocket<H>, String> {
let thread_count = num_cpus::get();
let mut s = FastUDPSocket{
handler: handler.clone(),
threads: Vec::new(),
thread_run: Arc::new(AtomicBool::new(true)),
sockets: Vec::new(),
bind_address: address.clone()
};
let device_name_c = CString::from(device_name);
let af = match address.family() {
InetAddressFamily::IPv4 => AfInet,
InetAddressFamily::IPv6 => AfInet6,
_ => { return Err(String::from("unrecognized address family")); }
};
for _ in 0..thread_count {
let thread_socket = unsafe { bind_udp_socket(&device_name_c, address, af) };
if thread_socket.is_some() {
let thread_socket = thread_socket.unwrap();
s.sockets.push(thread_socket);
let thread_run = s.thread_run.clone();
let handler_weak = Arc::downgrade(handler);
s.threads.push(std::thread::spawn(move || {
let mut from_address = InetAddress::new();
while thread_run.load(Ordering::Relaxed) {
let mut buf = Buffer::new();
let mut addrlen = std::mem::size_of::<InetAddress>() as libc::socklen_t;
let read_length = unsafe { libc::recvfrom(thread_socket, buf.as_mut_ptr(), Buffer::CAPACITY as libc::size_t, 0, (&mut from_address as *mut InetAddress).cast::<libc::sockaddr>(), &mut addrlen) };
if read_length > 0 {
let handler = handler_weak.upgrade();
if handler.is_some() {
unsafe { buf.set_len(read_length as u32); }
handler.unwrap().incoming_udp_packet(&thread_socket, &from_address, buf);
} else {
break;
}
} else if read_length < 0 {
break;
}
}
}));
}
}
if s.threads.is_empty() {
return Err(String::from("unable to bind to address for IPv4 or IPv6"));
}
Ok(s)
}
/// Get a socket suitable for sending.
#[inline(always)]
pub fn socket(&self) -> RawOsSocket {
return *self.sockets.get(0).unwrap();
}
}
impl<H: FastUDPSocketPacketHandler + 'static> Drop for FastUDPSocket<H> {
fn drop(&mut self) {
let tmp: [u8; 1] = [0];
self.thread_run.store(false, Ordering::Relaxed);
for s in self.sockets.iter() {
unsafe {
libc::sendto(*s as libc::c_int, tmp.as_ptr(), 0, 0, (&self.bind_address as *const InetAddress).cast::<libc::sockaddr>(), std::mem::size_of::<InetAddress>() as libc::socklen_t);
}
}
for s in self.sockets.iter() {
unsafe {
libc::shutdown(*s as libc::c_int, libc::SHUT_RDWR);
libc::close(*s as libc::c_int);
}
}
for t in self.threads.iter() {
t.join()
}
}
}

View file

@ -0,0 +1 @@
pub mod fastudpsocket;

View file

@ -1,3 +1,5 @@
mod fastudp;
fn main() {
println!("Hello, world!");
}