From 6196e87303e1f8c75e31a4f162932acb5ca60781 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Mon, 22 Sep 2025 16:32:28 -0700 Subject: [PATCH] only create the subscription if pubsub emulator is being used. --- nonfree/controller/CtlUtil.cpp | 33 +++++++++++++++++++++++++++ nonfree/controller/CtlUtil.hpp | 6 +++++ nonfree/controller/PubSubListener.cpp | 25 +++----------------- 3 files changed, 42 insertions(+), 22 deletions(-) diff --git a/nonfree/controller/CtlUtil.cpp b/nonfree/controller/CtlUtil.cpp index 36aad01b0..8e6452f33 100644 --- a/nonfree/controller/CtlUtil.cpp +++ b/nonfree/controller/CtlUtil.cpp @@ -119,6 +119,39 @@ void create_gcp_pubsub_topic_if_needed(std::string project_id, std::string topic } } +void create_gcp_pubsub_subscription_if_needed( + std::string project_id, + std::string subscription_id, + std::string topic_id, + std::string controller_id) +{ + // This is a no-op if the subscription already exists. + auto subscriptionAdminClient = + pubsub_admin::SubscriptionAdminClient(pubsub_admin::MakeSubscriptionAdminConnection()); + auto topicName = pubsub::Topic(project_id, topic_id).FullName(); + auto subscriptionName = pubsub::Subscription(project_id, subscription_id).FullName(); + + auto sub = subscriptionAdminClient.GetSubscription(subscriptionName); + if (! sub.ok()) { + if (sub.status().code() == google::cloud::StatusCode::kNotFound) { + google::pubsub::v1::Subscription request; + request.set_name(subscription_id); + request.set_topic(pubsub::Topic(project_id, topic_id).FullName()); + request.set_filter("(attributes.controller_id=\"" + controller_id + "\")"); + auto createResult = subscriptionAdminClient.CreateSubscription(request); + if (! createResult.ok()) { + fprintf(stderr, "Failed to create subscription: %s\n", createResult.status().message().c_str()); + throw std::runtime_error("Failed to create subscription"); + } + fprintf(stderr, "Created subscription: %s\n", subscriptionName.c_str()); + } + else { + fprintf(stderr, "Failed to get subscription: %s\n", sub.status().message().c_str()); + throw std::runtime_error("Failed to get subscription"); + } + } +} + // void create_bigtable_table(std::string project_id, std::string instance_id) // { // auto bigtableAdminClient = diff --git a/nonfree/controller/CtlUtil.hpp b/nonfree/controller/CtlUtil.hpp index 46faed2b7..c10322012 100644 --- a/nonfree/controller/CtlUtil.hpp +++ b/nonfree/controller/CtlUtil.hpp @@ -20,6 +20,12 @@ std::string random_hex_string(std::size_t length); #ifdef ZT1_CENTRAL_CONTROLLER void create_gcp_pubsub_topic_if_needed(std::string project_id, std::string topic_id); + +void create_gcp_pubsub_subscription_if_needed( + std::string project_id, + std::string subscription_id, + std::string topic_id, + std::string controller_id); #endif } // namespace ZeroTier diff --git a/nonfree/controller/PubSubListener.cpp b/nonfree/controller/PubSubListener.cpp index b1a7187f6..8e727e8d5 100644 --- a/nonfree/controller/PubSubListener.cpp +++ b/nonfree/controller/PubSubListener.cpp @@ -35,33 +35,19 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s , _adminClient(pubsub_admin::MakeSubscriptionAdminConnection()) , _subscription(nullptr) { + GOOGLE_PROTOBUF_VERIFY_VERSION; + _subscription_id = "sub-" + controller_id + "-" + topic; // + "-" + random_hex_string(8); _subscription = new pubsub::Subscription(_project, _subscription_id); fprintf( stderr, "PubSubListener for controller %s project %s topic %s subscription %s\n", controller_id.c_str(), project.c_str(), topic.c_str(), _subscription_id.c_str()); - GOOGLE_PROTOBUF_VERIFY_VERSION; // If PUBSUB_EMULATOR_HOST is set, create the topic if it doesn't exist const char* emulatorHost = std::getenv("PUBSUB_EMULATOR_HOST"); if (emulatorHost != nullptr) { create_gcp_pubsub_topic_if_needed(project, topic); - } - - google::pubsub::v1::Subscription request; - request.set_name(_subscription->FullName()); - request.set_topic(pubsub::Topic(project, topic).FullName()); - request.set_filter("(attributes.controller_id=\"" + _controller_id + "\")"); - auto sub = _adminClient.CreateSubscription(request); - if (! sub.ok()) { - if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) { - fprintf(stderr, "Subscription already exists. Attaching to it\n"); - // throw std::runtime_error("Subscription already exists"); - } - else { - fprintf(stderr, "Failed to create subscription: %s\n", sub.status().message().c_str()); - throw std::runtime_error("Failed to create subscription"); - } + create_gcp_pubsub_subscription_if_needed(_project, _subscription_id, _topic, _controller_id); } _subscriber = std::make_shared(pubsub::MakeSubscriberConnection(*_subscription)); @@ -77,11 +63,6 @@ PubSubListener::~PubSubListener() _subscriberThread.join(); } - auto status = _adminClient.DeleteSubscription(_subscription->FullName()); - if (! status.ok()) { - fprintf(stderr, "Failed to delete subscription: %s\n", status.message().c_str()); - } - if (_subscription) { delete _subscription; _subscription = nullptr;