make pubsub topics configurable
Some checks failed
/ build_macos (push) Has been cancelled
/ build_windows (push) Has been cancelled
/ Central Controller Build (push) Has been cancelled
/ build_ubuntu (push) Has been cancelled
/ multi-arch-docker (push) Has been cancelled

Also for pubsub trips back to CV1/CV2, rather than having 2 queues for each service (networks & members), theres now only a single queue for each change type, and `frontend = (cv1|cv2)` attribute is set on the message for filtering.
This commit is contained in:
Grant Limberg 2025-09-18 09:33:04 -07:00
parent cb2de5aae1
commit 6113bad61e
10 changed files with 74 additions and 50 deletions

View file

@ -95,8 +95,17 @@ if [ "$ZT_USE_PUBSUB" == "true" ]; then
exit 1 exit 1
fi fi
if [ -z "$ZT_PUBSUB_MEMBER_CHANGE_RECV_TOPIC" ] || [ -z "$ZT_PUBSUB_MEMBER_CHANGE_SEND_TOPIC" ] || [ -z "$ZT_PUBSUB_NETWORK_CHANGE_RECV_TOPIC" ] || [ -z "$ZT_PUBSUB_NETWORK_CHANGE_SEND_TOPIC" ]; then
echo '*** FAILED: ZT_PUBSUB_MEMBER_CHANGE_RECV_TOPIC, ZT_PUBSUB_MEMBER_CHANGE_SEND_TOPIC, ZT_PUBSUB_NETWORK_CHANGE_RECV_TOPIC, and ZT_PUBSUB_NETWORK_CHANGE_SEND_TOPIC environment variables must all be defined to use PubSub as a controller backend'
exit 1
fi
PUBSUB_CONF=", \"pubsub\": { PUBSUB_CONF=", \"pubsub\": {
\"project_id\": \"${ZT_PUBSUB_PROJECT}\" \"project_id\": \"${ZT_PUBSUB_PROJECT}\",
\"member_change_recv_topic\": \"${ZT_PUBSUB_MEMBER_CHANGE_RECV_TOPIC}\",
\"member_change_send_topic\": \"${ZT_PUBSUB_MEMBER_CHANGE_SEND_TOPIC}\",
\"network_change_recv_topic\": \"${ZT_PUBSUB_NETWORK_CHANGE_RECV_TOPIC}\",
\"network_change_send_topic\": \"${ZT_PUBSUB_NETWORK_CHANGE_SEND_TOPIC}\"
} }
" "
fi fi

View file

