From 3f197121782bb8d1969de93f6580610769b4ef59 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Wed, 16 Jul 2025 10:40:19 -0700 Subject: [PATCH] inheritence mess cleanup --- controller/CV1.cpp | 6 ++-- controller/CV1.hpp | 3 ++ controller/CV2.cpp | 4 +-- controller/CV2.hpp | 3 ++ controller/DB.hpp | 4 --- controller/PostgreSQL.cpp | 60 --------------------------------- controller/PostgreSQL.hpp | 70 ++++++++++++++++++++++++++++++++++----- objects.mk | 1 - 8 files changed, 73 insertions(+), 78 deletions(-) diff --git a/controller/CV1.cpp b/controller/CV1.cpp index f96918baf..95d85a280 100644 --- a/controller/CV1.cpp +++ b/controller/CV1.cpp @@ -1209,7 +1209,7 @@ void CV1::_membersWatcher_Postgres() std::string stream = "member_" + _myAddressStr; fprintf(stderr, "Listening to member stream: %s\n", stream.c_str()); - MemberNotificationReceiver m(this, *c->c, stream); + MemberNotificationReceiver m(this, *c->c, stream); while (_run == 1) { c->c->await_notification(5, 0); @@ -1316,7 +1316,7 @@ void CV1::_networksWatcher_Postgres() auto c = _pool->borrow(); - NetworkNotificationReceiver n(this, *c->c, stream); + NetworkNotificationReceiver n(this, *c->c, stream); while (_run == 1) { auto provider = opentelemetry::trace::Provider::GetTracerProvider(); @@ -2022,7 +2022,7 @@ void CV1::onlineNotification_Redis() fprintf(stderr, "onlineNotification ran in %llu ms\n", total); span->End(); - + std::this_thread::sleep_for(std::chrono::seconds(5)); } } diff --git a/controller/CV1.hpp b/controller/CV1.hpp index 549b17912..b1a362798 100644 --- a/controller/CV1.hpp +++ b/controller/CV1.hpp @@ -43,6 +43,9 @@ struct RedisConfig; * but be aware that we might change it at any time. */ class CV1 : public DB { + friend class MemberNotificationReceiver; + friend class NetworkNotificationReceiver; + public: CV1(const Identity& myId, const char* path, int listenPort, RedisConfig* rc); virtual ~CV1(); diff --git a/controller/CV2.cpp b/controller/CV2.cpp index c1206de94..28687ca4d 100644 --- a/controller/CV2.cpp +++ b/controller/CV2.cpp @@ -790,7 +790,7 @@ void CV2::membersDbWatcher() std::string stream = "member_" + _myAddressStr; fprintf(stderr, "Listening to member stream: %s\n", stream.c_str()); - MemberNotificationReceiver m(this, *c->c, stream); + MemberNotificationReceiver m(this, *c->c, stream); while (_run == 1) { c->c->await_notification(5, 0); @@ -809,7 +809,7 @@ void CV2::networksDbWatcher() auto c = _pool->borrow(); - NetworkNotificationReceiver n(this, *c->c, stream); + NetworkNotificationReceiver n(this, *c->c, stream); while (_run == 1) { c->c->await_notification(5, 0); diff --git a/controller/CV2.hpp b/controller/CV2.hpp index 8302b98d3..a0cc896d3 100644 --- a/controller/CV2.hpp +++ b/controller/CV2.hpp @@ -31,6 +31,9 @@ namespace ZeroTier { class CV2 : public DB { + friend class MemberNotificationReceiver; + friend class NetworkNotificationReceiver; + public: CV2(const Identity& myId, const char* path, int listenPort); virtual ~CV2(); diff --git a/controller/DB.hpp b/controller/DB.hpp index 9dd9c06c6..01b6f1266 100644 --- a/controller/DB.hpp +++ b/controller/DB.hpp @@ -61,10 +61,6 @@ struct AuthInfo { * Base class with common infrastructure for all controller DB implementations */ class DB { -#ifdef ZT_CONTROLLER_USE_LIBPQ - friend class MemberNotificationReceiver; - friend class NetworkNotificationReceiver; -#endif public: class ChangeListener { public: diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 2ee939322..3bc864601 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -2,70 +2,10 @@ #include "PostgreSQL.hpp" -#include "opentelemetry/trace/provider.h" - #include using namespace nlohmann; using namespace ZeroTier; -MemberNotificationReceiver::MemberNotificationReceiver(DB* p, pqxx::connection& c, const std::string& channel) : pqxx::notification_receiver(c, channel), _psql(p) -{ - fprintf(stderr, "initialize MemberNotificationReceiver\n"); -} - -void MemberNotificationReceiver::operator()(const std::string& payload, int packend_pid) -{ - auto provider = opentelemetry::trace::Provider::GetTracerProvider(); - auto tracer = provider->GetTracer("db_member_notification"); - auto span = tracer->StartSpan("db_member_notification::operator()"); - auto scope = tracer->WithActiveSpan(span); - span->SetAttribute("payload", payload); - - fprintf(stderr, "Member Notification received: %s\n", payload.c_str()); - Metrics::pgsql_mem_notification++; - json tmp(json::parse(payload)); - json& ov = tmp["old_val"]; - json& nv = tmp["new_val"]; - json oldConfig, newConfig; - if (ov.is_object()) - oldConfig = ov; - if (nv.is_object()) - newConfig = nv; - if (oldConfig.is_object() || newConfig.is_object()) { - _psql->_memberChanged(oldConfig, newConfig, _psql->isReady()); - fprintf(stderr, "payload sent\n"); - } -} - -NetworkNotificationReceiver::NetworkNotificationReceiver(DB* p, pqxx::connection& c, const std::string& channel) : pqxx::notification_receiver(c, channel), _psql(p) -{ - fprintf(stderr, "initialize NetworkNotificationReceiver\n"); -} - -void NetworkNotificationReceiver::operator()(const std::string& payload, int packend_pid) -{ - auto provider = opentelemetry::trace::Provider::GetTracerProvider(); - auto tracer = provider->GetTracer("db_network_notification"); - auto span = tracer->StartSpan("db_network_notification::operator()"); - auto scope = tracer->WithActiveSpan(span); - span->SetAttribute("payload", payload); - - fprintf(stderr, "Network Notification received: %s\n", payload.c_str()); - Metrics::pgsql_net_notification++; - json tmp(json::parse(payload)); - json& ov = tmp["old_val"]; - json& nv = tmp["new_val"]; - json oldConfig, newConfig; - if (ov.is_object()) - oldConfig = ov; - if (nv.is_object()) - newConfig = nv; - if (oldConfig.is_object() || newConfig.is_object()) { - _psql->_networkChanged(oldConfig, newConfig, _psql->isReady()); - fprintf(stderr, "payload sent\n"); - } -} - #endif \ No newline at end of file diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index dc73405f5..6f1704306 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -18,8 +18,10 @@ #include "ConnectionPool.hpp" #include "DB.hpp" +#include "opentelemetry/trace/provider.h" #include +#include #include namespace ZeroTier { @@ -56,32 +58,84 @@ class PostgresConnFactory : public ConnectionFactory { std::string m_connString; }; -class MemberNotificationReceiver : public pqxx::notification_receiver { +template class MemberNotificationReceiver : public pqxx::notification_receiver { public: - MemberNotificationReceiver(DB* p, pqxx::connection& c, const std::string& channel); + MemberNotificationReceiver(T* p, pqxx::connection& c, const std::string& channel) : pqxx::notification_receiver(c, channel), _psql(p) + { + fprintf(stderr, "initialize MemberNotificationReceiver\n"); + } + virtual ~MemberNotificationReceiver() { fprintf(stderr, "MemberNotificationReceiver destroyed\n"); } - virtual void operator()(const std::string& payload, int backendPid); + virtual void operator()(const std::string& payload, int backendPid) + { + auto provider = opentelemetry::trace::Provider::GetTracerProvider(); + auto tracer = provider->GetTracer("db_member_notification"); + auto span = tracer->StartSpan("db_member_notification::operator()"); + auto scope = tracer->WithActiveSpan(span); + span->SetAttribute("payload", payload); + + fprintf(stderr, "Member Notification received: %s\n", payload.c_str()); + Metrics::pgsql_mem_notification++; + nlohmann::json tmp(nlohmann::json::parse(payload)); + nlohmann::json& ov = tmp["old_val"]; + nlohmann::json& nv = tmp["new_val"]; + nlohmann::json oldConfig, newConfig; + if (ov.is_object()) + oldConfig = ov; + if (nv.is_object()) + newConfig = nv; + if (oldConfig.is_object() || newConfig.is_object()) { + _psql->_memberChanged(oldConfig, newConfig, _psql->isReady()); + fprintf(stderr, "payload sent\n"); + } + } private: - DB* _psql; + T* _psql; }; -class NetworkNotificationReceiver : public pqxx::notification_receiver { +template class NetworkNotificationReceiver : public pqxx::notification_receiver { public: - NetworkNotificationReceiver(DB* p, pqxx::connection& c, const std::string& channel); + NetworkNotificationReceiver(T* p, pqxx::connection& c, const std::string& channel) : pqxx::notification_receiver(c, channel), _psql(p) + { + fprintf(stderr, "initialize NetworkrNotificationReceiver\n"); + } + virtual ~NetworkNotificationReceiver() { fprintf(stderr, "NetworkNotificationReceiver destroyed\n"); }; - virtual void operator()(const std::string& payload, int packend_pid); + virtual void operator()(const std::string& payload, int packend_pid) + { + auto provider = opentelemetry::trace::Provider::GetTracerProvider(); + auto tracer = provider->GetTracer("db_network_notification"); + auto span = tracer->StartSpan("db_network_notification::operator()"); + auto scope = tracer->WithActiveSpan(span); + span->SetAttribute("payload", payload); + + fprintf(stderr, "Network Notification received: %s\n", payload.c_str()); + Metrics::pgsql_net_notification++; + nlohmann::json tmp(nlohmann::json::parse(payload)); + nlohmann::json& ov = tmp["old_val"]; + nlohmann::json& nv = tmp["new_val"]; + nlohmann::json oldConfig, newConfig; + if (ov.is_object()) + oldConfig = ov; + if (nv.is_object()) + newConfig = nv; + if (oldConfig.is_object() || newConfig.is_object()) { + _psql->_networkChanged(oldConfig, newConfig, _psql->isReady()); + fprintf(stderr, "payload sent\n"); + } + } private: - DB* _psql; + T* _psql; }; struct NodeOnlineRecord { diff --git a/objects.mk b/objects.mk index bba4c6fa3..238481a42 100644 --- a/objects.mk +++ b/objects.mk @@ -40,7 +40,6 @@ ONE_OBJS=\ controller/FileDB.o \ controller/LFDB.o \ controller/CtlUtil.o \ - controller/PostgreSQL.o \ controller/CV1.o \ controller/CV2.o \ osdep/EthernetTap.o \