diff --git a/rustybits/Cargo.lock b/rustybits/Cargo.lock index fd6b37a34..a4da1e15e 100644 --- a/rustybits/Cargo.lock +++ b/rustybits/Cargo.lock @@ -103,39 +103,6 @@ version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - -[[package]] -name = "async-stream" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" -dependencies = [ - "async-stream-impl", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "async-trait" version = "0.1.88" @@ -180,7 +147,7 @@ dependencies = [ "rustversion", "serde", "sync_wrapper 1.0.2", - "tower 0.5.2", + "tower", "tower-layer", "tower-service", ] @@ -431,15 +398,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" -[[package]] -name = "concurrent-queue" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "const-oid" version = "0.9.6" @@ -481,15 +439,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc32fast" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" -dependencies = [ - "cfg-if", -] - [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -874,12 +823,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - [[package]] name = "fastrand" version = "2.3.0" @@ -920,16 +863,6 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" -[[package]] -name = "flate2" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d" -dependencies = [ - "crc32fast", - "miniz_oxide", -] - [[package]] name = "fnv" version = "1.0.7" @@ -1078,85 +1011,6 @@ dependencies = [ "slab", ] -[[package]] -name = "gcloud-auth" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4089aeec499899f6f1309803279763c34d898ca86c1a3e4616cba3ca6bce34b7" -dependencies = [ - "async-trait", - "base64 0.22.1", - "gcloud-metadata", - "home", - "jsonwebtoken", - "reqwest 0.12.22", - "serde", - "serde_json", - "thiserror 1.0.69", - "time", - "token-source", - "tokio", - "tracing", - "urlencoding", -] - -[[package]] -name = "gcloud-gax" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb0a39057a0654184e074ddefb734c9a326ba468663b8f329c95f0069b5ccdb5" -dependencies = [ - "http 1.3.1", - "thiserror 1.0.69", - "token-source", - "tokio", - "tokio-retry2", - "tonic", - "tower 0.4.13", - "tracing", -] - -[[package]] -name = "gcloud-googleapis" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "501459a508e7887cfedc45a45ee41602ac1f66d2b61deb05f1c2256bf2faf46d" -dependencies = [ - "prost 0.13.5", - "prost-types 0.13.5", - "tonic", -] - -[[package]] -name = "gcloud-metadata" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d575310b4546530f6b21ee000c20155f11f9291fa0b67ea0949fd48aa49ed70" -dependencies = [ - "reqwest 0.12.22", - "thiserror 1.0.69", - "tokio", -] - -[[package]] -name = "gcloud-pubsub" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1d55e2652753753d902a84c56613b9ae35a36416d94e56c1268764bc7e79f05" -dependencies = [ - "async-channel", - "async-stream", - "gcloud-auth", - "gcloud-gax", - "gcloud-googleapis", - "prost-types 0.13.5", - "thiserror 1.0.69", - "token-source", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "generic-array" version = "0.14.7" @@ -1505,29 +1359,12 @@ dependencies = [ "tokio-native-tls", ] -[[package]] -name = "hyper-tls" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" -dependencies = [ - "bytes", - "http-body-util", - "hyper 1.6.0", - "hyper-util", - "native-tls", - "tokio", - "tokio-native-tls", - "tower-service", -] - [[package]] name = "hyper-util" version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f66d5bd4c6f02bf0542fad85d626775bab9258cf795a4256dcaf3161114d1df" dependencies = [ - "base64 0.22.1", "bytes", "futures-channel", "futures-core", @@ -1535,9 +1372,7 @@ dependencies = [ "http 1.3.1", "http-body 1.0.1", "hyper 1.6.0", - "ipnet", "libc", - "percent-encoding", "pin-project-lite", "socket2", "tokio", @@ -1754,16 +1589,6 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" -[[package]] -name = "iri-string" -version = "0.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbc5ebe9c3a1a7a5127f920a418f7585e9e758e911d0466ed004f393b0e380b2" -dependencies = [ - "memchr", - "serde", -] - [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1804,21 +1629,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "jsonwebtoken" -version = "9.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" -dependencies = [ - "base64 0.22.1", - "js-sys", - "pem", - "ring", - "serde", - "serde_json", - "simple_asn1", -] - [[package]] name = "jwt" version = "0.16.0" @@ -2029,16 +1839,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "num-bigint" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" -dependencies = [ - "num-integer", - "num-traits", -] - [[package]] name = "num-bigint-dig" version = "0.8.4" @@ -2103,7 +1903,7 @@ dependencies = [ "getrandom 0.2.16", "http 0.2.12", "rand 0.8.5", - "reqwest 0.11.27", + "reqwest", "serde", "serde_json", "serde_path_to_error", @@ -2310,16 +2110,6 @@ dependencies = [ "syn", ] -[[package]] -name = "pem" -version = "3.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38af38e8470ac9dee3ce1bae1af9c1671fffc44ddfd8bd1d0a3445bf349a8ef3" -dependencies = [ - "base64 0.22.1", - "serde", -] - [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -2851,7 +2641,7 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.32", - "hyper-tls 0.5.0", + "hyper-tls", "ipnet", "js-sys", "log", @@ -2876,44 +2666,6 @@ dependencies = [ "winreg", ] -[[package]] -name = "reqwest" -version = "0.12.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531" -dependencies = [ - "base64 0.22.1", - "bytes", - "encoding_rs", - "futures-core", - "http 1.3.1", - "http-body 1.0.1", - "http-body-util", - "hyper 1.6.0", - "hyper-tls 0.6.0", - "hyper-util", - "js-sys", - "log", - "mime", - "native-tls", - "percent-encoding", - "pin-project-lite", - "rustls-pki-types", - "serde", - "serde_json", - "serde_urlencoded", - "sync_wrapper 1.0.2", - "tokio", - "tokio-native-tls", - "tower 0.5.2", - "tower-http", - "tower-service", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] - [[package]] name = "rfc6979" version = "0.4.0" @@ -3101,15 +2853,10 @@ dependencies = [ "base64 0.21.7", "bytes", "cbindgen", - "gcloud-gax", - "gcloud-googleapis", - "gcloud-pubsub", "jwt", "openidconnect", - "prost 0.14.1", "prost-build 0.14.1", - "prost-types 0.14.1", - "reqwest 0.11.27", + "reqwest", "serde", "serde_json", "temporal-client", @@ -3396,18 +3143,6 @@ dependencies = [ "rand_core 0.6.4", ] -[[package]] -name = "simple_asn1" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "297f631f50729c8c99b84667867963997ec0b50f32b2a7dbcab828ef0541e8bb" -dependencies = [ - "num-bigint", - "num-traits", - "thiserror 2.0.12", - "time", -] - [[package]] name = "siphasher" version = "1.0.1" @@ -3533,9 +3268,6 @@ name = "sync_wrapper" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" -dependencies = [ - "futures-core", -] [[package]] name = "synstructure" @@ -3621,7 +3353,7 @@ dependencies = [ "thiserror 2.0.12", "tokio", "tonic", - "tower 0.5.2", + "tower", "tracing", "url", "uuid", @@ -3872,15 +3604,6 @@ dependencies = [ "zerovec", ] -[[package]] -name = "token-source" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75746ae15bef509f21039a652383104424208fdae172a964a8930858b9a78412" -dependencies = [ - "async-trait", -] - [[package]] name = "tokio" version = "1.46.1" @@ -3922,16 +3645,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-retry2" -version = "0.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1264d076dd34560544a2799e40e457bd07c43d30f4a845686b031bcd8455c84f" -dependencies = [ - "pin-project", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.26.2" @@ -4032,7 +3745,6 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", - "flate2", "h2 0.4.11", "http 1.3.1", "http-body 1.0.1", @@ -4048,11 +3760,10 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-stream", - "tower 0.5.2", + "tower", "tower-layer", "tower-service", "tracing", - "webpki-roots 0.26.11", ] [[package]] @@ -4069,21 +3780,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tower" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" -dependencies = [ - "futures-core", - "futures-util", - "pin-project", - "pin-project-lite", - "tower-layer", - "tower-service", - "tracing", -] - [[package]] name = "tower" version = "0.5.2" @@ -4103,24 +3799,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tower-http" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" -dependencies = [ - "bitflags 2.9.1", - "bytes", - "futures-util", - "http 1.3.1", - "http-body 1.0.1", - "iri-string", - "pin-project-lite", - "tower 0.5.2", - "tower-layer", - "tower-service", -] - [[package]] name = "tower-layer" version = "0.3.3" @@ -4139,7 +3817,6 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ - "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -4255,12 +3932,6 @@ dependencies = [ "serde", ] -[[package]] -name = "urlencoding" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" - [[package]] name = "utf8_iter" version = "1.0.4" @@ -4417,24 +4088,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki-roots" -version = "0.26.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" -dependencies = [ - "webpki-roots 1.0.2", -] - -[[package]] -name = "webpki-roots" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e8983c3ab33d6fb807cfcdad2491c4ea8cbc8ed839181c7dfd9c67c83e261b2" -dependencies = [ - "rustls-pki-types", -] - [[package]] name = "winapi" version = "0.3.9" diff --git a/rustybits/Cargo.toml b/rustybits/Cargo.toml index 6c1cfeb3b..9f718d191 100644 --- a/rustybits/Cargo.toml +++ b/rustybits/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" crate-type = ["staticlib", "rlib"] [features] -default = ["zeroidc", "ztcontroller"] +default = ["zeroidc"] zeroidc = [] ztcontroller = [ "dep:serde", @@ -15,11 +15,6 @@ ztcontroller = [ "dep:temporal-sdk", "dep:temporal-client", "dep:temporal-sdk-core-protos", - "dep:gcloud-pubsub", - "dep:prost", - "dep:prost-types", - "dep:gcloud-gax", - "dep:gcloud-googleapis", "dep:tokio", "dep:tokio-util", ] @@ -51,11 +46,6 @@ jwt = { version = "0.16", git = "https://github.com/glimberg/rust-jwt" } time = { version = "~0.3", features = ["formatting"] } bytes = "1.3" thiserror = "1" -gcloud-pubsub = { version = "1.3.0", optional = true } -prost = { version = "0.14", optional = true, features = ["derive"] } -prost-types = { version = "0.14", optional = true } -gcloud-gax = { version = "1.2.0", optional = true } -gcloud-googleapis = { version = "1.2.0", optional = true } [dev-dependencies] testcontainers = { version = "0.24", features = ["blocking"] } diff --git a/rustybits/build.rs b/rustybits/build.rs index e5d61181e..d756ef3f3 100644 --- a/rustybits/build.rs +++ b/rustybits/build.rs @@ -4,23 +4,6 @@ use cbindgen::{Config, Language, MacroExpansionConfig}; use std::env; use std::path::PathBuf; fn main() { - #[cfg(feature = "ztcontroller")] - { - let mut prost_build = prost_build::Config::new(); - - prost_build - .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]") - .compile_protos( - &[ - "src/pubsub/network.proto", - "src/pubsub/member.proto", - "src/pubsub/member_status.proto", - ], - &["src/pubsub/"], - ) - .expect("Failed to compile protobuf files"); - } - let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap(); let package_name = env::var("CARGO_PKG_NAME").unwrap(); diff --git a/rustybits/src/ext.rs b/rustybits/src/ext.rs index 50962d785..bf6dcea91 100644 --- a/rustybits/src/ext.rs +++ b/rustybits/src/ext.rs @@ -13,12 +13,6 @@ use std::ffi::{CStr, CString}; use std::os::raw::c_char; #[cfg(feature = "ztcontroller")] -use std::os::raw::c_void; -#[cfg(feature = "ztcontroller")] -use std::sync::Arc; -#[cfg(feature = "ztcontroller")] -use std::time::Duration; -#[cfg(feature = "ztcontroller")] use tokio::runtime; use url::Url; @@ -34,7 +28,7 @@ static SHUTDOWN: std::sync::Once = std::sync::Once::new(); #[no_mangle] pub unsafe extern "C" fn init_async_runtime() { START.call_once(|| { - let rt = tokio::runtime::Builder::new_multi_thread() + let rt = runtime::Builder::new_multi_thread() .worker_threads(4) .thread_name("rust-async-worker") .enable_all() @@ -499,184 +493,3 @@ pub unsafe extern "C" fn smee_client_notify_network_joined( } } } - -#[cfg(feature = "ztcontroller")] -use crate::pubsub::member_listener::MemberListener; -#[cfg(feature = "ztcontroller")] -use crate::pubsub::network_listener::NetworkListener; - -#[cfg(feature = "ztcontroller")] -use crate::pubsub::member_listener::MemberListenerCallback; -#[cfg(feature = "ztcontroller")] -use crate::pubsub::network_listener::NetworkListenerCallback; - -#[cfg(feature = "ztcontroller")] -#[no_mangle] -pub unsafe extern "C" fn network_listener_new( - controller_id: *const c_char, - listen_timeout: u64, - callback: NetworkListenerCallback, - user_ptr: *mut c_void, -) -> *const NetworkListener { - if listen_timeout == 0 { - println!("listen_timeout is zero"); - return std::ptr::null_mut(); - } - if controller_id.is_null() { - println!("controller_id is null"); - return std::ptr::null_mut(); - } - - let id = unsafe { CStr::from_ptr(controller_id) }.to_str().unwrap(); - - let rt = runtime::Handle::current(); - rt.block_on(async { - match NetworkListener::new(id, Duration::from_secs(listen_timeout), callback, user_ptr).await { - Ok(listener) => Arc::into_raw(listener), - Err(e) => { - println!("error creating network listener: {}", e); - std::ptr::null_mut() - } - } - }) -} - -#[cfg(feature = "ztcontroller")] -#[no_mangle] -pub unsafe extern "C" fn network_listener_delete(ptr: *const NetworkListener) { - if ptr.is_null() { - return; - } - drop(Arc::from_raw(ptr)); -} - -#[cfg(feature = "ztcontroller")] -#[no_mangle] -pub unsafe extern "C" fn network_listener_listen(ptr: *const NetworkListener) -> bool { - use std::mem::ManuallyDrop; - if ptr.is_null() { - println!("ptr is null"); - return false; - } - - let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) }); - - let rt = runtime::Handle::current(); - match rt.block_on(listener.listen()) { - Ok(_) => { - println!("Network listener started successfully"); - true - } - Err(e) => { - println!("Error starting network listener: {}", e); - false - } - } -} - -#[cfg(feature = "ztcontroller")] -#[no_mangle] -pub unsafe extern "C" fn network_listener_change_handler(ptr: *const NetworkListener) { - use std::mem::ManuallyDrop; - if ptr.is_null() { - println!("ptr is null"); - return; - } - - let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) }); - - let rt = runtime::Handle::current(); - match rt.block_on(listener.change_handler()) { - Ok(_) => { - println!("Network listener change listener completed successfully"); - } - Err(e) => { - println!("Error in network listener change listener: {}", e); - } - } -} - -#[cfg(feature = "ztcontroller")] -#[no_mangle] -pub unsafe extern "C" fn member_listener_new( - controller_id: *const c_char, - listen_timeout: u64, - callback: MemberListenerCallback, - user_ptr: *mut c_void, -) -> *const MemberListener { - if listen_timeout == 0 { - println!("listen_timeout is zero"); - return std::ptr::null_mut(); - } - if controller_id.is_null() { - println!("controller_id is null"); - return std::ptr::null_mut(); - } - - let id = unsafe { CStr::from_ptr(controller_id) }.to_str().unwrap(); - - let rt = runtime::Handle::current(); - rt.block_on(async { - match MemberListener::new(id, Duration::from_secs(listen_timeout), callback, user_ptr).await { - Ok(listener) => Arc::into_raw(listener), - Err(e) => { - println!("error creating member listener: {}", e); - std::ptr::null_mut() - } - } - }) -} - -#[cfg(feature = "ztcontroller")] -#[no_mangle] -pub unsafe extern "C" fn member_listener_delete(ptr: *const MemberListener) { - if ptr.is_null() { - return; - } - drop(Arc::from_raw(ptr)); -} - -#[cfg(feature = "ztcontroller")] -#[no_mangle] -pub unsafe extern "C" fn member_listener_listen(ptr: *const MemberListener) -> bool { - use std::mem::ManuallyDrop; - if ptr.is_null() { - println!("ptr is null"); - return false; - } - - let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) }); - let rt = runtime::Handle::current(); - match rt.block_on(listener.listen()) { - Ok(_) => { - println!("Member listener started successfully"); - true - } - Err(e) => { - println!("Error starting member listener: {}", e); - false - } - } -} - -#[cfg(feature = "ztcontroller")] -#[no_mangle] -pub unsafe extern "C" fn member_listener_change_handler(ptr: *const MemberListener) { - use std::mem::ManuallyDrop; - if ptr.is_null() { - println!("ptr is null"); - return; - } - - let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) }); - - let rt = runtime::Handle::current(); - match rt.block_on(listener.change_handler()) { - Ok(_) => { - println!("Member listener change listener completed successfully"); - } - Err(e) => { - println!("Error in member listener change listener: {}", e); - } - } -} diff --git a/rustybits/src/lib.rs b/rustybits/src/lib.rs index cac1f7219..a11b77299 100644 --- a/rustybits/src/lib.rs +++ b/rustybits/src/lib.rs @@ -1,7 +1,5 @@ pub mod ext; #[cfg(feature = "ztcontroller")] -pub mod pubsub; -#[cfg(feature = "ztcontroller")] pub mod smeeclient; #[cfg(feature = "zeroidc")] pub mod zeroidc; diff --git a/rustybits/src/pubsub/change_listener.rs b/rustybits/src/pubsub/change_listener.rs deleted file mode 100644 index c5ae1f925..000000000 --- a/rustybits/src/pubsub/change_listener.rs +++ /dev/null @@ -1,144 +0,0 @@ -use gcloud_pubsub::client::{Client, ClientConfig}; -use gcloud_pubsub::subscription::SubscriptionConfig; -use gcloud_pubsub::topic::Topic; -use std::time::Duration; -use tokio::sync::mpsc::Sender; -use tokio_util::sync::CancellationToken; - -pub struct ChangeListener { - client: Client, - topic: Topic, - subscription_name: String, - controller_id: String, - listen_timeout: Duration, - sender: Sender>, -} - -impl ChangeListener { - pub async fn new( - controller_id: &str, - topic_name: &str, - subscription_name: &str, - listen_timeout: Duration, - sender: Sender>, - ) -> Result> { - let config = ClientConfig::default().with_auth().await.unwrap(); - let client = Client::new(config).await?; - - let topic = client.topic(topic_name); - if !topic.exists(None).await? { - topic.create(None, None).await?; - } - - Ok(Self { - client, - topic, - subscription_name: subscription_name.to_string(), - controller_id: controller_id.to_string(), - listen_timeout, - sender, - }) - } - - /** - * Listens for changes on the topic and sends them to the provided sender. - * - * Listens for up to `listen_timeout` duration, at which point it will stop listening - * and return. listen will have to be called again to continue listening. - * - * If the subscription does not exist, it will create it with the specified configuration. - */ - pub async fn listen(&self) -> Result<(), Box> { - let config = SubscriptionConfig { - enable_message_ordering: true, - filter: format!("attributes.controller_id = '{}'", self.controller_id), - ..Default::default() - }; - - let subscription = self.client.subscription(self.subscription_name.as_str()); - if !subscription.exists(None).await? { - subscription - .create(self.topic.fully_qualified_name(), config, None) - .await?; - } - - let cancel = CancellationToken::new(); - let cancel2 = cancel.clone(); - - let listen_timeout = self.listen_timeout.clone(); - tokio::spawn(async move { - tokio::time::sleep(listen_timeout).await; - cancel2.cancel(); - }); - - let tx = self.sender.clone(); - let _ = subscription - .receive( - move |message, _cancel| { - let tx2 = tx.clone(); - async move { - let data = message.message.data.clone(); - - match tx2.send(data.to_vec()).await { - Ok(_) => println!("Message sent successfully"), - Err(e) => eprintln!("Failed to send message: {}", e), - } - - match message.ack().await { - Ok(_) => println!("Message acknowledged"), - Err(e) => eprintln!("Failed to acknowledge message: {}", e), - } - } - }, - cancel.clone(), - None, - ) - .await; - - Ok(()) - } -} - -#[cfg(test)] -pub(crate) mod tests { - use super::*; - - use testcontainers::runners::AsyncRunner; - use testcontainers::ContainerAsync; - use testcontainers_modules::google_cloud_sdk_emulators; - use testcontainers_modules::google_cloud_sdk_emulators::CloudSdk; - use tokio; - - pub(crate) async fn setup_pubsub_emulator() -> Result<(ContainerAsync, String), Box> - { - let container = google_cloud_sdk_emulators::CloudSdk::pubsub().start().await?; - let port = container.get_host_port_ipv4(8085).await?; - let host = format!("localhost:{}", port); - - unsafe { - std::env::set_var("PUBSUB_EMULATOR_HOST", host.clone()); - } - - Ok((container, host)) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn test_can_connect_to_pubsub() -> Result<(), Box> { - let (_container, _host) = setup_pubsub_emulator().await?; - - let (tx, _rx) = tokio::sync::mpsc::channel(64); - - let cl = ChangeListener::new( - "test_controller", - "test_topic", - "test_subscription", - Duration::from_secs(10), - tx, - ) - .await; - - assert!(cl.is_ok(), "Failed to connect to pubsub emulator: {:?}", cl.err()); - - Ok(()) - } -} diff --git a/rustybits/src/pubsub/member.proto b/rustybits/src/pubsub/member.proto deleted file mode 100644 index 511f1a8d7..000000000 --- a/rustybits/src/pubsub/member.proto +++ /dev/null @@ -1,39 +0,0 @@ -syntax = "proto3"; - -package pbmessages; - -message MemberChange { - message Member { - string device_id = 1; - string network_id = 2; - string identity = 3; // Identity of the member - bool authorized = 4; // Whether the member is authorized - repeated string ip_assignments = 5; // List of IP assignments - bool active_bridge = 6; // Whether the member is an active bridge - string tags = 7; // JSON string of tags - string capabilities = 8; // JSON string of capabilities - uint64 creation_time = 9; // Unix timestamp in milliseconds - bool no_auto_assign_ips = 10; // Whether auto IP assignment is disabled - uint64 revision = 11; // Revision number - uint64 last_authorized_time = 12; // Last time the member was authorized - uint64 last_deauthorized_time = 13; // Last time the member was deauthorized - optional string last_authorized_credential_type = 14; // Type of credential used for last authorization - optional string last_authorized_credential = 15; // Credential used for last authorization - int32 version_major = 16; // Major version of the member - int32 version_minor = 17; // Minor version of the member - int32 version_rev = 18; // Patch version of the member - int32 version_protocol = 19; // Protocol version of the member - int32 remote_trace_level = 20; // Remote trace level - optional string remote_trace_target = 21; // Remote trace target - bool sso_exepmt = 22; // Whether SSO is exempt - uint64 auth_expiry_time = 23; // Authorization expiry time in milliseconds - } - message MemberChangeMetadata { - string trace_id = 1; - string controller_id = 2; - } - - optional Member old = 1; - optional Member new = 2; - optional MemberChangeMetadata metadata = 3; -} diff --git a/rustybits/src/pubsub/member_listener.rs b/rustybits/src/pubsub/member_listener.rs deleted file mode 100644 index 63ac6bff3..000000000 --- a/rustybits/src/pubsub/member_listener.rs +++ /dev/null @@ -1,231 +0,0 @@ -use crate::pubsub::change_listener::ChangeListener; -use crate::pubsub::protobuf::pbmessages::MemberChange; -use prost::Message; -use std::io::Write; -use std::os::raw::c_void; -use std::sync::atomic::AtomicPtr; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::mpsc::Receiver; -use tokio::sync::Mutex; - -pub type MemberListenerCallback = extern "C" fn(*mut c_void, *const u8, usize); - -/** - * Member Listener listens for member changes and passes them back to the controller - * - * This is a wrapper around ChangeListener that specifically handles member changes. - * It uses a Tokio channel to receive messages and decodes them into MemberChange messages. - */ -pub struct MemberListener { - change_listener: ChangeListener, - rx_channel: Mutex>>, - callback: Mutex, - user_ptr: AtomicPtr, -} - -impl MemberListener { - pub async fn new( - controller_id: &str, - listen_timeout: Duration, - callback: MemberListenerCallback, - user_ptr: *mut c_void, - ) -> Result, Box> { - let (tx, rx) = tokio::sync::mpsc::channel(64); - - let change_listener = ChangeListener::new( - controller_id, - "controller-member-change-stream", - format!("{}-member-change-subscription", controller_id).as_str(), - listen_timeout, - tx, - ) - .await?; - - Ok(Arc::new(Self { - change_listener, - rx_channel: Mutex::new(rx), - callback: Mutex::new(callback), - user_ptr: AtomicPtr::new(user_ptr as *mut c_void), - })) - } - - pub async fn listen(self: &Arc) -> Result<(), Box> { - self.change_listener.listen().await - } - - pub async fn change_handler(self: &Arc) -> Result<(), Box> { - let this = self.clone(); - - let mut rx = this.rx_channel.lock().await; - while let Some(change) = rx.recv().await { - if let Ok(m) = MemberChange::decode(change.as_slice()) { - let j = serde_json::to_string(&m).unwrap(); - let mut buffer = [0; 16384]; - let mut test: &mut [u8] = &mut buffer; - let mut size: usize = 0; - while let Ok(bytes) = test.write(j.as_bytes()) { - if bytes == 0 { - break; - } - size += bytes; - } - let callback = this.callback.lock().await; - let user_ptr = this.user_ptr.load(std::sync::atomic::Ordering::Relaxed); - - (callback)(user_ptr, test.as_ptr(), size); - } else { - eprintln!("Failed to decode change"); - } - } - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::pubsub::change_listener::tests::setup_pubsub_emulator; - use crate::pubsub::protobuf::pbmessages::member_change::Member; - use crate::pubsub::protobuf::pbmessages::MemberChange; - - use gcloud_googleapis::pubsub::v1::PubsubMessage; - use gcloud_pubsub::client::{Client, ClientConfig}; - use std::{ - collections::HashMap, - sync::atomic::{AtomicBool, Ordering}, - }; - - extern "C" fn dummy_callback(user_ptr: *mut c_void, data: *const u8, _size: usize) { - // Dummy callback for testing - assert!(!data.is_null(), "data pointer is null"); - assert!(!user_ptr.is_null(), "user_ptr pointer is null"); - let user_ptr = unsafe { &mut *(user_ptr as *mut TestMemberListener) }; - user_ptr.callback_called(); - println!("Dummy callback invoked"); - } - - struct TestMemberListener { - dummy_callback_called: bool, - } - - impl TestMemberListener { - fn new() -> Self { - Self { dummy_callback_called: false } - } - - fn callback_called(&mut self) { - self.dummy_callback_called = true; - } - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn test_member_listener() { - println!("Setting up Pub/Sub emulator for network listener test"); - let (_container, _host) = setup_pubsub_emulator().await.unwrap(); - let mut tester = TestMemberListener::new(); - - let listener = MemberListener::new( - "testctl", - Duration::from_secs(1), - dummy_callback, - &mut tester as *mut TestMemberListener as *mut c_void, - ) - .await - .unwrap(); - - let rt = tokio::runtime::Handle::current(); - - let run = Arc::new(AtomicBool::new(true)); - rt.spawn({ - let run = run.clone(); - let l = listener.clone(); - async move { - while run.load(Ordering::Relaxed) { - match l.listen().await { - Ok(_) => { - println!("Listener exited successfully"); - } - Err(e) => { - println!("Failed to start listener: {}", e); - assert!(false, "Listener failed to start"); - } - } - } - } - }); - - rt.spawn({ - let run = run.clone(); - let l = listener.clone(); - async move { - while run.load(Ordering::Relaxed) { - match l.change_handler().await { - Ok(_) => { - println!("Change handler started successfully"); - } - Err(e) => { - println!("Failed to start change handler: {}", e); - assert!(false, "Change handler failed to start"); - } - } - } - } - }); - - rt.spawn({ - async move { - let client = Client::new(ClientConfig::default()).await.unwrap(); - let topic = client.topic("controller-member-change-stream"); - if !topic.exists(None).await.unwrap() { - topic.create(None, None).await.unwrap(); - } - - let mut publisher = topic.new_publisher(None); - - let nc = MemberChange { - old: Some(Member { - device_id: "test_member".to_string(), - network_id: "test_network".to_string(), - authorized: false, - ..Default::default() - }), - new: Some(Member { - device_id: "test_member".to_string(), - network_id: "test_network".to_string(), - authorized: true, - ..Default::default() - }), - ..Default::default() - }; - - let data = MemberChange::encode_to_vec(&nc); - let message = PubsubMessage { - data: data.into(), - attributes: HashMap::from([("controller_id".to_string(), "testctl".to_string())]), - ordering_key: format!("members-{}", "test_network"), - ..Default::default() - }; - let awaiter = publisher.publish(message).await; - - match awaiter.get().await { - Ok(_) => println!("Message published successfully"), - Err(e) => { - assert!(false, "Failed to publish message: {}", e); - eprintln!("Failed to publish message: {}", e) - } - } - publisher.shutdown().await; - } - }); - - let mut counter = 0; - while !tester.dummy_callback_called && counter < 100 { - tokio::time::sleep(Duration::from_millis(100)).await; - counter += 1; - } - run.store(false, Ordering::Relaxed); - assert!(tester.dummy_callback_called, "Callback was not called"); - } -} diff --git a/rustybits/src/pubsub/member_status.proto b/rustybits/src/pubsub/member_status.proto deleted file mode 100644 index 248390517..000000000 --- a/rustybits/src/pubsub/member_status.proto +++ /dev/null @@ -1,21 +0,0 @@ -syntax = "proto3"; - -package pbmessages; - - - -message MemberStatus { - message MemberStatusMetadata { - string trace_id = 1; - string controller_id = 2; - } - - MemberStatusMetadata metadata = 1; - string network_id = 2; - string member_id = 3; - uint64 timestamp = 4; // Unix timestamp in milliseconds - optional string ip_address = 5; // Optional IP address of the member - optional string os = 6; - optional string arch = 7; - optional string version = 8; -} diff --git a/rustybits/src/pubsub/mod.rs b/rustybits/src/pubsub/mod.rs deleted file mode 100644 index 8d5f2a5b0..000000000 --- a/rustybits/src/pubsub/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright (c)2025 ZeroTier, Inc. - * - * Use of this software is governed by the Business Source License included - * in the LICENSE.TXT file in the project's root directory. - * - * Change Date: 2027-01-01 - * - * On the date above, in accordance with the Business Source License, use - * of this software will be governed by version 2.0 of the Apache License. - */ -mod change_listener; -pub mod member_listener; -pub mod network_listener; -mod protobuf; diff --git a/rustybits/src/pubsub/network.proto b/rustybits/src/pubsub/network.proto deleted file mode 100644 index fc2a21385..000000000 --- a/rustybits/src/pubsub/network.proto +++ /dev/null @@ -1,65 +0,0 @@ -syntax = "proto3"; - -package pbmessages; - -message NetworkChange { - message NetworkChangeMetadata { - string trace_id = 1; - string controller_id = 2; - } - - message IPRange { - string start_ip = 1; // Start of the IP range - string end_ip = 2; // End of the IP range - } - - message Route { - string target = 1; // Target IP or network - optional string via = 2; // Optional next hop IP - } - - message DNS { - string domain = 1; // Search domain - repeated string nameservers = 2; // List of nameservers - } - - message IPV4AssignMode { - bool zt = 1; // Whether ZeroTier is used for IPv4 assignment - } - - message IPv6AssignMode { - bool six_plane = 1; // Whether 6plane is used for IPv6 assignment - bool rfc4193 = 2; // Whether RFC 4193 is used for IPv6 assignment - bool zt = 3; // Whether ZeroTier is used for IPv6 assignment - } - - message Network { - string network_id = 1; - string capabilities = 2; // JSON string of capabilities - uint64 creation_time = 3; // Unix timestamp in milliseconds - bool enable_broadcast = 4; // Whether broadcast is enabled - repeated IPRange assignment_pools = 5; // List of IP ranges for assignment - uint32 mtu = 6; // Maximum Transmission Unit - uint32 multicast_limit = 7; // Limit for multicast messages - optional string name = 8; // Name of the network - bool is_private = 9; // Whether the network is private - uint32 remote_trace_level = 10; // Remote trace level - optional string remote_trace_target = 11; // Remote trace target - uint64 revision = 12; // Revision number - repeated Route routes = 13; // List of routes - string rules = 14; // JSON string of rules - optional string tags = 15; // JSON string of tags - IPV4AssignMode ipv4_assign_mode = 16; // IPv4 assignment mode - IPv6AssignMode ipv6_assign_mode = 17; // IPv6 assignment mode - optional DNS dns = 18; // DNS configuration - bool sso_enabled = 19; // Whether Single Sign-On is enabled - optional string sso_client_id = 20; // SSO client ID - optional string sso_authorization_endpoint = 21; // SSO authorization endpoint - optional string sso_issuer = 22; // SSO issuer - optional string sso_provider = 23; // SSO provider - } - - optional Network old = 1; - optional Network new = 2; - optional NetworkChangeMetadata metadata = 3; -} diff --git a/rustybits/src/pubsub/network_listener.rs b/rustybits/src/pubsub/network_listener.rs deleted file mode 100644 index 1478d0859..000000000 --- a/rustybits/src/pubsub/network_listener.rs +++ /dev/null @@ -1,232 +0,0 @@ -use crate::pubsub::change_listener::ChangeListener; -use crate::pubsub::protobuf::pbmessages::NetworkChange; -use prost::Message; -use serde_json; -use std::io::Write; -use std::os::raw::c_void; -use std::sync::atomic::{AtomicPtr, Ordering}; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::mpsc::Receiver; -use tokio::sync::Mutex; - -pub type NetworkListenerCallback = extern "C" fn(*mut c_void, *const u8, usize); - -/** - * Network Listener listens for network changes and passes them back to the controller - * - * This is a wrapper around ChangeListener that specifically handles network changes. - * It uses a Tokio channel to receive messages and decodes them into NetworkChange messages. - */ -pub struct NetworkListener { - change_listener: ChangeListener, - rx_channel: Mutex>>, - callback: Mutex, - user_ptr: AtomicPtr, -} - -impl NetworkListener { - pub async fn new( - controller_id: &str, - listen_timeout: Duration, - callback: NetworkListenerCallback, - user_ptr: *mut c_void, - ) -> Result, Box> { - let (tx, rx) = tokio::sync::mpsc::channel(64); - - let change_listener = ChangeListener::new( - controller_id, - "controller-network-change-stream", - format!("{}-network-change-subscription", controller_id).as_str(), - listen_timeout, - tx, - ) - .await?; - - Ok(Arc::new(Self { - change_listener, - rx_channel: Mutex::new(rx), - callback: Mutex::new(callback), - user_ptr: AtomicPtr::new(user_ptr as *mut c_void), - })) - } - - pub async fn listen(self: &Arc) -> Result<(), Box> { - self.change_listener.listen().await - } - - pub async fn change_handler(self: &Arc) -> Result<(), Box> { - let this = self.clone(); - - let mut rx = this.rx_channel.lock().await; - while let Some(change) = rx.recv().await { - if let Ok(m) = NetworkChange::decode(change.as_slice()) { - let j = serde_json::to_string(&m).unwrap(); - let mut buffer = [0; 16384]; - let mut test: &mut [u8] = &mut buffer; - let mut size: usize = 0; - while let Ok(bytes) = test.write(j.as_bytes()) { - if bytes == 0 { - break; // No more space to write - } - size += bytes; - } - let callback = this.callback.lock().await; - let user_ptr = this.user_ptr.load(Ordering::Relaxed); - - (callback)(user_ptr, test.as_ptr(), size); - } else { - eprintln!("Failed to decode change"); - } - } - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::pubsub::change_listener::tests::setup_pubsub_emulator; - use crate::pubsub::protobuf::pbmessages::network_change::Network; - use crate::pubsub::protobuf::pbmessages::NetworkChange; - - use gcloud_googleapis::pubsub::v1::PubsubMessage; - use gcloud_pubsub::client::{Client, ClientConfig}; - use std::{ - collections::HashMap, - sync::atomic::{AtomicBool, Ordering}, - }; - - extern "C" fn dummy_callback(user_ptr: *mut c_void, data: *const u8, _size: usize) { - // Dummy callback for testing - assert!(!data.is_null(), "data pointer is null"); - assert!(!user_ptr.is_null(), "user_ptr pointer is null"); - let user_ptr = unsafe { &mut *(user_ptr as *mut TestNetworkListenr) }; - user_ptr.callback_called(); - println!("Dummy callback invoked"); - } - - struct TestNetworkListenr { - dummy_callback_called: bool, - } - - impl TestNetworkListenr { - fn new() -> Self { - Self { dummy_callback_called: false } - } - - fn callback_called(&mut self) { - self.dummy_callback_called = true; - } - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn test_network_listener() { - println!("Setting up Pub/Sub emulator for network listener test"); - let (_container, _host) = setup_pubsub_emulator().await.unwrap(); - - let mut tester = TestNetworkListenr::new(); - - let listener = NetworkListener::new( - "testctl", - Duration::from_secs(1), - dummy_callback, - &mut tester as *mut TestNetworkListenr as *mut c_void, - ) - .await - .unwrap(); - - let rt = tokio::runtime::Handle::current(); - - let run = Arc::new(AtomicBool::new(true)); - rt.spawn({ - let run = run.clone(); - let l = listener.clone(); - async move { - while run.load(Ordering::Relaxed) { - match l.listen().await { - Ok(_) => { - println!("Listener exited successfully"); - } - Err(e) => { - println!("Failed to start listener: {}", e); - assert!(false, "Listener failed to start"); - } - } - } - } - }); - - rt.spawn({ - let run = run.clone(); - let l = listener.clone(); - async move { - while run.load(Ordering::Relaxed) { - match l.change_handler().await { - Ok(_) => { - println!("Change handler started successfully"); - } - Err(e) => { - println!("Failed to start change handler: {}", e); - assert!(false, "Change handler failed to start"); - } - } - } - } - }); - - rt.spawn({ - async move { - let client = Client::new(ClientConfig::default()).await.unwrap(); - let topic = client.topic("controller-network-change-stream"); - if !topic.exists(None).await.unwrap() { - topic.create(None, None).await.unwrap(); - } - - let mut publisher = topic.new_publisher(None); - - let nc = NetworkChange { - old: Some(Network { - network_id: "test_network".to_string(), - name: Some("Test Network".to_string()), - ..Default::default() - }), - new: Some(Network { - network_id: "test_network".to_string(), - name: Some("Test Network Updated".to_string()), - ..Default::default() - }), - ..Default::default() - }; - - let data = NetworkChange::encode_to_vec(&nc); - let message = PubsubMessage { - data: data.into(), - attributes: HashMap::from([("controller_id".to_string(), "testctl".to_string())]), - ordering_key: format!("networks-{}", "testctl"), - ..Default::default() - }; - let awaiter = publisher.publish(message).await; - - match awaiter.get().await { - Ok(_) => println!("Message published successfully"), - Err(e) => { - assert!(false, "Failed to publish message: {}", e); - eprintln!("Failed to publish message: {}", e) - } - } - publisher.shutdown().await; - } - }); - - let mut counter = 0; - while !tester.dummy_callback_called && counter < 100 { - tokio::time::sleep(Duration::from_millis(100)).await; - counter += 1; - } - - run.store(false, Ordering::Relaxed); - assert!(tester.dummy_callback_called, "Callback was not called"); - } -} diff --git a/rustybits/src/pubsub/protobuf.rs b/rustybits/src/pubsub/protobuf.rs deleted file mode 100644 index aa2614865..000000000 --- a/rustybits/src/pubsub/protobuf.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub(crate) mod pbmessages { - include!(concat!(env!("OUT_DIR"), "/pbmessages.rs")); -}