diff --git a/controller/PubSubListener.cpp b/controller/PubSubListener.cpp index 36eb2b14f..72905a77a 100644 --- a/controller/PubSubListener.cpp +++ b/controller/PubSubListener.cpp @@ -58,7 +58,6 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s PubSubListener::~PubSubListener() { _run = false; - _session.cancel(); if (_subscriberThread.joinable()) { _subscriberThread.join(); } @@ -67,24 +66,40 @@ PubSubListener::~PubSubListener() void PubSubListener::subscribe() { while (_run) { - _session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) { - auto provider = opentelemetry::trace::Provider::GetTracerProvider(); - auto tracer = provider->GetTracer("PubSubListener"); - auto span = tracer->StartSpan("PubSubListener::onMessage"); - auto scope = tracer->WithActiveSpan(span); - span->SetAttribute("message_id", m.message_id()); - span->SetAttribute("ordering_key", m.ordering_key()); - span->SetAttribute("attributes", m.attributes().size()); + try { + fprintf(stderr, "Starting new subscription session\n"); + auto session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) { + auto provider = opentelemetry::trace::Provider::GetTracerProvider(); + auto tracer = provider->GetTracer("PubSubListener"); + auto span = tracer->StartSpan("PubSubListener::onMessage"); + auto scope = tracer->WithActiveSpan(span); + span->SetAttribute("message_id", m.message_id()); + span->SetAttribute("ordering_key", m.ordering_key()); + span->SetAttribute("attributes", m.attributes().size()); - fprintf(stderr, "Received message %s\n", m.message_id().c_str()); - onNotification(m.data()); - std::move(h).ack(); - span->SetStatus(opentelemetry::trace::StatusCode::kOk); - return true; - }); - auto status = _session.get(); - if (! status.ok() && _run) { - fprintf(stderr, "Error during Subscribe: %s\n", status.message().c_str()); + fprintf(stderr, "Received message %s\n", m.message_id().c_str()); + onNotification(m.data()); + std::move(h).ack(); + span->SetStatus(opentelemetry::trace::StatusCode::kOk); + return true; + }); + + auto result = session.wait_for(std::chrono::seconds(10)); + if (result == std::future_status::timeout) { + session.cancel(); + std::this_thread::sleep_for(std::chrono::seconds(5)); + continue; + } + + if (! session.valid()) { + fprintf(stderr, "Subscription session no longer valid\n"); + std::this_thread::sleep_for(std::chrono::seconds(5)); + continue; + } + } + catch (google::cloud::Status const& status) { + fprintf(stderr, "Subscription terminated with status: %s\n", status.message().c_str()); + std::this_thread::sleep_for(std::chrono::seconds(5)); } } } diff --git a/controller/PubSubListener.hpp b/controller/PubSubListener.hpp index 10d78ea94..b8f08aeae 100644 --- a/controller/PubSubListener.hpp +++ b/controller/PubSubListener.hpp @@ -44,7 +44,6 @@ class PubSubListener : public NotificationListener { google::cloud::pubsub_admin::SubscriptionAdminClient _adminClient; google::cloud::pubsub::Subscription _subscription; std::shared_ptr _subscriber; - google::cloud::future _session; std::thread _subscriberThread; };