From 6113bad61e60abf7728b8b04dc6481423c41624e Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Thu, 18 Sep 2025 09:33:04 -0700 Subject: [PATCH] make pubsub topics configurable Also for pubsub trips back to CV1/CV2, rather than having 2 queues for each service (networks & members), theres now only a single queue for each change type, and `frontend = (cv1|cv2)` attribute is set on the message for filtering. --- ext/central-controller-docker/main-new.sh | 11 ++++++- nonfree/controller/CentralDB.cpp | 12 ++++--- .../controller/ControllerChangeNotifier.cpp | 32 ++++++------------- .../controller/ControllerChangeNotifier.hpp | 13 ++++---- nonfree/controller/ControllerConfig.hpp | 4 +++ nonfree/controller/PubSubListener.cpp | 8 ++--- nonfree/controller/PubSubListener.hpp | 4 +-- nonfree/controller/PubSubWriter.cpp | 22 +++++++++---- nonfree/controller/PubSubWriter.hpp | 10 ++++-- service/OneService.cpp | 8 +++++ 10 files changed, 74 insertions(+), 50 deletions(-) diff --git a/ext/central-controller-docker/main-new.sh b/ext/central-controller-docker/main-new.sh index aa0dba0fa..b7085997a 100755 --- a/ext/central-controller-docker/main-new.sh +++ b/ext/central-controller-docker/main-new.sh @@ -95,8 +95,17 @@ if [ "$ZT_USE_PUBSUB" == "true" ]; then exit 1 fi + if [ -z "$ZT_PUBSUB_MEMBER_CHANGE_RECV_TOPIC" ] || [ -z "$ZT_PUBSUB_MEMBER_CHANGE_SEND_TOPIC" ] || [ -z "$ZT_PUBSUB_NETWORK_CHANGE_RECV_TOPIC" ] || [ -z "$ZT_PUBSUB_NETWORK_CHANGE_SEND_TOPIC" ]; then + echo '*** FAILED: ZT_PUBSUB_MEMBER_CHANGE_RECV_TOPIC, ZT_PUBSUB_MEMBER_CHANGE_SEND_TOPIC, ZT_PUBSUB_NETWORK_CHANGE_RECV_TOPIC, and ZT_PUBSUB_NETWORK_CHANGE_SEND_TOPIC environment variables must all be defined to use PubSub as a controller backend' + exit 1 + fi + PUBSUB_CONF=", \"pubsub\": { - \"project_id\": \"${ZT_PUBSUB_PROJECT}\" + \"project_id\": \"${ZT_PUBSUB_PROJECT}\", + \"member_change_recv_topic\": \"${ZT_PUBSUB_MEMBER_CHANGE_RECV_TOPIC}\", + \"member_change_send_topic\": \"${ZT_PUBSUB_MEMBER_CHANGE_SEND_TOPIC}\", + \"network_change_recv_topic\": \"${ZT_PUBSUB_NETWORK_CHANGE_RECV_TOPIC}\", + \"network_change_send_topic\": \"${ZT_PUBSUB_NETWORK_CHANGE_SEND_TOPIC}\" } " fi diff --git a/nonfree/controller/CentralDB.cpp b/nonfree/controller/CentralDB.cpp index 9dc103d25..5d85de441 100644 --- a/nonfree/controller/CentralDB.cpp +++ b/nonfree/controller/CentralDB.cpp @@ -165,11 +165,13 @@ CentralDB::CentralDB( case LISTENER_MODE_PUBSUB: fprintf(stderr, "Using PubSub for change listeners\n"); if (cc->pubSubConfig != NULL) { - _membersDbWatcher = - std::make_shared(_myAddressStr, cc->pubSubConfig->project_id, this); - _networksDbWatcher = - std::make_shared(_myAddressStr, cc->pubSubConfig->project_id, this); - _changeNotifier = std::make_shared(_myAddressStr, cc->pubSubConfig->project_id); + _membersDbWatcher = std::make_shared( + _myAddressStr, cc->pubSubConfig->project_id, cc->pubSubConfig->member_change_recv_topic, this); + _networksDbWatcher = std::make_shared( + _myAddressStr, cc->pubSubConfig->project_id, cc->pubSubConfig->network_change_recv_topic, this); + _changeNotifier = std::make_shared( + _myAddressStr, cc->pubSubConfig->project_id, cc->pubSubConfig->member_change_send_topic, + cc->pubSubConfig->network_change_send_topic); } else { throw std::runtime_error( diff --git a/nonfree/controller/ControllerChangeNotifier.cpp b/nonfree/controller/ControllerChangeNotifier.cpp index 74d52754f..2f1bf67fb 100644 --- a/nonfree/controller/ControllerChangeNotifier.cpp +++ b/nonfree/controller/ControllerChangeNotifier.cpp @@ -4,12 +4,14 @@ namespace ZeroTier { -PubSubChangeNotifier::PubSubChangeNotifier(std::string controllerID, std::string project) +PubSubChangeNotifier::PubSubChangeNotifier( + std::string controllerID, + std::string project, + std::string memberChangeTopic, + std::string networkChangeTopic) : ControllerChangeNotifier() - , _cv1networkChangeWriter(std::make_shared(project, "ctl-to-cv1-network-change-stream", controllerID)) - , _cv1memberChangeWriter(std::make_shared(project, "ctl-to-cv1-member-change-stream", controllerID)) - , _cv2networkChangeWriter(std::make_shared(project, "ctl-to-cv2-network-change-stream", controllerID)) - , _cv2memberChangeWriter(std::make_shared(project, "ctl-to-cv2-member-change-stream", controllerID)) + , _networkChangeWriter(std::make_shared(project, networkChangeTopic, controllerID)) + , _memberChangeWriter(std::make_shared(project, memberChangeTopic, controllerID)) { } @@ -22,15 +24,7 @@ void PubSubChangeNotifier::notifyNetworkChange( const nlohmann::json& newNetwork, const std::string& frontend) { - if (frontend == "cv1") { - _cv1networkChangeWriter->publishNetworkChange(oldNetwork, newNetwork); - } - else if (frontend == "cv2") { - _cv2networkChangeWriter->publishNetworkChange(oldNetwork, newNetwork); - } - else { - throw std::runtime_error("Unknown frontend: " + frontend); - } + _networkChangeWriter->publishNetworkChange(oldNetwork, newNetwork, frontend); } void PubSubChangeNotifier::notifyMemberChange( @@ -38,15 +32,7 @@ void PubSubChangeNotifier::notifyMemberChange( const nlohmann::json newMember, const std::string& frontend) { - if (frontend == "cv1") { - _cv1memberChangeWriter->publishMemberChange(oldMember, newMember); - } - else if (frontend == "cv2") { - _cv2memberChangeWriter->publishMemberChange(oldMember, newMember); - } - else { - throw std::runtime_error("Unknown frontend: " + frontend); - } + _memberChangeWriter->publishMemberChange(oldMember, newMember, frontend); } } // namespace ZeroTier \ No newline at end of file diff --git a/nonfree/controller/ControllerChangeNotifier.hpp b/nonfree/controller/ControllerChangeNotifier.hpp index ba8b210e3..2a5e000ff 100644 --- a/nonfree/controller/ControllerChangeNotifier.hpp +++ b/nonfree/controller/ControllerChangeNotifier.hpp @@ -26,7 +26,11 @@ class ControllerChangeNotifier { class PubSubChangeNotifier : public ControllerChangeNotifier { public: - PubSubChangeNotifier(std::string controllerID, std::string project); + PubSubChangeNotifier( + std::string controllerID, + std::string project, + std::string memberChangeTopic, + std::string networkChangeTopic); virtual ~PubSubChangeNotifier(); virtual void notifyNetworkChange( @@ -40,11 +44,8 @@ class PubSubChangeNotifier : public ControllerChangeNotifier { const std::string& frontend = "") override; private: - std::shared_ptr _cv1networkChangeWriter; - std::shared_ptr _cv1memberChangeWriter; - - std::shared_ptr _cv2networkChangeWriter; - std::shared_ptr _cv2memberChangeWriter; + std::shared_ptr _networkChangeWriter; + std::shared_ptr _memberChangeWriter; }; } // namespace ZeroTier diff --git a/nonfree/controller/ControllerConfig.hpp b/nonfree/controller/ControllerConfig.hpp index b082b793b..2df1736c7 100644 --- a/nonfree/controller/ControllerConfig.hpp +++ b/nonfree/controller/ControllerConfig.hpp @@ -9,6 +9,10 @@ namespace ZeroTier { struct PubSubConfig { std::string project_id; + std::string member_change_recv_topic; + std::string member_change_send_topic; + std::string network_change_recv_topic; + std::string network_change_send_topic; }; struct BigTableConfig { diff --git a/nonfree/controller/PubSubListener.cpp b/nonfree/controller/PubSubListener.cpp index c4e7fb58d..0d87ff47c 100644 --- a/nonfree/controller/PubSubListener.cpp +++ b/nonfree/controller/PubSubListener.cpp @@ -121,8 +121,8 @@ void PubSubListener::subscribe() } } -PubSubNetworkListener::PubSubNetworkListener(std::string controller_id, std::string project, DB* db) - : PubSubListener(controller_id, project, "controller-network-change-stream") +PubSubNetworkListener::PubSubNetworkListener(std::string controller_id, std::string project, std::string topic, DB* db) + : PubSubListener(controller_id, project, topic) , _db(db) { } @@ -199,8 +199,8 @@ void PubSubNetworkListener::onNotification(const std::string& payload) } } -PubSubMemberListener::PubSubMemberListener(std::string controller_id, std::string project, DB* db) - : PubSubListener(controller_id, project, "controller-member-change-stream") +PubSubMemberListener::PubSubMemberListener(std::string controller_id, std::string project, std::string topic, DB* db) + : PubSubListener(controller_id, project, topic) , _db(db) { } diff --git a/nonfree/controller/PubSubListener.hpp b/nonfree/controller/PubSubListener.hpp index 5c6aa712c..6e7f17c64 100644 --- a/nonfree/controller/PubSubListener.hpp +++ b/nonfree/controller/PubSubListener.hpp @@ -45,7 +45,7 @@ class PubSubListener : public NotificationListener { */ class PubSubNetworkListener : public PubSubListener { public: - PubSubNetworkListener(std::string controller_id, std::string project, DB* db); + PubSubNetworkListener(std::string controller_id, std::string project, std::string topic, DB* db); virtual ~PubSubNetworkListener(); virtual void onNotification(const std::string& payload) override; @@ -59,7 +59,7 @@ class PubSubNetworkListener : public PubSubListener { */ class PubSubMemberListener : public PubSubListener { public: - PubSubMemberListener(std::string controller_id, std::string project, DB* db); + PubSubMemberListener(std::string controller_id, std::string project, std::string topic, DB* db); virtual ~PubSubMemberListener(); virtual void onNotification(const std::string& payload) override; diff --git a/nonfree/controller/PubSubWriter.cpp b/nonfree/controller/PubSubWriter.cpp index c1bd7d568..fcba2c2fa 100644 --- a/nonfree/controller/PubSubWriter.cpp +++ b/nonfree/controller/PubSubWriter.cpp @@ -51,11 +51,15 @@ PubSubWriter::~PubSubWriter() { } -bool PubSubWriter::publishMessage(const std::string& payload) +bool PubSubWriter::publishMessage(const std::string& payload, const std::string& frontend) { std::vector > attributes; attributes.emplace_back("controller_id", _controller_id); + if (! frontend.empty()) { + attributes.emplace_back("frontend", frontend); + } + auto msg = pubsub::MessageBuilder {}.SetData(payload).SetAttributes(attributes).Build(); auto message_id = _publisher->Publish(std::move(msg)).get(); if (! message_id) { @@ -67,7 +71,10 @@ bool PubSubWriter::publishMessage(const std::string& payload) return true; } -bool PubSubWriter::publishNetworkChange(const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork) +bool PubSubWriter::publishNetworkChange( + const nlohmann::json& oldNetwork, + const nlohmann::json& newNetwork, + const std::string& frontend) { pbmessages::NetworkChange nc = networkChangeFromJson(_controller_id, oldNetwork, newNetwork); std::string payload; @@ -76,10 +83,13 @@ bool PubSubWriter::publishNetworkChange(const nlohmann::json& oldNetwork, const return false; } - return publishMessage(payload); + return publishMessage(payload, frontend); } -bool PubSubWriter::publishMemberChange(const nlohmann::json& oldMember, const nlohmann::json& newMember) +bool PubSubWriter::publishMemberChange( + const nlohmann::json& oldMember, + const nlohmann::json& newMember, + const std::string& frontend) { pbmessages::MemberChange mc = memberChangeFromJson(_controller_id, oldMember, newMember); std::string payload; @@ -88,7 +98,7 @@ bool PubSubWriter::publishMemberChange(const nlohmann::json& oldMember, const nl return false; } - return publishMessage(payload); + return publishMessage(payload, frontend); } bool PubSubWriter::publishStatusChange( @@ -124,7 +134,7 @@ bool PubSubWriter::publishStatusChange( return false; } - return publishMessage(payload); + return publishMessage(payload, ""); } pbmessages::NetworkChange_Network* networkFromJson(const nlohmann::json& j) diff --git a/nonfree/controller/PubSubWriter.hpp b/nonfree/controller/PubSubWriter.hpp index 26a7bc9ee..bdc010393 100644 --- a/nonfree/controller/PubSubWriter.hpp +++ b/nonfree/controller/PubSubWriter.hpp @@ -13,9 +13,13 @@ class PubSubWriter { PubSubWriter(std::string project, std::string topic, std::string controller_id); virtual ~PubSubWriter(); - bool publishNetworkChange(const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork); + bool publishNetworkChange( + const nlohmann::json& oldNetwork, + const nlohmann::json& newNetwork, + const std::string& frontend); - bool publishMemberChange(const nlohmann::json& oldMember, const nlohmann::json& newMember); + bool + publishMemberChange(const nlohmann::json& oldMember, const nlohmann::json& newMember, const std::string& frontend); bool publishStatusChange( std::string frontend, @@ -27,7 +31,7 @@ class PubSubWriter { int64_t last_seen); protected: - bool publishMessage(const std::string& payload); + bool publishMessage(const std::string& payload, const std::string& frontend); private: std::string _controller_id; diff --git a/service/OneService.cpp b/service/OneService.cpp index a097156b7..128eb46d2 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -1822,6 +1822,14 @@ class OneServiceImpl : public OneService { json& ps = cc["pubsub"]; _controllerConfig.pubSubConfig = new PubSubConfig(); _controllerConfig.pubSubConfig->project_id = OSUtils::jsonString(ps["project_id"], ""); + _controllerConfig.pubSubConfig->member_change_recv_topic = + OSUtils::jsonString(ps["member_change_recv_topic"], ""); + _controllerConfig.pubSubConfig->member_change_send_topic = + OSUtils::jsonString(ps["member_change_send_topic"], ""); + _controllerConfig.pubSubConfig->network_change_recv_topic = + OSUtils::jsonString(ps["network_change_recv_topic"], ""); + _controllerConfig.pubSubConfig->network_change_send_topic = + OSUtils::jsonString(ps["network_change_send_topic"], ""); } if (_controllerConfig.listenMode == "pubsub" && ! _controllerConfig.pubSubConfig) { fprintf(stderr, "ERROR: pubsub listenMode requires pubsub configuration in local.conf" ZT_EOL_S);