remove rust-based pubsub

This commit is contained in:
Grant Limberg 2025-09-01 14:55:16 -07:00
parent db6e698245
commit 0f63783592
13 changed files with 8 additions and 1321 deletions

359
rustybits/Cargo.lock generated
View file

@ -103,39 +103,6 @@ version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
[[package]]
name = "async-channel"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-stream"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-trait"
version = "0.1.88"
@ -180,7 +147,7 @@ dependencies = [
"rustversion",
"serde",
"sync_wrapper 1.0.2",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
]
@ -431,15 +398,6 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "concurrent-queue"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "const-oid"
version = "0.9.6"
@ -481,15 +439,6 @@ dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511"
dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.15"
@ -874,12 +823,6 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "event-listener"
version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "fastrand"
version = "2.3.0"
@ -920,16 +863,6 @@ version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]]
name = "flate2"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d"
dependencies = [
"crc32fast",
"miniz_oxide",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -1078,85 +1011,6 @@ dependencies = [
"slab",
]
[[package]]
name = "gcloud-auth"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4089aeec499899f6f1309803279763c34d898ca86c1a3e4616cba3ca6bce34b7"
dependencies = [
"async-trait",
"base64 0.22.1",
"gcloud-metadata",
"home",
"jsonwebtoken",
"reqwest 0.12.22",
"serde",
"serde_json",
"thiserror 1.0.69",
"time",
"token-source",
"tokio",
"tracing",
"urlencoding",
]
[[package]]
name = "gcloud-gax"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb0a39057a0654184e074ddefb734c9a326ba468663b8f329c95f0069b5ccdb5"
dependencies = [
"http 1.3.1",
"thiserror 1.0.69",
"token-source",
"tokio",
"tokio-retry2",
"tonic",
"tower 0.4.13",
"tracing",
]
[[package]]
name = "gcloud-googleapis"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "501459a508e7887cfedc45a45ee41602ac1f66d2b61deb05f1c2256bf2faf46d"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",
"tonic",
]
[[package]]
name = "gcloud-metadata"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d575310b4546530f6b21ee000c20155f11f9291fa0b67ea0949fd48aa49ed70"
dependencies = [
"reqwest 0.12.22",
"thiserror 1.0.69",
"tokio",
]
[[package]]
name = "gcloud-pubsub"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1d55e2652753753d902a84c56613b9ae35a36416d94e56c1268764bc7e79f05"
dependencies = [
"async-channel",
"async-stream",
"gcloud-auth",
"gcloud-gax",
"gcloud-googleapis",
"prost-types 0.13.5",
"thiserror 1.0.69",
"token-source",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "generic-array"
version = "0.14.7"
@ -1505,29 +1359,12 @@ dependencies = [
"tokio-native-tls",
]
[[package]]
name = "hyper-tls"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0"
dependencies = [
"bytes",
"http-body-util",
"hyper 1.6.0",
"hyper-util",
"native-tls",
"tokio",
"tokio-native-tls",
"tower-service",
]
[[package]]
name = "hyper-util"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f66d5bd4c6f02bf0542fad85d626775bab9258cf795a4256dcaf3161114d1df"
dependencies = [
"base64 0.22.1",
"bytes",
"futures-channel",
"futures-core",
@ -1535,9 +1372,7 @@ dependencies = [
"http 1.3.1",
"http-body 1.0.1",
"hyper 1.6.0",
"ipnet",
"libc",
"percent-encoding",
"pin-project-lite",
"socket2",
"tokio",
@ -1754,16 +1589,6 @@ version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"
[[package]]
name = "iri-string"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbc5ebe9c3a1a7a5127f920a418f7585e9e758e911d0466ed004f393b0e380b2"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
@ -1804,21 +1629,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "jsonwebtoken"
version = "9.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde"
dependencies = [
"base64 0.22.1",
"js-sys",
"pem",
"ring",
"serde",
"serde_json",
"simple_asn1",
]
[[package]]
name = "jwt"
version = "0.16.0"
@ -2029,16 +1839,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9"
dependencies = [
"num-integer",
"num-traits",
]
[[package]]
name = "num-bigint-dig"
version = "0.8.4"
@ -2103,7 +1903,7 @@ dependencies = [
"getrandom 0.2.16",
"http 0.2.12",
"rand 0.8.5",
"reqwest 0.11.27",
"reqwest",
"serde",
"serde_json",
"serde_path_to_error",
@ -2310,16 +2110,6 @@ dependencies = [
"syn",
]
[[package]]
name = "pem"
version = "3.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38af38e8470ac9dee3ce1bae1af9c1671fffc44ddfd8bd1d0a3445bf349a8ef3"
dependencies = [
"base64 0.22.1",
"serde",
]
[[package]]
name = "pem-rfc7468"
version = "0.7.0"
@ -2851,7 +2641,7 @@ dependencies = [
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.32",
"hyper-tls 0.5.0",
"hyper-tls",
"ipnet",
"js-sys",
"log",
@ -2876,44 +2666,6 @@ dependencies = [
"winreg",
]
[[package]]
name = "reqwest"
version = "0.12.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531"
dependencies = [
"base64 0.22.1",
"bytes",
"encoding_rs",
"futures-core",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
"hyper 1.6.0",
"hyper-tls 0.6.0",
"hyper-util",
"js-sys",
"log",
"mime",
"native-tls",
"percent-encoding",
"pin-project-lite",
"rustls-pki-types",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper 1.0.2",
"tokio",
"tokio-native-tls",
"tower 0.5.2",
"tower-http",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "rfc6979"
version = "0.4.0"
@ -3101,15 +2853,10 @@ dependencies = [
"base64 0.21.7",
"bytes",
"cbindgen",
"gcloud-gax",
"gcloud-googleapis",
"gcloud-pubsub",
"jwt",
"openidconnect",
"prost 0.14.1",
"prost-build 0.14.1",
"prost-types 0.14.1",
"reqwest 0.11.27",
"reqwest",
"serde",
"serde_json",
"temporal-client",
@ -3396,18 +3143,6 @@ dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "simple_asn1"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "297f631f50729c8c99b84667867963997ec0b50f32b2a7dbcab828ef0541e8bb"
dependencies = [
"num-bigint",
"num-traits",
"thiserror 2.0.12",
"time",
]
[[package]]
name = "siphasher"
version = "1.0.1"
@ -3533,9 +3268,6 @@ name = "sync_wrapper"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
dependencies = [
"futures-core",
]
[[package]]
name = "synstructure"
@ -3621,7 +3353,7 @@ dependencies = [
"thiserror 2.0.12",
"tokio",
"tonic",
"tower 0.5.2",
"tower",
"tracing",
"url",
"uuid",
@ -3872,15 +3604,6 @@ dependencies = [
"zerovec",
]
[[package]]
name = "token-source"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75746ae15bef509f21039a652383104424208fdae172a964a8930858b9a78412"
dependencies = [
"async-trait",
]
[[package]]
name = "tokio"
version = "1.46.1"
@ -3922,16 +3645,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-retry2"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1264d076dd34560544a2799e40e457bd07c43d30f4a845686b031bcd8455c84f"
dependencies = [
"pin-project",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.26.2"
@ -4032,7 +3745,6 @@ dependencies = [
"axum",
"base64 0.22.1",
"bytes",
"flate2",
"h2 0.4.11",
"http 1.3.1",
"http-body 1.0.1",
@ -4048,11 +3760,10 @@ dependencies = [
"tokio",
"tokio-rustls",
"tokio-stream",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
"tracing",
"webpki-roots 0.26.11",
]
[[package]]
@ -4069,21 +3780,6 @@ dependencies = [
"syn",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower"
version = "0.5.2"
@ -4103,24 +3799,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower-http"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2"
dependencies = [
"bitflags 2.9.1",
"bytes",
"futures-util",
"http 1.3.1",
"http-body 1.0.1",
"iri-string",
"pin-project-lite",
"tower 0.5.2",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-layer"
version = "0.3.3"
@ -4139,7 +3817,6 @@ version = "0.1.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@ -4255,12 +3932,6 @@ dependencies = [
"serde",
]
[[package]]
name = "urlencoding"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da"
[[package]]
name = "utf8_iter"
version = "1.0.4"
@ -4417,24 +4088,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki-roots"
version = "0.26.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9"
dependencies = [
"webpki-roots 1.0.2",
]
[[package]]
name = "webpki-roots"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e8983c3ab33d6fb807cfcdad2491c4ea8cbc8ed839181c7dfd9c67c83e261b2"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "winapi"
version = "0.3.9"

View file

@ -7,7 +7,7 @@ edition = "2021"
crate-type = ["staticlib", "rlib"]
[features]
default = ["zeroidc", "ztcontroller"]
default = ["zeroidc"]
zeroidc = []
ztcontroller = [
"dep:serde",
@ -15,11 +15,6 @@ ztcontroller = [
"dep:temporal-sdk",
"dep:temporal-client",
"dep:temporal-sdk-core-protos",
"dep:gcloud-pubsub",
"dep:prost",
"dep:prost-types",
"dep:gcloud-gax",
"dep:gcloud-googleapis",
"dep:tokio",
"dep:tokio-util",
]
@ -51,11 +46,6 @@ jwt = { version = "0.16", git = "https://github.com/glimberg/rust-jwt" }
time = { version = "~0.3", features = ["formatting"] }
bytes = "1.3"
thiserror = "1"
gcloud-pubsub = { version = "1.3.0", optional = true }
prost = { version = "0.14", optional = true, features = ["derive"] }
prost-types = { version = "0.14", optional = true }
gcloud-gax = { version = "1.2.0", optional = true }
gcloud-googleapis = { version = "1.2.0", optional = true }
[dev-dependencies]
testcontainers = { version = "0.24", features = ["blocking"] }

View file

@ -4,23 +4,6 @@ use cbindgen::{Config, Language, MacroExpansionConfig};
use std::env;
use std::path::PathBuf;
fn main() {
#[cfg(feature = "ztcontroller")]
{
let mut prost_build = prost_build::Config::new();
prost_build
.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
.compile_protos(
&[
"src/pubsub/network.proto",
"src/pubsub/member.proto",
"src/pubsub/member_status.proto",
],
&["src/pubsub/"],
)
.expect("Failed to compile protobuf files");
}
let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap();
let package_name = env::var("CARGO_PKG_NAME").unwrap();

View file

@ -13,12 +13,6 @@
use std::ffi::{CStr, CString};
use std::os::raw::c_char;
#[cfg(feature = "ztcontroller")]
use std::os::raw::c_void;
#[cfg(feature = "ztcontroller")]
use std::sync::Arc;
#[cfg(feature = "ztcontroller")]
use std::time::Duration;
#[cfg(feature = "ztcontroller")]
use tokio::runtime;
use url::Url;
@ -34,7 +28,7 @@ static SHUTDOWN: std::sync::Once = std::sync::Once::new();
#[no_mangle]
pub unsafe extern "C" fn init_async_runtime() {
START.call_once(|| {
let rt = tokio::runtime::Builder::new_multi_thread()
let rt = runtime::Builder::new_multi_thread()
.worker_threads(4)
.thread_name("rust-async-worker")
.enable_all()
@ -499,184 +493,3 @@ pub unsafe extern "C" fn smee_client_notify_network_joined(
}
}
}
#[cfg(feature = "ztcontroller")]
use crate::pubsub::member_listener::MemberListener;
#[cfg(feature = "ztcontroller")]
use crate::pubsub::network_listener::NetworkListener;
#[cfg(feature = "ztcontroller")]
use crate::pubsub::member_listener::MemberListenerCallback;
#[cfg(feature = "ztcontroller")]
use crate::pubsub::network_listener::NetworkListenerCallback;
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn network_listener_new(
controller_id: *const c_char,
listen_timeout: u64,
callback: NetworkListenerCallback,
user_ptr: *mut c_void,
) -> *const NetworkListener {
if listen_timeout == 0 {
println!("listen_timeout is zero");
return std::ptr::null_mut();
}
if controller_id.is_null() {
println!("controller_id is null");
return std::ptr::null_mut();
}
let id = unsafe { CStr::from_ptr(controller_id) }.to_str().unwrap();
let rt = runtime::Handle::current();
rt.block_on(async {
match NetworkListener::new(id, Duration::from_secs(listen_timeout), callback, user_ptr).await {
Ok(listener) => Arc::into_raw(listener),
Err(e) => {
println!("error creating network listener: {}", e);
std::ptr::null_mut()
}
}
})
}
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn network_listener_delete(ptr: *const NetworkListener) {
if ptr.is_null() {
return;
}
drop(Arc::from_raw(ptr));
}
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn network_listener_listen(ptr: *const NetworkListener) -> bool {
use std::mem::ManuallyDrop;
if ptr.is_null() {
println!("ptr is null");
return false;
}
let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) });
let rt = runtime::Handle::current();
match rt.block_on(listener.listen()) {
Ok(_) => {
println!("Network listener started successfully");
true
}
Err(e) => {
println!("Error starting network listener: {}", e);
false
}
}
}
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn network_listener_change_handler(ptr: *const NetworkListener) {
use std::mem::ManuallyDrop;
if ptr.is_null() {
println!("ptr is null");
return;
}
let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) });
let rt = runtime::Handle::current();
match rt.block_on(listener.change_handler()) {
Ok(_) => {
println!("Network listener change listener completed successfully");
}
Err(e) => {
println!("Error in network listener change listener: {}", e);
}
}
}
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn member_listener_new(
controller_id: *const c_char,
listen_timeout: u64,
callback: MemberListenerCallback,
user_ptr: *mut c_void,
) -> *const MemberListener {
if listen_timeout == 0 {
println!("listen_timeout is zero");
return std::ptr::null_mut();
}
if controller_id.is_null() {
println!("controller_id is null");
return std::ptr::null_mut();
}
let id = unsafe { CStr::from_ptr(controller_id) }.to_str().unwrap();
let rt = runtime::Handle::current();
rt.block_on(async {
match MemberListener::new(id, Duration::from_secs(listen_timeout), callback, user_ptr).await {
Ok(listener) => Arc::into_raw(listener),
Err(e) => {
println!("error creating member listener: {}", e);
std::ptr::null_mut()
}
}
})
}
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn member_listener_delete(ptr: *const MemberListener) {
if ptr.is_null() {
return;
}
drop(Arc::from_raw(ptr));
}
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn member_listener_listen(ptr: *const MemberListener) -> bool {
use std::mem::ManuallyDrop;
if ptr.is_null() {
println!("ptr is null");
return false;
}
let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) });
let rt = runtime::Handle::current();
match rt.block_on(listener.listen()) {
Ok(_) => {
println!("Member listener started successfully");
true
}
Err(e) => {
println!("Error starting member listener: {}", e);
false
}
}
}
#[cfg(feature = "ztcontroller")]
#[no_mangle]
pub unsafe extern "C" fn member_listener_change_handler(ptr: *const MemberListener) {
use std::mem::ManuallyDrop;
if ptr.is_null() {
println!("ptr is null");
return;
}
let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) });
let rt = runtime::Handle::current();
match rt.block_on(listener.change_handler()) {
Ok(_) => {
println!("Member listener change listener completed successfully");
}
Err(e) => {
println!("Error in member listener change listener: {}", e);
}
}
}

View file

@ -1,7 +1,5 @@
pub mod ext;
#[cfg(feature = "ztcontroller")]
pub mod pubsub;
#[cfg(feature = "ztcontroller")]
pub mod smeeclient;
#[cfg(feature = "zeroidc")]
pub mod zeroidc;

View file

@ -1,144 +0,0 @@
use gcloud_pubsub::client::{Client, ClientConfig};
use gcloud_pubsub::subscription::SubscriptionConfig;
use gcloud_pubsub::topic::Topic;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio_util::sync::CancellationToken;
pub struct ChangeListener {
client: Client,
topic: Topic,
subscription_name: String,
controller_id: String,
listen_timeout: Duration,
sender: Sender<Vec<u8>>,
}
impl ChangeListener {
pub async fn new(
controller_id: &str,
topic_name: &str,
subscription_name: &str,
listen_timeout: Duration,
sender: Sender<Vec<u8>>,
) -> Result<Self, Box<dyn std::error::Error>> {
let config = ClientConfig::default().with_auth().await.unwrap();
let client = Client::new(config).await?;
let topic = client.topic(topic_name);
if !topic.exists(None).await? {
topic.create(None, None).await?;
}
Ok(Self {
client,
topic,
subscription_name: subscription_name.to_string(),
controller_id: controller_id.to_string(),
listen_timeout,
sender,
})
}
/**
* Listens for changes on the topic and sends them to the provided sender.
*
* Listens for up to `listen_timeout` duration, at which point it will stop listening
* and return. listen will have to be called again to continue listening.
*
* If the subscription does not exist, it will create it with the specified configuration.
*/
pub async fn listen(&self) -> Result<(), Box<dyn std::error::Error>> {
let config = SubscriptionConfig {
enable_message_ordering: true,
filter: format!("attributes.controller_id = '{}'", self.controller_id),
..Default::default()
};
let subscription = self.client.subscription(self.subscription_name.as_str());
if !subscription.exists(None).await? {
subscription
.create(self.topic.fully_qualified_name(), config, None)
.await?;
}
let cancel = CancellationToken::new();
let cancel2 = cancel.clone();
let listen_timeout = self.listen_timeout.clone();
tokio::spawn(async move {
tokio::time::sleep(listen_timeout).await;
cancel2.cancel();
});
let tx = self.sender.clone();
let _ = subscription
.receive(
move |message, _cancel| {
let tx2 = tx.clone();
async move {
let data = message.message.data.clone();
match tx2.send(data.to_vec()).await {
Ok(_) => println!("Message sent successfully"),
Err(e) => eprintln!("Failed to send message: {}", e),
}
match message.ack().await {
Ok(_) => println!("Message acknowledged"),
Err(e) => eprintln!("Failed to acknowledge message: {}", e),
}
}
},
cancel.clone(),
None,
)
.await;
Ok(())
}
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use testcontainers::runners::AsyncRunner;
use testcontainers::ContainerAsync;
use testcontainers_modules::google_cloud_sdk_emulators;
use testcontainers_modules::google_cloud_sdk_emulators::CloudSdk;
use tokio;
pub(crate) async fn setup_pubsub_emulator() -> Result<(ContainerAsync<CloudSdk>, String), Box<dyn std::error::Error>>
{
let container = google_cloud_sdk_emulators::CloudSdk::pubsub().start().await?;
let port = container.get_host_port_ipv4(8085).await?;
let host = format!("localhost:{}", port);
unsafe {
std::env::set_var("PUBSUB_EMULATOR_HOST", host.clone());
}
Ok((container, host))
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_can_connect_to_pubsub() -> Result<(), Box<dyn std::error::Error + 'static>> {
let (_container, _host) = setup_pubsub_emulator().await?;
let (tx, _rx) = tokio::sync::mpsc::channel(64);
let cl = ChangeListener::new(
"test_controller",
"test_topic",
"test_subscription",
Duration::from_secs(10),
tx,
)
.await;
assert!(cl.is_ok(), "Failed to connect to pubsub emulator: {:?}", cl.err());
Ok(())
}
}

View file

@ -1,39 +0,0 @@
syntax = "proto3";
package pbmessages;
message MemberChange {
message Member {
string device_id = 1;
string network_id = 2;
string identity = 3; // Identity of the member
bool authorized = 4; // Whether the member is authorized
repeated string ip_assignments = 5; // List of IP assignments
bool active_bridge = 6; // Whether the member is an active bridge
string tags = 7; // JSON string of tags
string capabilities = 8; // JSON string of capabilities
uint64 creation_time = 9; // Unix timestamp in milliseconds
bool no_auto_assign_ips = 10; // Whether auto IP assignment is disabled
uint64 revision = 11; // Revision number
uint64 last_authorized_time = 12; // Last time the member was authorized
uint64 last_deauthorized_time = 13; // Last time the member was deauthorized
optional string last_authorized_credential_type = 14; // Type of credential used for last authorization
optional string last_authorized_credential = 15; // Credential used for last authorization
int32 version_major = 16; // Major version of the member
int32 version_minor = 17; // Minor version of the member
int32 version_rev = 18; // Patch version of the member
int32 version_protocol = 19; // Protocol version of the member
int32 remote_trace_level = 20; // Remote trace level
optional string remote_trace_target = 21; // Remote trace target
bool sso_exepmt = 22; // Whether SSO is exempt
uint64 auth_expiry_time = 23; // Authorization expiry time in milliseconds
}
message MemberChangeMetadata {
string trace_id = 1;
string controller_id = 2;
}
optional Member old = 1;
optional Member new = 2;
optional MemberChangeMetadata metadata = 3;
}

View file

@ -1,231 +0,0 @@
use crate::pubsub::change_listener::ChangeListener;
use crate::pubsub::protobuf::pbmessages::MemberChange;
use prost::Message;
use std::io::Write;
use std::os::raw::c_void;
use std::sync::atomic::AtomicPtr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
use tokio::sync::Mutex;
pub type MemberListenerCallback = extern "C" fn(*mut c_void, *const u8, usize);
/**
* Member Listener listens for member changes and passes them back to the controller
*
* This is a wrapper around ChangeListener that specifically handles member changes.
* It uses a Tokio channel to receive messages and decodes them into MemberChange messages.
*/
pub struct MemberListener {
change_listener: ChangeListener,
rx_channel: Mutex<Receiver<Vec<u8>>>,
callback: Mutex<MemberListenerCallback>,
user_ptr: AtomicPtr<c_void>,
}
impl MemberListener {
pub async fn new(
controller_id: &str,
listen_timeout: Duration,
callback: MemberListenerCallback,
user_ptr: *mut c_void,
) -> Result<Arc<Self>, Box<dyn std::error::Error>> {
let (tx, rx) = tokio::sync::mpsc::channel(64);
let change_listener = ChangeListener::new(
controller_id,
"controller-member-change-stream",
format!("{}-member-change-subscription", controller_id).as_str(),
listen_timeout,
tx,
)
.await?;
Ok(Arc::new(Self {
change_listener,
rx_channel: Mutex::new(rx),
callback: Mutex::new(callback),
user_ptr: AtomicPtr::new(user_ptr as *mut c_void),
}))
}
pub async fn listen(self: &Arc<Self>) -> Result<(), Box<dyn std::error::Error>> {
self.change_listener.listen().await
}
pub async fn change_handler(self: &Arc<Self>) -> Result<(), Box<dyn std::error::Error>> {
let this = self.clone();
let mut rx = this.rx_channel.lock().await;
while let Some(change) = rx.recv().await {
if let Ok(m) = MemberChange::decode(change.as_slice()) {
let j = serde_json::to_string(&m).unwrap();
let mut buffer = [0; 16384];
let mut test: &mut [u8] = &mut buffer;
let mut size: usize = 0;
while let Ok(bytes) = test.write(j.as_bytes()) {
if bytes == 0 {
break;
}
size += bytes;
}
let callback = this.callback.lock().await;
let user_ptr = this.user_ptr.load(std::sync::atomic::Ordering::Relaxed);
(callback)(user_ptr, test.as_ptr(), size);
} else {
eprintln!("Failed to decode change");
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::pubsub::change_listener::tests::setup_pubsub_emulator;
use crate::pubsub::protobuf::pbmessages::member_change::Member;
use crate::pubsub::protobuf::pbmessages::MemberChange;
use gcloud_googleapis::pubsub::v1::PubsubMessage;
use gcloud_pubsub::client::{Client, ClientConfig};
use std::{
collections::HashMap,
sync::atomic::{AtomicBool, Ordering},
};
extern "C" fn dummy_callback(user_ptr: *mut c_void, data: *const u8, _size: usize) {
// Dummy callback for testing
assert!(!data.is_null(), "data pointer is null");
assert!(!user_ptr.is_null(), "user_ptr pointer is null");
let user_ptr = unsafe { &mut *(user_ptr as *mut TestMemberListener) };
user_ptr.callback_called();
println!("Dummy callback invoked");
}
struct TestMemberListener {
dummy_callback_called: bool,
}
impl TestMemberListener {
fn new() -> Self {
Self { dummy_callback_called: false }
}
fn callback_called(&mut self) {
self.dummy_callback_called = true;
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_member_listener() {
println!("Setting up Pub/Sub emulator for network listener test");
let (_container, _host) = setup_pubsub_emulator().await.unwrap();
let mut tester = TestMemberListener::new();
let listener = MemberListener::new(
"testctl",
Duration::from_secs(1),
dummy_callback,
&mut tester as *mut TestMemberListener as *mut c_void,
)
.await
.unwrap();
let rt = tokio::runtime::Handle::current();
let run = Arc::new(AtomicBool::new(true));
rt.spawn({
let run = run.clone();
let l = listener.clone();
async move {
while run.load(Ordering::Relaxed) {
match l.listen().await {
Ok(_) => {
println!("Listener exited successfully");
}
Err(e) => {
println!("Failed to start listener: {}", e);
assert!(false, "Listener failed to start");
}
}
}
}
});
rt.spawn({
let run = run.clone();
let l = listener.clone();
async move {
while run.load(Ordering::Relaxed) {
match l.change_handler().await {
Ok(_) => {
println!("Change handler started successfully");
}
Err(e) => {
println!("Failed to start change handler: {}", e);
assert!(false, "Change handler failed to start");
}
}
}
}
});
rt.spawn({
async move {
let client = Client::new(ClientConfig::default()).await.unwrap();
let topic = client.topic("controller-member-change-stream");
if !topic.exists(None).await.unwrap() {
topic.create(None, None).await.unwrap();
}
let mut publisher = topic.new_publisher(None);
let nc = MemberChange {
old: Some(Member {
device_id: "test_member".to_string(),
network_id: "test_network".to_string(),
authorized: false,
..Default::default()
}),
new: Some(Member {
device_id: "test_member".to_string(),
network_id: "test_network".to_string(),
authorized: true,
..Default::default()
}),
..Default::default()
};
let data = MemberChange::encode_to_vec(&nc);
let message = PubsubMessage {
data: data.into(),
attributes: HashMap::from([("controller_id".to_string(), "testctl".to_string())]),
ordering_key: format!("members-{}", "test_network"),
..Default::default()
};
let awaiter = publisher.publish(message).await;
match awaiter.get().await {
Ok(_) => println!("Message published successfully"),
Err(e) => {
assert!(false, "Failed to publish message: {}", e);
eprintln!("Failed to publish message: {}", e)
}
}
publisher.shutdown().await;
}
});
let mut counter = 0;
while !tester.dummy_callback_called && counter < 100 {
tokio::time::sleep(Duration::from_millis(100)).await;
counter += 1;
}
run.store(false, Ordering::Relaxed);
assert!(tester.dummy_callback_called, "Callback was not called");
}
}

View file

@ -1,21 +0,0 @@
syntax = "proto3";
package pbmessages;
message MemberStatus {
message MemberStatusMetadata {
string trace_id = 1;
string controller_id = 2;
}
MemberStatusMetadata metadata = 1;
string network_id = 2;
string member_id = 3;
uint64 timestamp = 4; // Unix timestamp in milliseconds
optional string ip_address = 5; // Optional IP address of the member
optional string os = 6;
optional string arch = 7;
optional string version = 8;
}

View file

@ -1,15 +0,0 @@
/*
* Copyright (c)2025 ZeroTier, Inc.
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file in the project's root directory.
*
* Change Date: 2027-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2.0 of the Apache License.
*/
mod change_listener;
pub mod member_listener;
pub mod network_listener;
mod protobuf;

View file

@ -1,65 +0,0 @@
syntax = "proto3";
package pbmessages;
message NetworkChange {
message NetworkChangeMetadata {
string trace_id = 1;
string controller_id = 2;
}
message IPRange {
string start_ip = 1; // Start of the IP range
string end_ip = 2; // End of the IP range
}
message Route {
string target = 1; // Target IP or network
optional string via = 2; // Optional next hop IP
}
message DNS {
string domain = 1; // Search domain
repeated string nameservers = 2; // List of nameservers
}
message IPV4AssignMode {
bool zt = 1; // Whether ZeroTier is used for IPv4 assignment
}
message IPv6AssignMode {
bool six_plane = 1; // Whether 6plane is used for IPv6 assignment
bool rfc4193 = 2; // Whether RFC 4193 is used for IPv6 assignment
bool zt = 3; // Whether ZeroTier is used for IPv6 assignment
}
message Network {
string network_id = 1;
string capabilities = 2; // JSON string of capabilities
uint64 creation_time = 3; // Unix timestamp in milliseconds
bool enable_broadcast = 4; // Whether broadcast is enabled
repeated IPRange assignment_pools = 5; // List of IP ranges for assignment
uint32 mtu = 6; // Maximum Transmission Unit
uint32 multicast_limit = 7; // Limit for multicast messages
optional string name = 8; // Name of the network
bool is_private = 9; // Whether the network is private
uint32 remote_trace_level = 10; // Remote trace level
optional string remote_trace_target = 11; // Remote trace target
uint64 revision = 12; // Revision number
repeated Route routes = 13; // List of routes
string rules = 14; // JSON string of rules
optional string tags = 15; // JSON string of tags
IPV4AssignMode ipv4_assign_mode = 16; // IPv4 assignment mode
IPv6AssignMode ipv6_assign_mode = 17; // IPv6 assignment mode
optional DNS dns = 18; // DNS configuration
bool sso_enabled = 19; // Whether Single Sign-On is enabled
optional string sso_client_id = 20; // SSO client ID
optional string sso_authorization_endpoint = 21; // SSO authorization endpoint
optional string sso_issuer = 22; // SSO issuer
optional string sso_provider = 23; // SSO provider
}
optional Network old = 1;
optional Network new = 2;
optional NetworkChangeMetadata metadata = 3;
}

View file

@ -1,232 +0,0 @@
use crate::pubsub::change_listener::ChangeListener;
use crate::pubsub::protobuf::pbmessages::NetworkChange;
use prost::Message;
use serde_json;
use std::io::Write;
use std::os::raw::c_void;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
use tokio::sync::Mutex;
pub type NetworkListenerCallback = extern "C" fn(*mut c_void, *const u8, usize);
/**
* Network Listener listens for network changes and passes them back to the controller
*
* This is a wrapper around ChangeListener that specifically handles network changes.
* It uses a Tokio channel to receive messages and decodes them into NetworkChange messages.
*/
pub struct NetworkListener {
change_listener: ChangeListener,
rx_channel: Mutex<Receiver<Vec<u8>>>,
callback: Mutex<NetworkListenerCallback>,
user_ptr: AtomicPtr<c_void>,
}
impl NetworkListener {
pub async fn new(
controller_id: &str,
listen_timeout: Duration,
callback: NetworkListenerCallback,
user_ptr: *mut c_void,
) -> Result<Arc<Self>, Box<dyn std::error::Error>> {
let (tx, rx) = tokio::sync::mpsc::channel(64);
let change_listener = ChangeListener::new(
controller_id,
"controller-network-change-stream",
format!("{}-network-change-subscription", controller_id).as_str(),
listen_timeout,
tx,
)
.await?;
Ok(Arc::new(Self {
change_listener,
rx_channel: Mutex::new(rx),
callback: Mutex::new(callback),
user_ptr: AtomicPtr::new(user_ptr as *mut c_void),
}))
}
pub async fn listen(self: &Arc<Self>) -> Result<(), Box<dyn std::error::Error>> {
self.change_listener.listen().await
}
pub async fn change_handler(self: &Arc<Self>) -> Result<(), Box<dyn std::error::Error>> {
let this = self.clone();
let mut rx = this.rx_channel.lock().await;
while let Some(change) = rx.recv().await {
if let Ok(m) = NetworkChange::decode(change.as_slice()) {
let j = serde_json::to_string(&m).unwrap();
let mut buffer = [0; 16384];
let mut test: &mut [u8] = &mut buffer;
let mut size: usize = 0;
while let Ok(bytes) = test.write(j.as_bytes()) {
if bytes == 0 {
break; // No more space to write
}
size += bytes;
}
let callback = this.callback.lock().await;
let user_ptr = this.user_ptr.load(Ordering::Relaxed);
(callback)(user_ptr, test.as_ptr(), size);
} else {
eprintln!("Failed to decode change");
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::pubsub::change_listener::tests::setup_pubsub_emulator;
use crate::pubsub::protobuf::pbmessages::network_change::Network;
use crate::pubsub::protobuf::pbmessages::NetworkChange;
use gcloud_googleapis::pubsub::v1::PubsubMessage;
use gcloud_pubsub::client::{Client, ClientConfig};
use std::{
collections::HashMap,
sync::atomic::{AtomicBool, Ordering},
};
extern "C" fn dummy_callback(user_ptr: *mut c_void, data: *const u8, _size: usize) {
// Dummy callback for testing
assert!(!data.is_null(), "data pointer is null");
assert!(!user_ptr.is_null(), "user_ptr pointer is null");
let user_ptr = unsafe { &mut *(user_ptr as *mut TestNetworkListenr) };
user_ptr.callback_called();
println!("Dummy callback invoked");
}
struct TestNetworkListenr {
dummy_callback_called: bool,
}
impl TestNetworkListenr {
fn new() -> Self {
Self { dummy_callback_called: false }
}
fn callback_called(&mut self) {
self.dummy_callback_called = true;
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_network_listener() {
println!("Setting up Pub/Sub emulator for network listener test");
let (_container, _host) = setup_pubsub_emulator().await.unwrap();
let mut tester = TestNetworkListenr::new();
let listener = NetworkListener::new(
"testctl",
Duration::from_secs(1),
dummy_callback,
&mut tester as *mut TestNetworkListenr as *mut c_void,
)
.await
.unwrap();
let rt = tokio::runtime::Handle::current();
let run = Arc::new(AtomicBool::new(true));
rt.spawn({
let run = run.clone();
let l = listener.clone();
async move {
while run.load(Ordering::Relaxed) {
match l.listen().await {
Ok(_) => {
println!("Listener exited successfully");
}
Err(e) => {
println!("Failed to start listener: {}", e);
assert!(false, "Listener failed to start");
}
}
}
}
});
rt.spawn({
let run = run.clone();
let l = listener.clone();
async move {
while run.load(Ordering::Relaxed) {
match l.change_handler().await {
Ok(_) => {
println!("Change handler started successfully");
}
Err(e) => {
println!("Failed to start change handler: {}", e);
assert!(false, "Change handler failed to start");
}
}
}
}
});
rt.spawn({
async move {
let client = Client::new(ClientConfig::default()).await.unwrap();
let topic = client.topic("controller-network-change-stream");
if !topic.exists(None).await.unwrap() {
topic.create(None, None).await.unwrap();
}
let mut publisher = topic.new_publisher(None);
let nc = NetworkChange {
old: Some(Network {
network_id: "test_network".to_string(),
name: Some("Test Network".to_string()),
..Default::default()
}),
new: Some(Network {
network_id: "test_network".to_string(),
name: Some("Test Network Updated".to_string()),
..Default::default()
}),
..Default::default()
};
let data = NetworkChange::encode_to_vec(&nc);
let message = PubsubMessage {
data: data.into(),
attributes: HashMap::from([("controller_id".to_string(), "testctl".to_string())]),
ordering_key: format!("networks-{}", "testctl"),
..Default::default()
};
let awaiter = publisher.publish(message).await;
match awaiter.get().await {
Ok(_) => println!("Message published successfully"),
Err(e) => {
assert!(false, "Failed to publish message: {}", e);
eprintln!("Failed to publish message: {}", e)
}
}
publisher.shutdown().await;
}
});
let mut counter = 0;
while !tester.dummy_callback_called && counter < 100 {
tokio::time::sleep(Duration::from_millis(100)).await;
counter += 1;
}
run.store(false, Ordering::Relaxed);
assert!(tester.dummy_callback_called, "Callback was not called");
}
}

View file

@ -1,3 +0,0 @@
pub(crate) mod pbmessages {
include!(concat!(env!("OUT_DIR"), "/pbmessages.rs"));
}