@ -165,11 +165,13 @@ CentralDB::CentralDB(
case LISTENER_MODE_PUBSUB: case LISTENER_MODE_PUBSUB:
fprintf(stderr, "Using PubSub for change listeners\n"); fprintf(stderr, "Using PubSub for change listeners\n");
if (cc->pubSubConfig != NULL) { if (cc->pubSubConfig != NULL) {
_membersDbWatcher = _membersDbWatcher = std::make_shared<PubSubMemberListener>(
std::make_shared<PubSubMemberListener>(_myAddressStr, cc->pubSubConfig->project_id, this); _myAddressStr, cc->pubSubConfig->project_id, cc->pubSubConfig->member_change_recv_topic, this);
_networksDbWatcher = _networksDbWatcher = std::make_shared<PubSubNetworkListener>(
std::make_shared<PubSubNetworkListener>(_myAddressStr, cc->pubSubConfig->project_id, this); _myAddressStr, cc->pubSubConfig->project_id, cc->pubSubConfig->network_change_recv_topic, this);
_changeNotifier = std::make_shared<PubSubChangeNotifier>(_myAddressStr, cc->pubSubConfig->project_id); _changeNotifier = std::make_shared<PubSubChangeNotifier>(
_myAddressStr, cc->pubSubConfig->project_id, cc->pubSubConfig->member_change_send_topic,
cc->pubSubConfig->network_change_send_topic);
} }
else { else {
throw std::runtime_error( throw std::runtime_error(

View file

@ -4,12 +4,14 @@
namespace ZeroTier { namespace ZeroTier {
PubSubChangeNotifier::PubSubChangeNotifier(std::string controllerID, std::string project) PubSubChangeNotifier::PubSubChangeNotifier(
std::string controllerID,
std::string project,
std::string memberChangeTopic,
std::string networkChangeTopic)
: ControllerChangeNotifier() : ControllerChangeNotifier()
, _cv1networkChangeWriter(std::make_shared<PubSubWriter>(project, "ctl-to-cv1-network-change-stream", controllerID)) , _networkChangeWriter(std::make_shared<PubSubWriter>(project, networkChangeTopic, controllerID))
, _cv1memberChangeWriter(std::make_shared<PubSubWriter>(project, "ctl-to-cv1-member-change-stream", controllerID)) , _memberChangeWriter(std::make_shared<PubSubWriter>(project, memberChangeTopic, controllerID))
, _cv2networkChangeWriter(std::make_shared<PubSubWriter>(project, "ctl-to-cv2-network-change-stream", controllerID))
, _cv2memberChangeWriter(std::make_shared<PubSubWriter>(project, "ctl-to-cv2-member-change-stream", controllerID))
{ {
} }
@ -22,15 +24,7 @@ void PubSubChangeNotifier::notifyNetworkChange(
const nlohmann::json& newNetwork, const nlohmann::json& newNetwork,
const std::string& frontend) const std::string& frontend)
{ {
if (frontend == "cv1") { _networkChangeWriter->publishNetworkChange(oldNetwork, newNetwork, frontend);
_cv1networkChangeWriter->publishNetworkChange(oldNetwork, newNetwork);
}
else if (frontend == "cv2") {
_cv2networkChangeWriter->publishNetworkChange(oldNetwork, newNetwork);
}
else {
throw std::runtime_error("Unknown frontend: " + frontend);
}
} }
void PubSubChangeNotifier::notifyMemberChange( void PubSubChangeNotifier::notifyMemberChange(
@ -38,15 +32,7 @@ void PubSubChangeNotifier::notifyMemberChange(
const nlohmann::json newMember, const nlohmann::json newMember,
const std::string& frontend) const std::string& frontend)
{ {
if (frontend == "cv1") { _memberChangeWriter->publishMemberChange(oldMember, newMember, frontend);
_cv1memberChangeWriter->publishMemberChange(oldMember, newMember);
}
else if (frontend == "cv2") {
_cv2memberChangeWriter->publishMemberChange(oldMember, newMember);
}
else {
throw std::runtime_error("Unknown frontend: " + frontend);
}
} }
} // namespace ZeroTier } // namespace ZeroTier

View file

@ -26,7 +26,11 @@ class ControllerChangeNotifier {
class PubSubChangeNotifier : public ControllerChangeNotifier { class PubSubChangeNotifier : public ControllerChangeNotifier {
public: public:
PubSubChangeNotifier(std::string controllerID, std::string project); PubSubChangeNotifier(
std::string controllerID,
std::string project,
std::string memberChangeTopic,
std::string networkChangeTopic);
virtual ~PubSubChangeNotifier(); virtual ~PubSubChangeNotifier();
virtual void notifyNetworkChange( virtual void notifyNetworkChange(
@ -40,11 +44,8 @@ class PubSubChangeNotifier : public ControllerChangeNotifier {
const std::string& frontend = "") override; const std::string& frontend = "") override;
private: private:
std::shared_ptr<PubSubWriter> _cv1networkChangeWriter; std::shared_ptr<PubSubWriter> _networkChangeWriter;
std::shared_ptr<PubSubWriter> _cv1memberChangeWriter; std::shared_ptr<PubSubWriter> _memberChangeWriter;
std::shared_ptr<PubSubWriter> _cv2networkChangeWriter;
std::shared_ptr<PubSubWriter> _cv2memberChangeWriter;
}; };
} // namespace ZeroTier } // namespace ZeroTier

View file

@ -9,6 +9,10 @@ namespace ZeroTier {
struct PubSubConfig { struct PubSubConfig {
std::string project_id; std::string project_id;
std::string member_change_recv_topic;
std::string member_change_send_topic;
std::string network_change_recv_topic;
std::string network_change_send_topic;
}; };
struct BigTableConfig { struct BigTableConfig {

View file

@ -121,8 +121,8 @@ void PubSubListener::subscribe()
} }
} }
PubSubNetworkListener::PubSubNetworkListener(std::string controller_id, std::string project, DB* db) PubSubNetworkListener::PubSubNetworkListener(std::string controller_id, std::string project, std::string topic, DB* db)
: PubSubListener(controller_id, project, "controller-network-change-stream") : PubSubListener(controller_id, project, topic)
, _db(db) , _db(db)
{ {
} }
@ -199,8 +199,8 @@ void PubSubNetworkListener::onNotification(const std::string& payload)
} }
} }
PubSubMemberListener::PubSubMemberListener(std::string controller_id, std::string project, DB* db) PubSubMemberListener::PubSubMemberListener(std::string controller_id, std::string project, std::string topic, DB* db)
: PubSubListener(controller_id, project, "controller-member-change-stream") : PubSubListener(controller_id, project, topic)
, _db(db) , _db(db)
{ {
} }

View file

@ -45,7 +45,7 @@ class PubSubListener : public NotificationListener {
*/ */
class PubSubNetworkListener : public PubSubListener { class PubSubNetworkListener : public PubSubListener {
public: public:
PubSubNetworkListener(std::string controller_id, std::string project, DB* db); PubSubNetworkListener(std::string controller_id, std::string project, std::string topic, DB* db);
virtual ~PubSubNetworkListener(); virtual ~PubSubNetworkListener();
virtual void onNotification(const std::string& payload) override; virtual void onNotification(const std::string& payload) override;
@ -59,7 +59,7 @@ class PubSubNetworkListener : public PubSubListener {
*/ */
class PubSubMemberListener : public PubSubListener { class PubSubMemberListener : public PubSubListener {
public: public:
PubSubMemberListener(std::string controller_id, std::string project, DB* db); PubSubMemberListener(std::string controller_id, std::string project, std::string topic, DB* db);
virtual ~PubSubMemberListener(); virtual ~PubSubMemberListener();
virtual void onNotification(const std::string& payload) override; virtual void onNotification(const std::string& payload) override;

View file

@ -51,11 +51,15 @@ PubSubWriter::~PubSubWriter()
{ {
} }
bool PubSubWriter::publishMessage(const std::string& payload) bool PubSubWriter::publishMessage(const std::string& payload, const std::string& frontend)
{ {
std::vector<std::pair<std::string, std::string> > attributes; std::vector<std::pair<std::string, std::string> > attributes;
attributes.emplace_back("controller_id", _controller_id); attributes.emplace_back("controller_id", _controller_id);
if (! frontend.empty()) {
attributes.emplace_back("frontend", frontend);
}
auto msg = pubsub::MessageBuilder {}.SetData(payload).SetAttributes(attributes).Build(); auto msg = pubsub::MessageBuilder {}.SetData(payload).SetAttributes(attributes).Build();
auto message_id = _publisher->Publish(std::move(msg)).get(); auto message_id = _publisher->Publish(std::move(msg)).get();
if (! message_id) { if (! message_id) {
@ -67,7 +71,10 @@ bool PubSubWriter::publishMessage(const std::string& payload)
return true; return true;
} }
bool PubSubWriter::publishNetworkChange(const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork) bool PubSubWriter::publishNetworkChange(
const nlohmann::json& oldNetwork,
const nlohmann::json& newNetwork,
const std::string& frontend)
{ {
pbmessages::NetworkChange nc = networkChangeFromJson(_controller_id, oldNetwork, newNetwork); pbmessages::NetworkChange nc = networkChangeFromJson(_controller_id, oldNetwork, newNetwork);
std::string payload; std::string payload;
@ -76,10 +83,13 @@ bool PubSubWriter::publishNetworkChange(const nlohmann::json& oldNetwork, const
return false; return false;
} }
return publishMessage(payload); return publishMessage(payload, frontend);
} }
bool PubSubWriter::publishMemberChange(const nlohmann::json& oldMember, const nlohmann::json& newMember) bool PubSubWriter::publishMemberChange(
const nlohmann::json& oldMember,
const nlohmann::json& newMember,
const std::string& frontend)
{ {
pbmessages::MemberChange mc = memberChangeFromJson(_controller_id, oldMember, newMember); pbmessages::MemberChange mc = memberChangeFromJson(_controller_id, oldMember, newMember);
std::string payload; std::string payload;
@ -88,7 +98,7 @@ bool PubSubWriter::publishMemberChange(const nlohmann::json& oldMember, const nl
return false; return false;
} }
return publishMessage(payload); return publishMessage(payload, frontend);
} }
bool PubSubWriter::publishStatusChange( bool PubSubWriter::publishStatusChange(
@ -124,7 +134,7 @@ bool PubSubWriter::publishStatusChange(
return false; return false;
} }
return publishMessage(payload); return publishMessage(payload, "");
} }
pbmessages::NetworkChange_Network* networkFromJson(const nlohmann::json& j) pbmessages::NetworkChange_Network* networkFromJson(const nlohmann::json& j)

View file

@ -13,9 +13,13 @@ class PubSubWriter {
PubSubWriter(std::string project, std::string topic, std::string controller_id); PubSubWriter(std::string project, std::string topic, std::string controller_id);
virtual ~PubSubWriter(); virtual ~PubSubWriter();
bool publishNetworkChange(const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork); bool publishNetworkChange(
const nlohmann::json& oldNetwork,
const nlohmann::json& newNetwork,
const std::string& frontend);
bool publishMemberChange(const nlohmann::json& oldMember, const nlohmann::json& newMember); bool
publishMemberChange(const nlohmann::json& oldMember, const nlohmann::json& newMember, const std::string& frontend);
bool publishStatusChange( bool publishStatusChange(
std::string frontend, std::string frontend,
@ -27,7 +31,7 @@ class PubSubWriter {
int64_t last_seen); int64_t last_seen);
protected: protected:
bool publishMessage(const std::string& payload); bool publishMessage(const std::string& payload, const std::string& frontend);
private: private:
std::string _controller_id; std::string _controller_id;

View file

@ -1822,6 +1822,14 @@ class OneServiceImpl : public OneService {
json& ps = cc["pubsub"]; json& ps = cc["pubsub"];
_controllerConfig.pubSubConfig = new PubSubConfig(); _controllerConfig.pubSubConfig = new PubSubConfig();
_controllerConfig.pubSubConfig->project_id = OSUtils::jsonString(ps["project_id"], ""); _controllerConfig.pubSubConfig->project_id = OSUtils::jsonString(ps["project_id"], "");
_controllerConfig.pubSubConfig->member_change_recv_topic =
OSUtils::jsonString(ps["member_change_recv_topic"], "");
_controllerConfig.pubSubConfig->member_change_send_topic =
OSUtils::jsonString(ps["member_change_send_topic"], "");
_controllerConfig.pubSubConfig->network_change_recv_topic =
OSUtils::jsonString(ps["network_change_recv_topic"], "");
_controllerConfig.pubSubConfig->network_change_send_topic =
OSUtils::jsonString(ps["network_change_send_topic"], "");
} }
if (_controllerConfig.listenMode == "pubsub" && ! _controllerConfig.pubSubConfig) { if (_controllerConfig.listenMode == "pubsub" && ! _controllerConfig.pubSubConfig) {
fprintf(stderr, "ERROR: pubsub listenMode requires pubsub configuration in local.conf" ZT_EOL_S); fprintf(stderr, "ERROR: pubsub listenMode requires pubsub configuration in local.conf" ZT_EOL_S);