only create the subscription if pubsub emulator is being used.
Some checks failed
/ build_macos (push) Has been cancelled
/ build_windows (push) Has been cancelled
/ Central Controller Build (push) Has been cancelled
/ build_ubuntu (push) Has been cancelled
/ multi-arch-docker (push) Has been cancelled

This commit is contained in:
Grant Limberg 2025-09-22 16:32:28 -07:00
parent d96d3b27d2
commit 6196e87303
3 changed files with 42 additions and 22 deletions

View file

@ -119,6 +119,39 @@ void create_gcp_pubsub_topic_if_needed(std::string project_id, std::string topic
}
}
void create_gcp_pubsub_subscription_if_needed(
std::string project_id,
std::string subscription_id,
std::string topic_id,
std::string controller_id)
{
// This is a no-op if the subscription already exists.
auto subscriptionAdminClient =
pubsub_admin::SubscriptionAdminClient(pubsub_admin::MakeSubscriptionAdminConnection());
auto topicName = pubsub::Topic(project_id, topic_id).FullName();
auto subscriptionName = pubsub::Subscription(project_id, subscription_id).FullName();
auto sub = subscriptionAdminClient.GetSubscription(subscriptionName);
if (! sub.ok()) {
if (sub.status().code() == google::cloud::StatusCode::kNotFound) {
google::pubsub::v1::Subscription request;
request.set_name(subscription_id);
request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
request.set_filter("(attributes.controller_id=\"" + controller_id + "\")");
auto createResult = subscriptionAdminClient.CreateSubscription(request);
if (! createResult.ok()) {
fprintf(stderr, "Failed to create subscription: %s\n", createResult.status().message().c_str());
throw std::runtime_error("Failed to create subscription");
}
fprintf(stderr, "Created subscription: %s\n", subscriptionName.c_str());
}
else {
fprintf(stderr, "Failed to get subscription: %s\n", sub.status().message().c_str());
throw std::runtime_error("Failed to get subscription");
}
}
}
// void create_bigtable_table(std::string project_id, std::string instance_id)
// {
// auto bigtableAdminClient =

View file

@ -20,6 +20,12 @@ std::string random_hex_string(std::size_t length);
#ifdef ZT1_CENTRAL_CONTROLLER
void create_gcp_pubsub_topic_if_needed(std::string project_id, std::string topic_id);
void create_gcp_pubsub_subscription_if_needed(
std::string project_id,
std::string subscription_id,
std::string topic_id,
std::string controller_id);
#endif
} // namespace ZeroTier

View file

@ -35,33 +35,19 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s
, _adminClient(pubsub_admin::MakeSubscriptionAdminConnection())
, _subscription(nullptr)
{
GOOGLE_PROTOBUF_VERIFY_VERSION;
_subscription_id = "sub-" + controller_id + "-" + topic; // + "-" + random_hex_string(8);
_subscription = new pubsub::Subscription(_project, _subscription_id);
fprintf(
stderr, "PubSubListener for controller %s project %s topic %s subscription %s\n", controller_id.c_str(),
project.c_str(), topic.c_str(), _subscription_id.c_str());
GOOGLE_PROTOBUF_VERIFY_VERSION;
// If PUBSUB_EMULATOR_HOST is set, create the topic if it doesn't exist
const char* emulatorHost = std::getenv("PUBSUB_EMULATOR_HOST");
if (emulatorHost != nullptr) {
create_gcp_pubsub_topic_if_needed(project, topic);
}
google::pubsub::v1::Subscription request;
request.set_name(_subscription->FullName());
request.set_topic(pubsub::Topic(project, topic).FullName());
request.set_filter("(attributes.controller_id=\"" + _controller_id + "\")");
auto sub = _adminClient.CreateSubscription(request);
if (! sub.ok()) {
if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
fprintf(stderr, "Subscription already exists. Attaching to it\n");
// throw std::runtime_error("Subscription already exists");
}
else {
fprintf(stderr, "Failed to create subscription: %s\n", sub.status().message().c_str());
throw std::runtime_error("Failed to create subscription");
}
create_gcp_pubsub_subscription_if_needed(_project, _subscription_id, _topic, _controller_id);
}
_subscriber = std::make_shared<pubsub::Subscriber>(pubsub::MakeSubscriberConnection(*_subscription));
@ -77,11 +63,6 @@ PubSubListener::~PubSubListener()
_subscriberThread.join();
}
auto status = _adminClient.DeleteSubscription(_subscription->FullName());
if (! status.ok()) {
fprintf(stderr, "Failed to delete subscription: %s\n", status.message().c_str());
}
if (_subscription) {
delete _subscription;
_subscription = nullptr;