try making the controller attach to an existing pubsub subscription if it already exists
Some checks are pending
/ build_macos (push) Waiting to run
/ build_windows (push) Waiting to run
/ Central Controller Build (push) Waiting to run
/ multi-arch-docker (push) Blocked by required conditions
/ build_ubuntu (push) Waiting to run

This commit is contained in:
Grant Limberg 2025-09-22 10:52:06 -07:00
parent 6113bad61e
commit d96d3b27d2
2 changed files with 21 additions and 14 deletions

View file

@ -30,12 +30,13 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s
: _controller_id(controller_id)
, _project(project)
, _topic(topic)
, _subscription_id("sub-" + controller_id + "-" + topic)
, _subscription_id()
, _run(false)
, _adminClient(pubsub_admin::MakeSubscriptionAdminConnection())
, _subscription(pubsub::Subscription(_project, _subscription_id))
, _subscription(nullptr)
{
_subscription_id = _subscription_id + "-" + random_hex_string(8);
_subscription_id = "sub-" + controller_id + "-" + topic; // + "-" + random_hex_string(8);
_subscription = new pubsub::Subscription(_project, _subscription_id);
fprintf(
stderr, "PubSubListener for controller %s project %s topic %s subscription %s\n", controller_id.c_str(),
project.c_str(), topic.c_str(), _subscription_id.c_str());
@ -48,21 +49,22 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s
}
google::pubsub::v1::Subscription request;
request.set_name(_subscription.FullName());
request.set_name(_subscription->FullName());
request.set_topic(pubsub::Topic(project, topic).FullName());
request.set_filter("(attributes.controller_id=\"" + _controller_id + "\")");
auto sub = _adminClient.CreateSubscription(request);
if (! sub.ok()) {
if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
fprintf(stderr, "Subscription already exists. Attaching to it\n");
// throw std::runtime_error("Subscription already exists");
}
else {
fprintf(stderr, "Failed to create subscription: %s\n", sub.status().message().c_str());
throw std::runtime_error("Failed to create subscription");
}
if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
fprintf(stderr, "Subscription already exists\n");
throw std::runtime_error("Subscription already exists");
}
_subscriber = std::make_shared<pubsub::Subscriber>(pubsub::MakeSubscriberConnection(_subscription));
_subscriber = std::make_shared<pubsub::Subscriber>(pubsub::MakeSubscriberConnection(*_subscription));
_run = true;
_subscriberThread = std::thread(&PubSubListener::subscribe, this);
@ -75,10 +77,15 @@ PubSubListener::~PubSubListener()
_subscriberThread.join();
}
auto status = _adminClient.DeleteSubscription(_subscription.FullName());
auto status = _adminClient.DeleteSubscription(_subscription->FullName());
if (! status.ok()) {
fprintf(stderr, "Failed to delete subscription: %s\n", status.message().c_str());
}
if (_subscription) {
delete _subscription;
_subscription = nullptr;
}
}
void PubSubListener::subscribe()

View file

@ -35,7 +35,7 @@ class PubSubListener : public NotificationListener {
void subscribe();
bool _run = false;
google::cloud::pubsub_admin::SubscriptionAdminClient _adminClient;
google::cloud::pubsub::Subscription _subscription;
google::cloud::pubsub::Subscription* _subscription;
std::shared_ptr<google::cloud::pubsub::Subscriber> _subscriber;
std::thread _subscriberThread;
};