From a6c39a195273ad85c644dc734c57f79be1b8b688 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 14 Jan 2021 22:18:29 -0500 Subject: [PATCH] Starting on Rust service, implementing fast UDP I/O in Rust. --- rust-zerotier-core/Cargo.lock | 22 +- rust-zerotier-core/Cargo.toml | 2 +- rust-zerotier-core/src/inetaddress.rs | 18 ++ rust-zerotier-service/Cargo.lock | 176 ++++++++++++++- rust-zerotier-service/Cargo.toml | 10 +- .../src/fastudp/fastudpsocket.rs | 208 ++++++++++++++++++ rust-zerotier-service/src/fastudp/mod.rs | 1 + rust-zerotier-service/src/main.rs | 2 + 8 files changed, 425 insertions(+), 14 deletions(-) create mode 100644 rust-zerotier-service/src/fastudp/fastudpsocket.rs create mode 100644 rust-zerotier-service/src/fastudp/mod.rs diff --git a/rust-zerotier-core/Cargo.lock b/rust-zerotier-core/Cargo.lock index 52c15d043..9cbbcd45e 100644 --- a/rust-zerotier-core/Cargo.lock +++ b/rust-zerotier-core/Cargo.lock @@ -56,17 +56,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rust-zerotier-core" -version = "0.1.0" -dependencies = [ - "hex", - "num-derive", - "num-traits", - "serde", - "serde_json", -] - [[package]] name = "ryu" version = "1.0.5" @@ -120,3 +109,14 @@ name = "unicode-xid" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" + +[[package]] +name = "zerotier-core" +version = "0.1.0" +dependencies = [ + "hex", + "num-derive", + "num-traits", + "serde", + "serde_json", +] diff --git a/rust-zerotier-core/Cargo.toml b/rust-zerotier-core/Cargo.toml index d174bf869..b49ea457f 100644 --- a/rust-zerotier-core/Cargo.toml +++ b/rust-zerotier-core/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "rust-zerotier-core" +name = "zerotier-core" version = "0.1.0" authors = ["Adam Ierymenko "] edition = "2018" diff --git a/rust-zerotier-core/src/inetaddress.rs b/rust-zerotier-core/src/inetaddress.rs index f9d7c0736..7c7fb2e12 100644 --- a/rust-zerotier-core/src/inetaddress.rs +++ b/rust-zerotier-core/src/inetaddress.rs @@ -92,6 +92,15 @@ impl InetAddress { Some(a) } + /// Unsafely transmute a raw sockaddr_storage structure into an InetAddress. + /// The type S MUST have a size equal to the size of this type and the + /// OS's sockaddr_storage. If not, this may crash. + pub unsafe fn transmute_raw_sockaddr_storage(ss: &S) -> &InetAddress { + unsafe { + transmute(ss) + } + } + /// Transmute a ZT_InetAddress from the core into a reference to a Rust /// InetAddress containing exactly the same data. The returned reference /// of course only remains valid so long as the ZT_InetAddress remains @@ -184,6 +193,15 @@ impl From<&str> for InetAddress { } } +impl Clone for InetAddress { + #[inline(always)] + fn clone(&self) -> Self { + InetAddress{ + bits: self.bits + } + } +} + impl PartialEq for InetAddress { #[inline(always)] fn eq(&self, other: &Self) -> bool { diff --git a/rust-zerotier-service/Cargo.lock b/rust-zerotier-service/Cargo.lock index 9b68bfc0a..112b83895 100644 --- a/rust-zerotier-service/Cargo.lock +++ b/rust-zerotier-service/Cargo.lock @@ -1,5 +1,179 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. [[package]] -name = "rust-zerotier-service" +name = "autocfg" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" + +[[package]] +name = "hermit-abi" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aca5565f760fb5b220e499d72710ed156fdb74e631659e99377d9ebfbd13ae8" +dependencies = [ + "libc", +] + +[[package]] +name = "hex" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35" + +[[package]] +name = "itoa" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" + +[[package]] +name = "libc" +version = "0.2.82" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89203f3fba0a3795506acaad8ebce3c80c0af93f994d5a1d7a0b1eeb23271929" + +[[package]] +name = "num-derive" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "num-traits" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +dependencies = [ + "autocfg", +] + +[[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "proc-macro2" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "quote" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "991431c3519a3f36861882da93630ce66b52918dcf1b8e2fd66b397fc96f28df" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" + +[[package]] +name = "serde" +version = "1.0.119" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bdd36f49e35b61d49efd8aa7fc068fd295961fd2286d0b2ee9a4c7a14e99cc3" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.119" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552954ce79a059ddd5fd68c271592374bd15cab2274970380c000118aeffe1cd" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fceb2595057b6891a4ee808f70054bd2d12f0e97f1cbb78689b59f676df325a" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "syn" +version = "1.0.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc60a3d73ea6594cd712d830cc1f0390fd71542d8c8cd24e70cc54cdfd5e05d5" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] +name = "unicode-xid" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "zerotier-core" version = "0.1.0" +dependencies = [ + "hex", + "num-derive", + "num-traits", + "serde", + "serde_json", +] + +[[package]] +name = "zerotier-service" +version = "0.1.0" +dependencies = [ + "libc", + "num_cpus", + "winapi", + "zerotier-core", +] diff --git a/rust-zerotier-service/Cargo.toml b/rust-zerotier-service/Cargo.toml index ba9b6b12f..e37e05711 100644 --- a/rust-zerotier-service/Cargo.toml +++ b/rust-zerotier-service/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "rust-zerotier-service" +name = "zerotier-service" version = "0.1.0" authors = ["Adam Ierymenko "] edition = "2018" @@ -7,3 +7,11 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +zerotier-core = { path = "../rust-zerotier-core" } +num_cpus = "1.13" + +[target."cfg(unix)".dependencies] +libc = "0.2.82" + +[target."cfg(windows)".dependencies] +winapi = { version = "0.3.9", features = ["handleapi", "ws2ipdef", "ws2tcpip"] } diff --git a/rust-zerotier-service/src/fastudp/fastudpsocket.rs b/rust-zerotier-service/src/fastudp/fastudpsocket.rs new file mode 100644 index 000000000..6e66b23aa --- /dev/null +++ b/rust-zerotier-service/src/fastudp/fastudpsocket.rs @@ -0,0 +1,208 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use zerotier_core::{Buffer, InetAddress, InetAddressFamily}; +use std::ffi::CString; + +#[cfg(windows)] +pub type RawOsSocket = winapi::um::winsock2::SOCKET; + +#[cfg(windows)] +type AfInet = winapi::um::winsock2::AF_INET; + +#[cfg(windows)] +type AfInet6 = winapi::um::winsock2::AF_INET6; + +#[cfg(unix)] +pub type RawOsSocket = std::os::raw::c_int; + +#[cfg(unix)] +type AfInet = libc::AF_INET; + +#[cfg(unix)] +type AfInet6 = libc::AF_INET6; + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[cfg(target_os = "macos")] +unsafe fn bind_udp_socket(device_name: &CString, address: &InetAddress, af: libc::c_int) -> Option { + let s = libc::socket(af, libc::SOCK_DGRAM, 0); + if s < 0 { + return None; + } + + let mut fl: libc::c_int; + let fl_size = std::mem::size_of::() as libc::socklen_t; + let mut setsockopt_results: libc::c_int = 0; + + fl = 1; + setsockopt_results |= libc::setsockopt(s, libc::SOL_SOCKET, libc::SO_REUSEPORT, &mut fl, fl_size); + fl = 1; + setsockopt_results |= libc::setsockopt(s, libc::SOL_SOCKET, libc::SO_REUSEADDR, &mut fl, fl_size); + fl = 1; + setsockopt_results |= libc::setsockopt(s, libc::SOL_SOCKET, libc::SO_BROADCAST, &mut fl, fl_size); + if setsockopt_results != 0 { + libc::close(s); + return None; + } + + fl = 1; + libc::setsockopt(s, libc::SOL_SOCKET, libc::SO_NOSIGPIPE, &mut fl, fl_size); + + if af == libc::AF_INET { + fl = 1; + libc::setsockopt(s, libc::IPPROTO_IP, 0x4000 /* IP_DF */, &mut fl, fl_size); + } + if af == libc::AF_INET6 { + fl = 1; + libc::setsockopt(s, libc::IPPROTO_IPV6, 62 /* IPV6_DONTFRAG */, &mut fl, fl_size); + fl = 1; + libc::setsockopt(s, libc::IPPROTO_IPV6, libc::IPV6_V6ONLY, &mut fl, fl_size); + } + + fl = 1048576; + while fl >= 131072 { + if libc::setsockopt(s, libc::SOL_SOCKET, libc::SO_RCVBUF, &mut fl, fl_size) == 0 { + break; + } + fl -= 65536; + } + fl = 1048576; + while fl >= 131072 { + if libc::setsockopt(s, libc::SOL_SOCKET, libc::SO_SNDBUF, &mut fl, fl_size) == 0 { + break; + } + fl -= 65536; + } + + let namidx = libc::if_nametoindex(device_name.as_ptr()) as libc::c_int; + if namidx != 0 { + libc::setsockopt(s, libc::IPPROTO_IP, 25 /* IP_BOUND_IF */, &namidx, fl_size); + } + + if libc::bind(s, (address as *const InetAddress).cast::(), std::mem::size_of::() as libc::socklen_t) != 0 { + libc::close(s); + return None; + } + + Some(s) +} + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub trait FastUDPSocketPacketHandler { + fn incoming_udp_packet(socket: &RawOsSocket, from_adddress: &InetAddress, mut data: Buffer); +} + +/// A multi-threaded (or otherwise fast) UDP socket that binds to both IPv4 and IPv6 addresses. +pub struct FastUDPSocket { + handler: Arc, + threads: Vec>, + thread_run: Arc, + sockets: Vec, + bind_address: InetAddress, +} + +#[cfg(unix)] +#[inline(always)] +pub fn fast_udp_socket_send_buffer(socket: &RawOsSocket, to_address: &InetAddress, data: &[u8], packet_ttl: i32) { + unsafe { + if packet_ttl <= 0 { + libc::sendto(*socket, data.as_ptr(), data.len() as libc::size_t, 0, (to_address as *const InetAddress).cast::(), std::mem::size_of::() as libc::socklen_t); + } else { + let mut ttl = packet_ttl as libc::c_int; + libc::setsockopt(*socket, libc::IPPROTO_IP, libc::IP_TTL, &mut ttl, std::mem::size_of::() as libc::socklen_t); + libc::sendto(*socket, data.as_ptr(), data.len() as libc::size_t, 0, (to_address as *const InetAddress).cast::(), std::mem::size_of::() as libc::socklen_t); + ttl = 255; + libc::setsockopt(*socket, libc::IPPROTO_IP, libc::IP_TTL, &mut ttl, std::mem::size_of::() as libc::socklen_t); + } + } +} + +#[cfg(windows)] +#[inline(always)] +pub fn fast_udp_socket_send_buffer(socket: &RawOsSocket, to_address: &InetAddress, data: &[u8], packet_ttl: i32) { +} + +impl FastUDPSocket { + pub fn new(device_name: &str, address: &InetAddress, handler: &Arc) -> Result, String> { + let thread_count = num_cpus::get(); + + let mut s = FastUDPSocket{ + handler: handler.clone(), + threads: Vec::new(), + thread_run: Arc::new(AtomicBool::new(true)), + sockets: Vec::new(), + bind_address: address.clone() + }; + + let device_name_c = CString::from(device_name); + let af = match address.family() { + InetAddressFamily::IPv4 => AfInet, + InetAddressFamily::IPv6 => AfInet6, + _ => { return Err(String::from("unrecognized address family")); } + }; + + for _ in 0..thread_count { + let thread_socket = unsafe { bind_udp_socket(&device_name_c, address, af) }; + if thread_socket.is_some() { + let thread_socket = thread_socket.unwrap(); + s.sockets.push(thread_socket); + + let thread_run = s.thread_run.clone(); + let handler_weak = Arc::downgrade(handler); + s.threads.push(std::thread::spawn(move || { + let mut from_address = InetAddress::new(); + while thread_run.load(Ordering::Relaxed) { + let mut buf = Buffer::new(); + let mut addrlen = std::mem::size_of::() as libc::socklen_t; + let read_length = unsafe { libc::recvfrom(thread_socket, buf.as_mut_ptr(), Buffer::CAPACITY as libc::size_t, 0, (&mut from_address as *mut InetAddress).cast::(), &mut addrlen) }; + if read_length > 0 { + let handler = handler_weak.upgrade(); + if handler.is_some() { + unsafe { buf.set_len(read_length as u32); } + handler.unwrap().incoming_udp_packet(&thread_socket, &from_address, buf); + } else { + break; + } + } else if read_length < 0 { + break; + } + } + })); + } + } + + if s.threads.is_empty() { + return Err(String::from("unable to bind to address for IPv4 or IPv6")); + } + + Ok(s) + } + + /// Get a socket suitable for sending. + #[inline(always)] + pub fn socket(&self) -> RawOsSocket { + return *self.sockets.get(0).unwrap(); + } +} + +impl Drop for FastUDPSocket { + fn drop(&mut self) { + let tmp: [u8; 1] = [0]; + self.thread_run.store(false, Ordering::Relaxed); + for s in self.sockets.iter() { + unsafe { + libc::sendto(*s as libc::c_int, tmp.as_ptr(), 0, 0, (&self.bind_address as *const InetAddress).cast::(), std::mem::size_of::() as libc::socklen_t); + } + } + for s in self.sockets.iter() { + unsafe { + libc::shutdown(*s as libc::c_int, libc::SHUT_RDWR); + libc::close(*s as libc::c_int); + } + } + for t in self.threads.iter() { + t.join() + } + } +} diff --git a/rust-zerotier-service/src/fastudp/mod.rs b/rust-zerotier-service/src/fastudp/mod.rs new file mode 100644 index 000000000..52bf8769a --- /dev/null +++ b/rust-zerotier-service/src/fastudp/mod.rs @@ -0,0 +1 @@ +pub mod fastudpsocket; diff --git a/rust-zerotier-service/src/main.rs b/rust-zerotier-service/src/main.rs index e7a11a969..d80542b8b 100644 --- a/rust-zerotier-service/src/main.rs +++ b/rust-zerotier-service/src/main.rs @@ -1,3 +1,5 @@ +mod fastudp; + fn main() { println!("Hello, world!"); }