From d96d3b27d22b27079728a84862fdc7531aaf5ba3 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Mon, 22 Sep 2025 10:52:06 -0700 Subject: [PATCH] try making the controller attach to an existing pubsub subscription if it already exists --- nonfree/controller/PubSubListener.cpp | 33 ++++++++++++++++----------- nonfree/controller/PubSubListener.hpp | 2 +- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/nonfree/controller/PubSubListener.cpp b/nonfree/controller/PubSubListener.cpp index 0d87ff47c..b1a7187f6 100644 --- a/nonfree/controller/PubSubListener.cpp +++ b/nonfree/controller/PubSubListener.cpp @@ -30,12 +30,13 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s : _controller_id(controller_id) , _project(project) , _topic(topic) - , _subscription_id("sub-" + controller_id + "-" + topic) + , _subscription_id() , _run(false) , _adminClient(pubsub_admin::MakeSubscriptionAdminConnection()) - , _subscription(pubsub::Subscription(_project, _subscription_id)) + , _subscription(nullptr) { - _subscription_id = _subscription_id + "-" + random_hex_string(8); + _subscription_id = "sub-" + controller_id + "-" + topic; // + "-" + random_hex_string(8); + _subscription = new pubsub::Subscription(_project, _subscription_id); fprintf( stderr, "PubSubListener for controller %s project %s topic %s subscription %s\n", controller_id.c_str(), project.c_str(), topic.c_str(), _subscription_id.c_str()); @@ -48,21 +49,22 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s } google::pubsub::v1::Subscription request; - request.set_name(_subscription.FullName()); + request.set_name(_subscription->FullName()); request.set_topic(pubsub::Topic(project, topic).FullName()); request.set_filter("(attributes.controller_id=\"" + _controller_id + "\")"); auto sub = _adminClient.CreateSubscription(request); if (! sub.ok()) { - fprintf(stderr, "Failed to create subscription: %s\n", sub.status().message().c_str()); - throw std::runtime_error("Failed to create subscription"); + if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) { + fprintf(stderr, "Subscription already exists. Attaching to it\n"); + // throw std::runtime_error("Subscription already exists"); + } + else { + fprintf(stderr, "Failed to create subscription: %s\n", sub.status().message().c_str()); + throw std::runtime_error("Failed to create subscription"); + } } - if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) { - fprintf(stderr, "Subscription already exists\n"); - throw std::runtime_error("Subscription already exists"); - } - - _subscriber = std::make_shared(pubsub::MakeSubscriberConnection(_subscription)); + _subscriber = std::make_shared(pubsub::MakeSubscriberConnection(*_subscription)); _run = true; _subscriberThread = std::thread(&PubSubListener::subscribe, this); @@ -75,10 +77,15 @@ PubSubListener::~PubSubListener() _subscriberThread.join(); } - auto status = _adminClient.DeleteSubscription(_subscription.FullName()); + auto status = _adminClient.DeleteSubscription(_subscription->FullName()); if (! status.ok()) { fprintf(stderr, "Failed to delete subscription: %s\n", status.message().c_str()); } + + if (_subscription) { + delete _subscription; + _subscription = nullptr; + } } void PubSubListener::subscribe() diff --git a/nonfree/controller/PubSubListener.hpp b/nonfree/controller/PubSubListener.hpp index 6e7f17c64..2efe8c819 100644 --- a/nonfree/controller/PubSubListener.hpp +++ b/nonfree/controller/PubSubListener.hpp @@ -35,7 +35,7 @@ class PubSubListener : public NotificationListener { void subscribe(); bool _run = false; google::cloud::pubsub_admin::SubscriptionAdminClient _adminClient; - google::cloud::pubsub::Subscription _subscription; + google::cloud::pubsub::Subscription* _subscription; std::shared_ptr _subscriber; std::thread _subscriberThread; };