From 4fab227c2380c0a2217fda7151e43e3979d60f18 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Thu, 14 Aug 2025 14:20:34 -0700 Subject: [PATCH] fix calling into async functions from non-async via the FFI --- rustybits/Cargo.lock | 1 + rustybits/Cargo.toml | 4 +++- rustybits/src/ext.rs | 20 ++++++++++++++------ 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/rustybits/Cargo.lock b/rustybits/Cargo.lock index fd6b37a34..c037e3695 100644 --- a/rustybits/Cargo.lock +++ b/rustybits/Cargo.lock @@ -3101,6 +3101,7 @@ dependencies = [ "base64 0.21.7", "bytes", "cbindgen", + "futures", "gcloud-gax", "gcloud-googleapis", "gcloud-pubsub", diff --git a/rustybits/Cargo.toml b/rustybits/Cargo.toml index 61888ab36..a84bcc0ea 100644 --- a/rustybits/Cargo.toml +++ b/rustybits/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" crate-type = ["staticlib", "rlib"] [features] -default = ["zeroidc"] +default = ["zeroidc", "ztcontroller"] zeroidc = [] ztcontroller = [ "dep:serde", @@ -22,6 +22,7 @@ ztcontroller = [ "dep:gcloud-googleapis", "dep:tokio", "dep:tokio-util", + "dep:futures", ] [dependencies] @@ -56,6 +57,7 @@ prost = { version = "0.14", optional = true, features = ["derive"] } prost-types = { version = "0.14", optional = true } gcloud-gax = { version = "1.2.0", optional = true } gcloud-googleapis = { version = "1.2.0", optional = true } +futures = { version = "0.3", optional = true } [dev-dependencies] testcontainers = { version = "0.24", features = ["blocking"] } diff --git a/rustybits/src/ext.rs b/rustybits/src/ext.rs index 50962d785..1a21301cc 100644 --- a/rustybits/src/ext.rs +++ b/rustybits/src/ext.rs @@ -530,7 +530,8 @@ pub unsafe extern "C" fn network_listener_new( let id = unsafe { CStr::from_ptr(controller_id) }.to_str().unwrap(); let rt = runtime::Handle::current(); - rt.block_on(async { + let _block = rt.enter(); + futures::executor::block_on(async { match NetworkListener::new(id, Duration::from_secs(listen_timeout), callback, user_ptr).await { Ok(listener) => Arc::into_raw(listener), Err(e) => { @@ -562,7 +563,9 @@ pub unsafe extern "C" fn network_listener_listen(ptr: *const NetworkListener) -> let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) }); let rt = runtime::Handle::current(); - match rt.block_on(listener.listen()) { + let _guard = rt.enter(); + + match futures::executor::block_on(listener.listen()) { Ok(_) => { println!("Network listener started successfully"); true @@ -586,7 +589,9 @@ pub unsafe extern "C" fn network_listener_change_handler(ptr: *const NetworkList let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) }); let rt = runtime::Handle::current(); - match rt.block_on(listener.change_handler()) { + let _guard = rt.enter(); + + match futures::executor::block_on(listener.change_handler()) { Ok(_) => { println!("Network listener change listener completed successfully"); } @@ -616,7 +621,8 @@ pub unsafe extern "C" fn member_listener_new( let id = unsafe { CStr::from_ptr(controller_id) }.to_str().unwrap(); let rt = runtime::Handle::current(); - rt.block_on(async { + let _block = rt.enter(); + futures::executor::block_on(async { match MemberListener::new(id, Duration::from_secs(listen_timeout), callback, user_ptr).await { Ok(listener) => Arc::into_raw(listener), Err(e) => { @@ -647,7 +653,8 @@ pub unsafe extern "C" fn member_listener_listen(ptr: *const MemberListener) -> b let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) }); let rt = runtime::Handle::current(); - match rt.block_on(listener.listen()) { + let _guard = rt.enter(); + match futures::executor::block_on(listener.listen()) { Ok(_) => { println!("Member listener started successfully"); true @@ -671,7 +678,8 @@ pub unsafe extern "C" fn member_listener_change_handler(ptr: *const MemberListen let listener = ManuallyDrop::new(unsafe { Arc::from_raw(ptr) }); let rt = runtime::Handle::current(); - match rt.block_on(listener.change_handler()) { + let _guard = rt.enter(); + match futures::executor::block_on(listener.change_handler()) { Ok(_) => { println!("Member listener change listener completed successfully"); }