From a5bd262b3a2569252c28a3ccbe49376ee2535029 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Tue, 2 Sep 2025 13:30:08 -0700 Subject: [PATCH] Wiring through initialization of the CentralDB version of the controller Still need to do the actual configuration from local.conf --- CMakeLists.txt | 2 +- nonfree/controller/BigTableStatusWriter.hpp | 5 - nonfree/controller/CMakeLists.txt | 2 + nonfree/controller/CentralDB.cpp | 42 ++++---- nonfree/controller/CentralDB.hpp | 20 +--- nonfree/controller/ControllerConfig.hpp | 66 ++++++++++++ .../controller/EmbeddedNetworkController.cpp | 101 ++++++++++++++++++ .../controller/EmbeddedNetworkController.hpp | 15 +++ nonfree/controller/PubSubListener.cpp | 1 + nonfree/controller/PubSubListener.hpp | 7 -- nonfree/controller/StatusWriter.hpp | 2 +- service/CMakeLists.txt | 1 + service/OneService.cpp | 18 +++- 13 files changed, 230 insertions(+), 52 deletions(-) create mode 100644 nonfree/controller/ControllerConfig.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index c705c2853..d527ff4a9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,7 +54,7 @@ if(ZT1_CENTRAL_CONTROLLER) set(CMAKE_CXX_STANDARD 17 CACHE STRING "C++ standard to conform to" FORCE) set(CMAKE_CXX_STANDARD_REQUIRED True CACHE BOOL "C++ standard required" FORCE) set(CMAKE_CXX_EXTENSIONS ON CACHE BOOL "Enable compiler-specific extensions" FORCE) - add_definitions(-DZT_CONTROLLER_USE_LIBPQ -DZT1_CENTRAL_CONTROLLER) + add_definitions(-DZT_NONFREE_CONTROLLER=1 -DZT_CONTROLLER_USE_LIBPQ=1 -DZT1_CENTRAL_CONTROLLER=1) set(CONTROLLER_RUST_FEATURES ztcontroller) set(RUST_BUILD_COMMAND cargo build --release -F ${CONTROLLER_RUST_FEATURES}) else() diff --git a/nonfree/controller/BigTableStatusWriter.hpp b/nonfree/controller/BigTableStatusWriter.hpp index 8abc81ca3..61657c797 100644 --- a/nonfree/controller/BigTableStatusWriter.hpp +++ b/nonfree/controller/BigTableStatusWriter.hpp @@ -7,11 +7,6 @@ #include namespace ZeroTier { -struct BigTableConfig { - std::string project_id; - std::string instance_id; - std::string table_id; -}; class BigTableStatusWriter : public StatusWriter { public: diff --git a/nonfree/controller/CMakeLists.txt b/nonfree/controller/CMakeLists.txt index 57560ce20..07760c675 100644 --- a/nonfree/controller/CMakeLists.txt +++ b/nonfree/controller/CMakeLists.txt @@ -41,6 +41,7 @@ if (ZT1_CENTRAL_CONTROLLER) CV2.hpp CentralDB.cpp CentralDB.hpp + ControllerConfig.hpp NotificationListener.hpp PostgreSQL.cpp PostgreSQL.hpp @@ -55,6 +56,7 @@ if (ZT1_CENTRAL_CONTROLLER) BigTableStatusWriter.hpp PostgresStatusWriter.cpp PostgresStatusWriter.hpp + Redis.hpp RedisStatusWriter.cpp RedisStatusWriter.hpp ) diff --git a/nonfree/controller/CentralDB.cpp b/nonfree/controller/CentralDB.cpp index 846565e0f..f42f50414 100644 --- a/nonfree/controller/CentralDB.cpp +++ b/nonfree/controller/CentralDB.cpp @@ -19,6 +19,7 @@ #include "../../node/SHA512.hpp" #include "../../version.h" #include "BigTableStatusWriter.hpp" +#include "ControllerConfig.hpp" #include "CtlUtil.hpp" #include "EmbeddedNetworkController.hpp" #include "PostgresStatusWriter.hpp" @@ -55,15 +56,15 @@ using ItemStream = std::vector; CentralDB::CentralDB( const Identity& myId, - const char* path, + const char* connString, int listenPort, CentralDB::ListenerMode listenMode, CentralDB::StatusWriterMode statusMode, - ControllerConfig* cc) + const ControllerConfig* cc) : DB() , _listenerMode(listenMode) , _statusWriterMode(statusMode) - , _controllerConfig(cc) + , _cc(cc) , _pool() , _myId(myId) , _myAddress(myId.address()) @@ -72,7 +73,6 @@ CentralDB::CentralDB( , _run(1) , _waitNoticePrinted(false) , _listenPort(listenPort) - , _rc(cc->redisConfig) , _redis(NULL) , _cluster(NULL) , _redisMemberStatus(false) @@ -87,7 +87,7 @@ CentralDB::CentralDB( char myAddress[64]; _myAddressStr = myId.address().toString(myAddress); - _connString = std::string(path); + _connString = std::string(connString); auto f = std::make_shared(_connString); _pool = std::make_shared >(15, 5, std::static_pointer_cast(f)); @@ -126,15 +126,15 @@ CentralDB::CentralDB( } _pool->unborrow(c); - if ((listenMode == LISTENER_MODE_REDIS || statusMode == STATUS_WRITER_MODE_REDIS) && _rc != NULL) { + if ((listenMode == LISTENER_MODE_REDIS || statusMode == STATUS_WRITER_MODE_REDIS) && _cc->redisConfig != NULL) { auto innerspan = tracer->StartSpan("CentralDB::CentralDB::configureRedis"); auto innerscope = tracer->WithActiveSpan(innerspan); sw::redis::ConnectionOptions opts; sw::redis::ConnectionPoolOptions poolOpts; - opts.host = _rc->hostname; - opts.port = _rc->port; - opts.password = _rc->password; + opts.host = _cc->redisConfig->hostname; + opts.port = _cc->redisConfig->port; + opts.password = _cc->redisConfig->password; opts.db = 0; opts.keep_alive = true; opts.connect_timeout = std::chrono::seconds(3); @@ -142,7 +142,7 @@ CentralDB::CentralDB( poolOpts.wait_timeout = std::chrono::seconds(5); poolOpts.connection_lifetime = std::chrono::minutes(3); poolOpts.connection_idle_time = std::chrono::minutes(1); - if (_rc->clusterMode) { + if (_cc->redisConfig->clusterMode) { innerspan->SetAttribute("cluster_mode", "true"); fprintf(stderr, "Using Redis in Cluster Mode\n"); _cluster = std::make_shared(opts, poolOpts); @@ -168,8 +168,8 @@ CentralDB::CentralDB( switch (listenMode) { case LISTENER_MODE_REDIS: - if (_rc != NULL) { - if (_rc->clusterMode) { + if (_cc->redisConfig != NULL) { + if (_cc->redisConfig->clusterMode) { _membersDbWatcher = std::make_shared(_myAddressStr, _cluster, this); _networksDbWatcher = std::make_shared(_myAddressStr, _cluster, this); } @@ -202,8 +202,8 @@ CentralDB::CentralDB( switch (statusMode) { case STATUS_WRITER_MODE_REDIS: - if (_rc != NULL) { - if (_rc->clusterMode) { + if (_cc->redisConfig != NULL) { + if (_cc->redisConfig->clusterMode) { _statusWriter = std::make_shared(_cluster, _myAddressStr); } else { @@ -441,7 +441,7 @@ void CentralDB::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, AuthInfo CentralDB::getSSOAuthInfo(const nlohmann::json& member, const std::string& redirectURL) { - if (_controllerConfig->ssoEnabled) { + if (_cc->ssoEnabled) { auto provider = opentelemetry::trace::Provider::GetTracerProvider(); auto tracer = provider->GetTracer("CentralDB"); auto span = tracer->StartSpan("CentralDB::getSSOAuthInfo"); @@ -797,7 +797,7 @@ void CentralDB::initializeMembers() if (! deletes.empty()) { try { - if (_rc->clusterMode) { + if (_cc->redisConfig->clusterMode) { auto tx = _cluster->transaction(_myAddressStr, true, false); for (std::string k : deletes) { tx.del(k); @@ -961,7 +961,7 @@ void CentralDB::initializeMembers() if (! networkMembers.empty()) { if (_redisMemberStatus) { fprintf(stderr, "Load member data into redis...\n"); - if (_rc->clusterMode) { + if (_cc->redisConfig->clusterMode) { auto tx = _cluster->transaction(_myAddressStr, true, false); uint64_t count = 0; for (auto it : networkMembers) { @@ -1071,7 +1071,7 @@ void CentralDB::heartbeat() try { if (_listenerMode == LISTENER_MODE_REDIS && _redisMemberStatus) { - if (_rc->clusterMode) { + if (_cc->redisConfig->clusterMode) { _cluster->zadd("controllers", "controllerId", ts); } else { @@ -1298,7 +1298,7 @@ void CentralDB::commitThread() std::string id = config["id"]; std::string controllerId = _myAddressStr.c_str(); std::string key = "networks:{" + controllerId + "}"; - if (_rc->clusterMode) { + if (_cc->redisConfig->clusterMode) { _cluster->sadd(key, id); } else { @@ -1340,7 +1340,7 @@ void CentralDB::commitThread() std::string id = config["id"]; std::string controllerId = _myAddressStr.c_str(); std::string key = "networks:{" + controllerId + "}"; - if (_rc->clusterMode) { + if (_cc->redisConfig->clusterMode) { _cluster->srem(key, id); _cluster->del("network-nodes-online:{" + controllerId + "}:" + id); } @@ -1392,7 +1392,7 @@ void CentralDB::commitThread() std::string networkId = config["nwid"]; std::string controllerId = _myAddressStr.c_str(); std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; - if (_rc->clusterMode) { + if (_cc->redisConfig->clusterMode) { _cluster->srem(key, memberId); _cluster->del("member:{" + controllerId + "}:" + networkId + ":" + memberId); } diff --git a/nonfree/controller/CentralDB.hpp b/nonfree/controller/CentralDB.hpp index e130c6344..0ffd1e562 100644 --- a/nonfree/controller/CentralDB.hpp +++ b/nonfree/controller/CentralDB.hpp @@ -21,19 +21,8 @@ struct SmeeClient; } namespace ZeroTier { - struct RedisConfig; -struct PubSubConfig; -struct PostgresNotifyConfig; -struct BigTableConfig; - -struct ControllerConfig { - bool ssoEnabled; - RedisConfig* redisConfig; - PubSubConfig* pubSubConfig; - PostgresNotifyConfig* postgresNotifyConfig; - BigTableConfig* bigTableConfig; -}; +struct ControllerConfig; class CentralDB : public DB { public: @@ -51,11 +40,11 @@ class CentralDB : public DB { CentralDB( const Identity& myId, - const char* path, + const char* connString, int listenPort, CentralDB::ListenerMode mode, CentralDB::StatusWriterMode statusMode, - ControllerConfig* cc); + const ControllerConfig* cc); virtual ~CentralDB(); virtual bool waitForReady(); @@ -109,7 +98,7 @@ class CentralDB : public DB { ListenerMode _listenerMode; StatusWriterMode _statusWriterMode; - ControllerConfig* _controllerConfig; + const ControllerConfig* _cc; std::shared_ptr > _pool; const Identity _myId; @@ -136,7 +125,6 @@ class CentralDB : public DB { int _listenPort; uint8_t _ssoPsk[48]; - RedisConfig* _rc; std::shared_ptr _redis; std::shared_ptr _cluster; bool _redisMemberStatus; diff --git a/nonfree/controller/ControllerConfig.hpp b/nonfree/controller/ControllerConfig.hpp new file mode 100644 index 000000000..3ee3c8ea5 --- /dev/null +++ b/nonfree/controller/ControllerConfig.hpp @@ -0,0 +1,66 @@ +#ifndef CONTROLLER_CONFIG_HPP +#define CONTROLLER_CONFIG_HPP + +#include "Redis.hpp" + +#include + +namespace ZeroTier { + +struct PubSubConfig { + std::string project; +}; + +struct PostgresNotifyConfig { + std::string channel; +}; + +struct BigTableConfig { + std::string project_id; + std::string instance_id; + std::string table_id; +}; + +struct ControllerConfig { + bool ssoEnabled; + std::string listenMode; + std::string statusMode; + RedisConfig* redisConfig; + PubSubConfig* pubSubConfig; + PostgresNotifyConfig* postgresNotifyConfig; + BigTableConfig* bigTableConfig; + + ControllerConfig() + : ssoEnabled(false) + , listenMode("") + , statusMode("") + , redisConfig(nullptr) + , pubSubConfig(nullptr) + , postgresNotifyConfig(nullptr) + , bigTableConfig(nullptr) + { + } + + ~ControllerConfig() + { + if (redisConfig) { + delete redisConfig; + redisConfig = nullptr; + } + if (pubSubConfig) { + delete pubSubConfig; + pubSubConfig = nullptr; + } + if (postgresNotifyConfig) { + delete postgresNotifyConfig; + postgresNotifyConfig = nullptr; + } + if (bigTableConfig) { + delete bigTableConfig; + bigTableConfig = nullptr; + } + } +}; + +} // namespace ZeroTier +#endif // CONTROLLER_CONFIG_HPP \ No newline at end of file diff --git a/nonfree/controller/EmbeddedNetworkController.cpp b/nonfree/controller/EmbeddedNetworkController.cpp index c29feb620..dd8092603 100644 --- a/nonfree/controller/EmbeddedNetworkController.cpp +++ b/nonfree/controller/EmbeddedNetworkController.cpp @@ -26,6 +26,10 @@ #ifdef ZT_CONTROLLER_USE_LIBPQ #include "CV1.hpp" #include "CV2.hpp" +#include "CentralDB.hpp" +#ifdef ZT1_CENTRAL_CONTROLLER +#include "ControllerConfig.hpp" +#endif #endif #include "../../node/CertificateOfMembership.hpp" @@ -559,6 +563,61 @@ EmbeddedNetworkController::EmbeddedNetworkController( { } +#ifdef ZT1_CENTRAL_CONTROLLER +EmbeddedNetworkController::EmbeddedNetworkController( + Node* node, + const char* ztPath, + const char* dbPath, + int listenPort, + const ControllerConfig* cc) + : _startTime(OSUtils::now()) + , _listenPort(listenPort) + , _node(node) + , _ztPath(ztPath) + , _path(dbPath) + , _signingId() + , _signingIdAddressString() + , _sender((NetworkController::Sender*)0) + , _db(this) + , _queue() + , _threads() + , _threads_l() + , _memberStatus() + , _memberStatus_l() + , _expiringSoon() + , _expiringSoon_l() + , _rc(nullptr) + , _cc(cc) + , _ssoExpiryRunning(true) + , _ssoExpiry(std::thread(&EmbeddedNetworkController::_ssoExpiryThread, this)) +#ifdef CENTRAL_CONTROLLER_REQUEST_BENCHMARK + , _member_status_lookup { "nc_member_status_lookup", "" } + , _member_status_lookup_count { "nc_member_status_lookup_count", "" } + , _node_is_online { "nc_node_is_online", "" } + , _node_is_online_count { "nc_node_is_online_count", "" } + , _get_and_init_member { "nc_get_and_init_member", "" } + , _get_and_init_member_count { "nc_get_and_init_member_count", "" } + , _have_identity { "nc_have_identity", "" } + , _have_identity_count { "nc_have_identity_count", "" } + , _determine_auth { "nc_determine_auth", "" } + , _determine_auth_count { "nc_determine_auth_count", "" } + , _sso_check { "nc_sso_check", "" } + , _sso_check_count { "nc_sso_check_count", "" } + , _auth_check { "nc_auth_check", "" } + , _auth_check_count { "nc_auth_check_count", "" } + , _json_schlep { "nc_json_schlep", "" } + , _json_schlep_count { "nc_json_schlep_count", "" } + , _issue_certificate { "nc_issue_certificate", "" } + , _issue_certificate_count { "nc_issue_certificate_count", "" } + , _save_member { "nc_save_member", "" } + , _save_member_count { "nc_save_member_count", "" } + , _send_netconf { "nc_send_netconf2", "" } + , _send_netconf_count { "nc_send_netconf2_count", "" } +#endif +{ +} +#endif + EmbeddedNetworkController::~EmbeddedNetworkController() { std::lock_guard l(_threads_l); @@ -587,6 +646,47 @@ void EmbeddedNetworkController::init(const Identity& signingId, Sender* sender) _sender = sender; _signingIdAddressString = signingId.address().toString(tmp); +#ifdef ZT1_CENTRAL_CONTROLLER + if (! _cc) { + throw std::runtime_error("controller config required"); + } + + if (_path.length() > 9 || (_path.substr(0, 9) != "postgres:")) { + throw std::runtime_error("central controller requires postgres db"); + } + + const char* connString = _path.substr(9).c_str(); + + CentralDB::ListenerMode lm; + if (_cc->listenMode == "pgsql") { + lm = CentralDB::LISTENER_MODE_PGSQL; + } + else if (_cc->listenMode == "redis") { + lm = CentralDB::LISTENER_MODE_REDIS; + } + else if (_cc->listenMode == "pubsub") { + lm = CentralDB::LISTENER_MODE_PUBSUB; + } + else { + throw std::runtime_error("unsupported listen mode"); + } + + CentralDB::StatusWriterMode sm; + if (_cc->statusMode == "pgsql") { + sm = CentralDB::STATUS_WRITER_MODE_PGSQL; + } + else if (_cc->statusMode == "redis") { + sm = CentralDB::STATUS_WRITER_MODE_REDIS; + } + else if (_cc->statusMode == "bigtable") { + sm = CentralDB::STATUS_WRITER_MODE_BIGTABLE; + } + else { + throw std::runtime_error("unsupported status mode"); + } + + _db.addDB(std::shared_ptr(new CentralDB(_signingId, connString, _listenPort, lm, sm, _cc))); +#else #ifdef ZT_CONTROLLER_USE_LIBPQ if ((_path.length() > 9) && (_path.substr(0, 9) == "postgres:")) { fprintf(stderr, "CV1\n"); @@ -603,6 +703,7 @@ void EmbeddedNetworkController::init(const Identity& signingId, Sender* sender) #ifdef ZT_CONTROLLER_USE_LIBPQ } #endif +#endif // ZT1_CENTRAL_CONTROLLER _db.waitForReady(); } diff --git a/nonfree/controller/EmbeddedNetworkController.hpp b/nonfree/controller/EmbeddedNetworkController.hpp index 3886c6391..085ece3c8 100644 --- a/nonfree/controller/EmbeddedNetworkController.hpp +++ b/nonfree/controller/EmbeddedNetworkController.hpp @@ -30,6 +30,10 @@ namespace ZeroTier { class Node; struct RedisConfig; +#ifdef ZT1_CENTRAL_CONTROLLER +class ControllerConfig; +#endif + class EmbeddedNetworkController : public NetworkController , public DB::ChangeListener { @@ -39,6 +43,14 @@ class EmbeddedNetworkController * @param dbPath Database path (file path or database credentials) */ EmbeddedNetworkController(Node* node, const char* ztPath, const char* dbPath, int listenPort, RedisConfig* rc); +#ifdef ZT1_CENTRAL_CONTROLLER + EmbeddedNetworkController( + Node* node, + const char* ztPath, + const char* dbPath, + int listenPort, + const ControllerConfig* cc); +#endif virtual ~EmbeddedNetworkController(); virtual void init(const Identity& signingId, Sender* sender); @@ -146,6 +158,9 @@ class EmbeddedNetworkController std::mutex _expiringSoon_l; RedisConfig* _rc; +#ifdef ZT1_CENTRAL_CONTROLLER + const ControllerConfig* _cc; +#endif std::string _ssoRedirectURL; bool _ssoExpiryRunning; diff --git a/nonfree/controller/PubSubListener.cpp b/nonfree/controller/PubSubListener.cpp index 8c6b48e4a..5126c0fcb 100644 --- a/nonfree/controller/PubSubListener.cpp +++ b/nonfree/controller/PubSubListener.cpp @@ -1,6 +1,7 @@ #ifdef ZT_CONTROLLER_USE_LIBPQ #include "PubSubListener.hpp" +#include "ControllerConfig.hpp" #include "DB.hpp" #include "member.pb.h" #include "network.pb.h" diff --git a/nonfree/controller/PubSubListener.hpp b/nonfree/controller/PubSubListener.hpp index b8f08aeae..5c6aa712c 100644 --- a/nonfree/controller/PubSubListener.hpp +++ b/nonfree/controller/PubSubListener.hpp @@ -15,13 +15,6 @@ namespace ZeroTier { class DB; -struct PubSubConfig { - const char* controller_id; - std::string project; - std::string topic; - uint64_t listen_timeout; -}; - /** * Base class for GCP PubSub listeners */ diff --git a/nonfree/controller/StatusWriter.hpp b/nonfree/controller/StatusWriter.hpp index 29e7f350d..78b36f586 100644 --- a/nonfree/controller/StatusWriter.hpp +++ b/nonfree/controller/StatusWriter.hpp @@ -14,7 +14,7 @@ namespace ZeroTier { */ class StatusWriter { public: - virtual ~StatusWriter() = 0; + virtual ~StatusWriter() = default; virtual void updateNodeStatus( const std::string& network_id, diff --git a/service/CMakeLists.txt b/service/CMakeLists.txt index da38d9a4c..359d578b5 100644 --- a/service/CMakeLists.txt +++ b/service/CMakeLists.txt @@ -35,6 +35,7 @@ if(ZT1_CENTRAL_CONTROLLER) ) list(APPEND LINK_LIBS pqxx + redis++::redis++_static opentelemetry-cpp::sdk ) endif() diff --git a/service/OneService.cpp b/service/OneService.cpp index aea790194..8ebc85589 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -130,6 +130,10 @@ using json = nlohmann::json; #include "../nonfree/controller/EmbeddedNetworkController.hpp" #include "../nonfree/controller/PostgreSQL.hpp" #include "../nonfree/controller/Redis.hpp" +#ifdef ZT1_CENTRAL_CONTROLLER +#include "../nonfree/controller/CentralDB.hpp" +#include "../nonfree/controller/ControllerConfig.hpp" +#endif #include "../osdep/EthernetTap.hpp" #ifdef __WINDOWS__ #include "../osdep/WindowsEthernetTap.hpp" @@ -1009,6 +1013,10 @@ class OneServiceImpl : public OneService { double _exporterSampleRate; #endif +#ifdef ZT1_CENTRAL_CONTROLLER + ControllerConfig _controllerConfig; +#endif + // end member variables ---------------------------------------------------- OneServiceImpl(const char* hp, unsigned int port) @@ -1055,6 +1063,9 @@ class OneServiceImpl : public OneService { , _traceProvider(nullptr) , _exporterEndpoint() , _exporterSampleRate(1.0) +#endif +#ifdef ZT1_CENTRAL_CONTROLLER + , _controllerConfig() #endif { _ports[0] = 0; @@ -1351,10 +1362,15 @@ class OneServiceImpl : public OneService { // Delete legacy iddb.d if present (cleanup) OSUtils::rmDashRf((_homePath + ZT_PATH_SEPARATOR_S "iddb.d").c_str()); - // Network controller is now enabled by default for desktop and server + // Network controller is now disabled by default for desktop and server #ifdef ZT_NONFREE_CONTROLLER +#ifdef ZT1_CENTRAL_CONTROLLER + _controller = new EmbeddedNetworkController( + _node, _homePath.c_str(), _controllerDbPath.c_str(), _ports[0], &_controllerConfig); +#else _controller = new EmbeddedNetworkController(_node, _homePath.c_str(), _controllerDbPath.c_str(), _ports[0], _rc); +#endif if (! _ssoRedirectURL.empty()) { _controller->setSSORedirectURL(_ssoRedirectURL); }