diff --git a/controller/BigTableStatusWriter.hpp b/controller/BigTableStatusWriter.hpp index 85c1e0b4d..8abc81ca3 100644 --- a/controller/BigTableStatusWriter.hpp +++ b/controller/BigTableStatusWriter.hpp @@ -7,6 +7,12 @@ #include namespace ZeroTier { +struct BigTableConfig { + std::string project_id; + std::string instance_id; + std::string table_id; +}; + class BigTableStatusWriter : public StatusWriter { public: BigTableStatusWriter(const std::string& project_id, const std::string& instance_id, const std::string& table_id); diff --git a/controller/CentralDB.cpp b/controller/CentralDB.cpp index ad054b691..29b388c3b 100644 --- a/controller/CentralDB.cpp +++ b/controller/CentralDB.cpp @@ -18,11 +18,14 @@ #include "../node/Constants.hpp" #include "../node/SHA512.hpp" #include "../version.h" +#include "BigTableStatusWriter.hpp" #include "CtlUtil.hpp" #include "EmbeddedNetworkController.hpp" +#include "PostgresStatusWriter.hpp" #include "PubSubListener.hpp" #include "Redis.hpp" #include "RedisListener.hpp" +#include "RedisStatusWriter.hpp" #include "opentelemetry/trace/provider.h" #include @@ -50,9 +53,16 @@ using Attrs = std::vector >; using Item = std::pair; using ItemStream = std::vector; -CentralDB::CentralDB(const Identity& myId, const char* path, int listenPort, CentralDB::ListenerMode mode, ControllerConfig* cc) +CentralDB::CentralDB( + const Identity& myId, + const char* path, + int listenPort, + CentralDB::ListenerMode listenMode, + CentralDB::StatusWriterMode statusMode, + ControllerConfig* cc) : DB() - , _listenerMode(mode) + , _listenerMode(listenMode) + , _statusWriterMode(statusMode) , _controllerConfig(cc) , _pool() , _myId(myId) @@ -79,7 +89,8 @@ CentralDB::CentralDB(const Identity& myId, const char* path, int listenPort, Cen _myAddressStr = myId.address().toString(myAddress); _connString = std::string(path); auto f = std::make_shared(_connString); - _pool = std::make_shared >(15, 5, std::static_pointer_cast(f)); + _pool = + std::make_shared >(15, 5, std::static_pointer_cast(f)); memset(_ssoPsk, 0, sizeof(_ssoPsk)); char* const ssoPskHex = getenv("ZT_SSO_PSK"); @@ -106,12 +117,16 @@ CentralDB::CentralDB(const Identity& myId, const char* path, int listenPort, Cen txn.commit(); if (dbVersion < DB_MINIMUM_VERSION) { - fprintf(stderr, "Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION); + fprintf( + stderr, + "Central database schema version too low. This controller version requires a minimum schema version of " + "%d. Please upgrade your Central instance", + DB_MINIMUM_VERSION); exit(1); } _pool->unborrow(c); - if (mode == LISTENER_MODE_REDIS && _rc != NULL) { + if ((listenMode == LISTENER_MODE_REDIS || statusMode == STATUS_WRITER_MODE_REDIS) && _rc != NULL) { auto innerspan = tracer->StartSpan("CentralDB::CentralDB::configureRedis"); auto innerscope = tracer->WithActiveSpan(innerspan); @@ -141,7 +156,9 @@ CentralDB::CentralDB(const Identity& myId, const char* path, int listenPort, Cen _readyLock.lock(); - fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt()); + fprintf( + stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, + ::_timestr(), (unsigned long long)_myAddress.toInt()); _waitNoticePrinted = true; initializeNetworks(); @@ -149,7 +166,7 @@ CentralDB::CentralDB(const Identity& myId, const char* path, int listenPort, Cen _heartbeatThread = std::thread(&CentralDB::heartbeat, this); - switch (mode) { + switch (listenMode) { case LISTENER_MODE_REDIS: if (_rc != NULL) { if (_rc->clusterMode) { @@ -166,11 +183,14 @@ CentralDB::CentralDB(const Identity& myId, const char* path, int listenPort, Cen } case LISTENER_MODE_PUBSUB: if (cc->pubSubConfig != NULL) { - _membersDbWatcher = std::make_shared(_myAddressStr, cc->pubSubConfig->listen_timeout, this); - _networksDbWatcher = std::make_shared(_myAddressStr, cc->pubSubConfig->listen_timeout, this); + _membersDbWatcher = + std::make_shared(_myAddressStr, cc->pubSubConfig->listen_timeout, this); + _networksDbWatcher = + std::make_shared(_myAddressStr, cc->pubSubConfig->listen_timeout, this); } else { - throw std::runtime_error("CentralDB: PubSub listener mode selected but no PubSub configuration provided"); + throw std::runtime_error( + "CentralDB: PubSub listener mode selected but no PubSub configuration provided"); } break; case LISTENER_MODE_PGSQL: @@ -180,6 +200,30 @@ CentralDB::CentralDB(const Identity& myId, const char* path, int listenPort, Cen break; } + switch (statusMode) { + case STATUS_WRITER_MODE_REDIS: + if (_rc != NULL) { + if (_rc->clusterMode) { + _statusWriter = std::make_shared(_cluster, _myAddressStr); + } + else { + _statusWriter = std::make_shared(_redis, _myAddressStr); + } + } + else { + throw std::runtime_error("CentralDB: Redis status mode selected but no Redis configuration provided"); + } + break; + case STATUS_WRITER_MODE_BIGTABLE: + _statusWriter = std::make_shared( + cc->bigTableConfig->project_id, cc->bigTableConfig->instance_id, cc->bigTableConfig->table_id); + break; + case STATUS_WRITER_MODE_PGSQL: + default: + _statusWriter = std::make_shared(_pool); + break; + } + for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) { _commitThread[i] = std::thread(&CentralDB::commitThread, this); } @@ -232,7 +276,8 @@ void CentralDB::configureSmee() if (scheme != NULL && host != NULL && port != NULL && ns != NULL && task_queue != NULL) { fprintf(stderr, "creating smee client\n"); - std::string hostPort = std::string(scheme) + std::string("://") + std::string(host) + std::string(":") + std::string(port); + std::string hostPort = + std::string(scheme) + std::string("://") + std::string(host) + std::string(":") + std::string(port); this->_smee = rustybits::smee_client_new(hostPort.c_str(), ns, task_queue); } else { @@ -362,7 +407,11 @@ void CentralDB::eraseMember(const uint64_t networkId, const uint64_t memberId) _memberChanged(tmp.first, nullJson, true); } -void CentralDB::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress, const char* osArch) +void CentralDB::nodeIsOnline( + const uint64_t networkId, + const uint64_t memberId, + const InetAddress& physicalAddress, + const char* osArch) { auto provider = opentelemetry::trace::Provider::GetTracerProvider(); auto tracer = provider->GetTracer("CentralDB"); @@ -428,15 +477,17 @@ AuthInfo CentralDB::getSSOAuthInfo(const nlohmann::json& member, const std::stri std::string nonce = ""; // check if the member exists first. - pqxx::row count = w.exec_params1("SELECT count(id) FROM ztc_member WHERE id = $1 AND network_id = $2 AND deleted = false", memberId, networkId); + pqxx::row count = w.exec_params1( + "SELECT count(id) FROM ztc_member WHERE id = $1 AND network_id = $2 AND deleted = false", memberId, + networkId); if (count[0].as() == 1) { // get active nonce, if exists. pqxx::result r = w.exec_params( "SELECT nonce FROM ztc_sso_expiry " "WHERE network_id = $1 AND member_id = $2 " - "AND ((NOW() AT TIME ZONE 'UTC') <= authentication_expiry_time) AND ((NOW() AT TIME ZONE 'UTC') <= nonce_expiration)", - networkId, - memberId); + "AND ((NOW() AT TIME ZONE 'UTC') <= authentication_expiry_time) AND ((NOW() AT TIME ZONE 'UTC') <= " + "nonce_expiration)", + networkId, memberId); if (r.size() == 0) { // no active nonce. @@ -445,8 +496,7 @@ AuthInfo CentralDB::getSSOAuthInfo(const nlohmann::json& member, const std::stri "SELECT nonce FROM ztc_sso_expiry " "WHERE network_id = $1 AND member_id = $2 " "AND authentication_expiry_time IS NULL AND ((NOW() AT TIME ZONE 'UTC') <= nonce_expiration)", - networkId, - memberId); + networkId, memberId); if (r.size() == 1) { // we have an existing nonce. Use it @@ -464,10 +514,7 @@ AuthInfo CentralDB::getSSOAuthInfo(const nlohmann::json& member, const std::stri "INSERT INTO ztc_sso_expiry " "(nonce, nonce_expiration, network_id, member_id) VALUES " "($1, TO_TIMESTAMP($2::double precision/1000), $3, $4)", - nonce, - OSUtils::now() + 300000, - networkId, - memberId); + nonce, OSUtils::now() + 300000, networkId, memberId); w.commit(); } @@ -513,7 +560,9 @@ AuthInfo CentralDB::getSSOAuthInfo(const nlohmann::json& member, const std::stri sso_version = r.at(0)[4].as >().value_or(1); } else if (r.size() > 1) { - fprintf(stderr, "ERROR: More than one auth endpoint for an organization?!?!? NetworkID: %s\n", networkId.c_str()); + fprintf( + stderr, "ERROR: More than one auth endpoint for an organization?!?!? NetworkID: %s\n", + networkId.c_str()); } else { fprintf(stderr, "No client or auth endpoint?!?\n"); @@ -531,13 +580,10 @@ AuthInfo CentralDB::getSSOAuthInfo(const nlohmann::json& member, const std::stri if (info.version == 0) { char url[2048] = { 0 }; OSUtils::ztsnprintf( - url, - sizeof(authenticationURL), - "%s?response_type=id_token&response_mode=form_post&scope=openid+email+profile&redirect_uri=%s&nonce=%s&state=%s&client_id=%s", - authorization_endpoint.c_str(), - url_encode(redirectURL).c_str(), - nonce.c_str(), - state_hex, + url, sizeof(authenticationURL), + "%s?response_type=id_token&response_mode=form_post&scope=openid+email+profile&redirect_uri=" + "%s&nonce=%s&state=%s&client_id=%s", + authorization_endpoint.c_str(), url_encode(redirectURL).c_str(), nonce.c_str(), state_hex, client_id.c_str()); info.authenticationURL = std::string(url); } @@ -551,18 +597,17 @@ AuthInfo CentralDB::getSSOAuthInfo(const nlohmann::json& member, const std::stri #ifdef ZT_DEBUG fprintf( stderr, - "ssoClientID: %s\nissuerURL: %s\nssoNonce: %s\nssoState: %s\ncentralAuthURL: %s\nprovider: %s\n", - info.ssoClientID.c_str(), - info.issuerURL.c_str(), - info.ssoNonce.c_str(), - info.ssoState.c_str(), - info.centralAuthURL.c_str(), - provider.c_str()); + "ssoClientID: %s\nissuerURL: %s\nssoNonce: %s\nssoState: %s\ncentralAuthURL: %s\nprovider: " + "%s\n", + info.ssoClientID.c_str(), info.issuerURL.c_str(), info.ssoNonce.c_str(), + info.ssoState.c_str(), info.centralAuthURL.c_str(), provider.c_str()); #endif } } else { - fprintf(stderr, "client_id: %s\nauthorization_endpoint: %s\n", client_id.c_str(), authorization_endpoint.c_str()); + fprintf( + stderr, "client_id: %s\nauthorization_endpoint: %s\n", client_id.c_str(), + authorization_endpoint.c_str()); } } @@ -638,12 +683,16 @@ void CentralDB::initializeNetworks() config["lastModified"] = last_modified.value_or(0); config["revision"] = revision.value_or(0); config["capabilities"] = cfgtmp["capabilities"].is_array() ? cfgtmp["capabilities"] : json::array(); - config["enableBroadcast"] = cfgtmp["enableBroadcast"].is_boolean() ? cfgtmp["enableBroadcast"].get() : false; + config["enableBroadcast"] = + cfgtmp["enableBroadcast"].is_boolean() ? cfgtmp["enableBroadcast"].get() : false; config["mtu"] = cfgtmp["mtu"].is_number() ? cfgtmp["mtu"].get() : 2800; - config["multicastLimit"] = cfgtmp["multicastLimit"].is_number() ? cfgtmp["multicastLimit"].get() : 64; + config["multicastLimit"] = + cfgtmp["multicastLimit"].is_number() ? cfgtmp["multicastLimit"].get() : 64; config["private"] = cfgtmp["private"].is_boolean() ? cfgtmp["private"].get() : true; - config["remoteTraceLevel"] = cfgtmp["remoteTraceLevel"].is_number() ? cfgtmp["remoteTraceLevel"].get() : 0; - config["remoteTraceTarget"] = cfgtmp["remoteTraceTarget"].is_string() ? cfgtmp["remoteTraceTarget"].get() : ""; + config["remoteTraceLevel"] = + cfgtmp["remoteTraceLevel"].is_number() ? cfgtmp["remoteTraceLevel"].get() : 0; + config["remoteTraceTarget"] = + cfgtmp["remoteTraceTarget"].is_string() ? cfgtmp["remoteTraceTarget"].get() : ""; config["revision"] = revision.value_or(0); config["rules"] = cfgtmp["rules"].is_array() ? cfgtmp["rules"] : json::array(); config["tags"] = cfgtmp["tags"].is_array() ? cfgtmp["tags"] : json::array(); @@ -667,7 +716,9 @@ void CentralDB::initializeNetworks() config["objtype"] = "network"; config["routes"] = cfgtmp["routes"].is_array() ? cfgtmp["routes"] : json::array(); config["clientId"] = cfgtmp["clientId"].is_string() ? cfgtmp["clientId"].get() : ""; - config["authorizationEndpoint"] = cfgtmp["authorizationEndpoint"].is_string() ? cfgtmp["authorizationEndpoint"].get() : nullptr; + config["authorizationEndpoint"] = cfgtmp["authorizationEndpoint"].is_string() + ? cfgtmp["authorizationEndpoint"].get() + : nullptr; config["provider"] = cfgtmp["ssoProvider"].is_string() ? cfgtmp["ssoProvider"].get() : ""; if (! cfgtmp["dns"].is_object()) { cfgtmp["dns"] = json::object(); @@ -677,7 +728,8 @@ void CentralDB::initializeNetworks() else { config["dns"] = cfgtmp["dns"]; } - config["ipAssignmentPools"] = cfgtmp["ipAssignmentPools"].is_array() ? cfgtmp["ipAssignmentPools"] : json::array(); + config["ipAssignmentPools"] = + cfgtmp["ipAssignmentPools"].is_array() ? cfgtmp["ipAssignmentPools"] : json::array(); Metrics::network_count++; @@ -699,7 +751,9 @@ void CentralDB::initializeNetworks() if (++this->_ready == 2) { if (_waitNoticePrinted) { - fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(), (unsigned long long)_myAddress.toInt()); + fprintf( + stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(), + (unsigned long long)_myAddress.toInt()); } _readyLock.unlock(); } @@ -767,7 +821,8 @@ void CentralDB::initializeMembers() char qbuf[2048]; sprintf( qbuf, - "SELECT nm.device_id, nm.network_id, nm.authorized, nm.active_bridge, nm.ip_assignments, nm.no_auto_assign_ips, " + "SELECT nm.device_id, nm.network_id, nm.authorized, nm.active_bridge, nm.ip_assignments, " + "nm.no_auto_assign_ips, " "nm.sso_exempt, (EXTRACT(EPOCH FROM nm.authentication_expiry_time AT TIME ZONE 'UTC')*1000)::bigint, " "(EXTRACT(EPOCH FROM nm.creation_time AT TIME ZONE 'UTC')*1000)::bigint, nm.identity, " "(EXTRACT(EPOCH FROM nm.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, " @@ -938,7 +993,9 @@ void CentralDB::initializeMembers() if (++this->_ready == 2) { if (_waitNoticePrinted) { - fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(), (unsigned long long)_myAddress.toInt()); + fprintf( + stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(), + (unsigned long long)_myAddress.toInt()); } _readyLock.unlock(); } @@ -997,13 +1054,10 @@ void CentralDB::heartbeat() w.exec_params0( "INSERT INTO controllers_ctl (id, hostname, last_heartbeat, public_identity, version) VALUES " "($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5) " - "ON CONFLICT (id) DO UPDATE SET hostname = EXCLUDED.hostname, last_heartbeat = EXCLUDED.last_heartbeat, " + "ON CONFLICT (id) DO UPDATE SET hostname = EXCLUDED.hostname, last_heartbeat = " + "EXCLUDED.last_heartbeat, " "public_identity = EXCLUDED.public_identity, version = EXCLUDED.version", - controllerId, - hostname, - ts, - publicIdentity, - versionStr); + controllerId, hostname, ts, publicIdentity, versionStr); w.commit(); } catch (std::exception& e) { @@ -1097,13 +1151,15 @@ void CentralDB::commitThread() continue; } - pqxx::row mrow = w.exec_params1("SELECT COUNT(id) FROM ztc_member WHERE id = $1 AND network_id = $2", memberId, networkId); + pqxx::row mrow = w.exec_params1( + "SELECT COUNT(id) FROM ztc_member WHERE id = $1 AND network_id = $2", memberId, networkId); int membercount = mrow[0].as(); bool isNewMember = (membercount == 0); pqxx::result res = w.exec_params0( - "INSERT INTO network_memberships_ctl (device_id, network_id, authorized, active_bridge, ip_assignments, " + "INSERT INTO network_memberships_ctl (device_id, network_id, authorized, active_bridge, " + "ip_assignments, " "no_auto_assign_ips, sso_exempt, authentication_expiry_time, capabilities, creation_time, " "identity, last_authorized_time, last_deauthorized_time, " "remote_trace_level, remote_trace_target, revision, tags, version_major, version_minor, " @@ -1114,40 +1170,31 @@ void CentralDB::commitThread() "ON CONFLICT (device_id, network_id) DO UPDATE SET " "authorized = EXCLUDED.authorized, active_bridge = EXCLUDED.active_bridge, " "ip_assignments = EXCLUDED.ip_assignments, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, " - "sso_exempt = EXCLUDED.sso_exempt, authentication_expiry_time = EXCLUDED.authentication_expiry_time, " + "sso_exempt = EXCLUDED.sso_exempt, authentication_expiry_time = " + "EXCLUDED.authentication_expiry_time, " "capabilities = EXCLUDED.capabilities, creation_time = EXCLUDED.creation_time, " "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, " "last_deauthorized_time = EXCLUDED.last_deauthorized_time, " - "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, " + "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = " + "EXCLUDED.remote_trace_target, " "revision = EXCLUDED.revision, tags = EXCLUDED.tags, version_major = EXCLUDED.version_major, " "version_minor = EXCLUDED.version_minor, version_revision = EXCLUDED.version_revision, " "version_protocol = EXCLUDED.version_protocol", - memberId, - networkId, - (bool)config["authorized"], - (bool)config["activeBridge"], - config["ipAssignments"].get >(), - (bool)config["noAutoAssignIps"], - (bool)config["ssoExempt"], - (uint64_t)config["authenticationExpiryTime"], - OSUtils::jsonDump(config["capabilities"], -1), - (uint64_t)config["creationTime"], - OSUtils::jsonString(config["identity"], ""), - (uint64_t)config["lastAuthorizedTime"], - (uint64_t)config["lastDeauthorizedTime"], - (int)config["remoteTraceLevel"], - target, - (uint64_t)config["revision"], - OSUtils::jsonDump(config["tags"], -1), - (int)config["vMajor"], - (int)config["vMinor"], - (int)config["vRev"], - (int)config["vProto"]); + memberId, networkId, (bool)config["authorized"], (bool)config["activeBridge"], + config["ipAssignments"].get >(), (bool)config["noAutoAssignIps"], + (bool)config["ssoExempt"], (uint64_t)config["authenticationExpiryTime"], + OSUtils::jsonDump(config["capabilities"], -1), (uint64_t)config["creationTime"], + OSUtils::jsonString(config["identity"], ""), (uint64_t)config["lastAuthorizedTime"], + (uint64_t)config["lastDeauthorizedTime"], (int)config["remoteTraceLevel"], target, + (uint64_t)config["revision"], OSUtils::jsonDump(config["tags"], -1), (int)config["vMajor"], + (int)config["vMinor"], (int)config["vRev"], (int)config["vProto"]); w.commit(); if (! isNewMember) { - pqxx::result res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", memberId, networkId); + pqxx::result res = w.exec_params0( + "DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", memberId, + networkId); } if (_smee != NULL && isNewMember) { @@ -1181,11 +1228,15 @@ void CentralDB::commitThread() _memberChanged(memOrig, memNew, qitem.second); } else { - fprintf(stderr, "%s: Can't notify of change. Error parsing nwid or memberid: %llu-%llu\n", _myAddressStr.c_str(), (unsigned long long)nwidInt, (unsigned long long)memberidInt); + fprintf( + stderr, "%s: Can't notify of change. Error parsing nwid or memberid: %llu-%llu\n", + _myAddressStr.c_str(), (unsigned long long)nwidInt, (unsigned long long)memberidInt); } } catch (std::exception& e) { - fprintf(stderr, "%s ERROR: Error updating member %s-%s: %s\n", _myAddressStr.c_str(), networkId.c_str(), memberId.c_str(), e.what()); + fprintf( + stderr, "%s ERROR: Error updating member %s-%s: %s\n", _myAddressStr.c_str(), networkId.c_str(), + memberId.c_str(), e.what()); mspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); } } @@ -1205,10 +1256,7 @@ void CentralDB::commitThread() "VALUES ($1, $2, $3, $4, $5) " "ON CONFLICT (id) DO UPDATE SET " "name = EXCLUDED.name, configuration = EXCLUDED.configuration, revision = EXCLUDED.revision+1", - id, - OSUtils::jsonString(config["name"], ""), - OSUtils::jsonDump(config, -1), - _myAddressStr, + id, OSUtils::jsonString(config["name"], ""), OSUtils::jsonDump(config, -1), _myAddressStr, ((uint64_t)config["revision"])); w.commit(); @@ -1224,9 +1272,7 @@ void CentralDB::commitThread() res = w.exec_params0( "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) " "VALUES ($1, $2, $3)", - id, - start, - end); + id, start, end); } const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL); @@ -1239,7 +1285,9 @@ void CentralDB::commitThread() _networkChanged(nwOrig, nwNew, qitem.second); } else { - fprintf(stderr, "%s: Can't notify network changed: %llu\n", _myAddressStr.c_str(), (unsigned long long)nwidInt); + fprintf( + stderr, "%s: Can't notify network changed: %llu\n", _myAddressStr.c_str(), + (unsigned long long)nwidInt); } } catch (std::exception& e) { @@ -1319,7 +1367,9 @@ void CentralDB::commitThread() std::string memberId = config["id"]; std::string networkId = config["nwid"]; - pqxx::result res = w.exec_params0("DELETE FROM network_memberships_ctl WHERE device_id = $1 AND network_id = $2", memberId, networkId); + pqxx::result res = w.exec_params0( + "DELETE FROM network_memberships_ctl WHERE device_id = $1 AND network_id = $2", memberId, + networkId); w.commit(); @@ -1386,56 +1436,22 @@ void CentralDB::notifyNewMember(const std::string& networkID, const std::string& void CentralDB::onlineNotificationThread() { waitForReady(); - if (_redisMemberStatus) { - onlineNotification_Redis(); - } - else { - onlineNotification_Postgres(); - } -} - -/** - * ONLY UNCOMMENT FOR TEMPORARY DB MAINTENANCE - * - * This define temporarily turns off writing to the member status table - * so it can be reindexed when the indexes get too large. - */ - -// #define DISABLE_MEMBER_STATUS 1 - -void CentralDB::onlineNotification_Postgres() -{ - _connected = 1; - - nlohmann::json jtmp1, jtmp2; while (_run == 1) { auto provider = opentelemetry::trace::Provider::GetTracerProvider(); auto tracer = provider->GetTracer("CentralDB"); - auto span = tracer->StartSpan("CentralDB::onlineNotification_Postgres"); + auto span = tracer->StartSpan("CentralDB::onlineNotificationThread"); auto scope = tracer->WithActiveSpan(span); - auto c = _pool->borrow(); - auto c2 = _pool->borrow(); try { - fprintf(stderr, "%s onlineNotification_Postgres\n", _myAddressStr.c_str()); std::unordered_map, NodeOnlineRecord, _PairHasher> lastOnline; { std::lock_guard l(_lastOnline_l); lastOnline.swap(_lastOnline); } -#ifndef DISABLE_MEMBER_STATUS + uint64_t updateCount = 0; + auto c = _pool->borrow(); pqxx::work w(*c->c); - pqxx::work w2(*c2->c); - - fprintf(stderr, "online notification tick\n"); - - bool firstRun = true; - bool memberAdded = false; - int updateCount = 0; - - pqxx::pipeline pipe(w); - for (auto i = lastOnline.begin(); i != lastOnline.end(); ++i) { updateCount += 1; uint64_t nwid_i = i->first.first; @@ -1444,6 +1460,7 @@ void CentralDB::onlineNotification_Postgres() char ipTmp[64]; OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); OSUtils::ztsnprintf(memTmp, sizeof(memTmp), "%.10llx", i->first.second); + nlohmann::json jtmp1, jtmp2; if (! get(nwid_i, jtmp1, i->first.second, jtmp2)) { continue; // skip non existent networks/members @@ -1453,7 +1470,8 @@ void CentralDB::onlineNotification_Postgres() std::string memberId(memTmp); try { - pqxx::row r = w2.exec_params1("SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2", networkId, memberId); + pqxx::row r = w.exec_params1( + "SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2", networkId, memberId); } catch (pqxx::unexpected_rows& e) { continue; @@ -1463,160 +1481,24 @@ void CentralDB::onlineNotification_Postgres() std::string ipAddr = i->second.physicalAddress.toIpString(ipTmp); std::string timestamp = std::to_string(ts); std::string osArch = i->second.osArch; - - std::stringstream memberUpdate; - memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES " - << "('" << networkId << "', '" << memberId << "', "; - if (ipAddr.empty()) { - memberUpdate << "NULL, "; + std::vector osArchSplit = split(osArch, '/'); + std::string os = "unknown"; + std::string arch = "unknown"; + if (osArchSplit.size() == 2) { + os = osArchSplit[0]; + arch = osArchSplit[1]; } - else { - memberUpdate << "'" << ipAddr << "', "; - } - memberUpdate << "TO_TIMESTAMP(" << timestamp << "::double precision/1000)) " - << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated"; - pipe.insert(memberUpdate.str()); - Metrics::pgsql_node_checkin++; + _statusWriter->updateNodeStatus(networkId, memberId, os, arch, "", i->second.physicalAddress, ts); } - while (! pipe.empty()) { - pipe.retrieve(); - } - - pipe.complete(); + _statusWriter->writePending(); w.commit(); - fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), updateCount); -#endif + _pool->unborrow(c); } catch (std::exception& e) { fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what()); } - _pool->unborrow(c2); - _pool->unborrow(c); - - ConnectionPoolStats stats = _pool->get_stats(); - fprintf(stderr, "%s pool stats: in use size: %llu, available size: %llu, total: %llu\n", _myAddressStr.c_str(), stats.borrowed_size, stats.pool_size, (stats.borrowed_size + stats.pool_size)); - - span->End(); - - std::this_thread::sleep_for(std::chrono::seconds(10)); } - fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str()); - if (_run == 1) { - fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str()); - exit(6); - } -} - -void CentralDB::onlineNotification_Redis() -{ - _connected = 1; - - char buf[11] = { 0 }; - std::string controllerId = std::string(_myAddress.toString(buf)); - - while (_run == 1) { - auto provider = opentelemetry::trace::Provider::GetTracerProvider(); - auto tracer = provider->GetTracer("CentralDB"); - auto span = tracer->StartSpan("CentralDB::onlineNotification_Redis"); - auto scope = tracer->WithActiveSpan(span); - - fprintf(stderr, "onlineNotification tick\n"); - auto start = std::chrono::high_resolution_clock::now(); - uint64_t count = 0; - - std::unordered_map, NodeOnlineRecord, _PairHasher> lastOnline; - { - std::lock_guard l(_lastOnline_l); - lastOnline.swap(_lastOnline); - } - try { - if (! lastOnline.empty()) { - if (_rc->clusterMode) { - auto tx = _cluster->transaction(controllerId, true, false); - count = _doRedisUpdate(tx, controllerId, lastOnline); - } - else { - auto tx = _redis->transaction(true, false); - count = _doRedisUpdate(tx, controllerId, lastOnline); - } - } - } - catch (sw::redis::Error& e) { - fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what()); - } - - auto end = std::chrono::high_resolution_clock::now(); - auto dur = std::chrono::duration_cast(end - start); - auto total = dur.count(); - - fprintf(stderr, "onlineNotification ran in %llu ms\n", total); - span->End(); - - std::this_thread::sleep_for(std::chrono::seconds(5)); - } -} - -uint64_t CentralDB::_doRedisUpdate(sw::redis::Transaction& tx, std::string& controllerId, std::unordered_map, NodeOnlineRecord, _PairHasher>& lastOnline) -{ - auto provider = opentelemetry::trace::Provider::GetTracerProvider(); - auto tracer = provider->GetTracer("CentralDB"); - auto span = tracer->StartSpan("CentralDB::_doRedisUpdate"); - auto scope = tracer->WithActiveSpan(span); - - nlohmann::json jtmp1, jtmp2; - uint64_t count = 0; - for (auto i = lastOnline.begin(); i != lastOnline.end(); ++i) { - uint64_t nwid_i = i->first.first; - uint64_t memberid_i = i->first.second; - char nwidTmp[64]; - char memTmp[64]; - char ipTmp[64]; - OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); - OSUtils::ztsnprintf(memTmp, sizeof(memTmp), "%.10llx", memberid_i); - - if (! get(nwid_i, jtmp1, memberid_i, jtmp2)) { - continue; // skip non existent members/networks - } - - std::string networkId(nwidTmp); - std::string memberId(memTmp); - - int64_t ts = i->second.lastSeen; - std::string ipAddr = i->second.physicalAddress.toIpString(ipTmp); - std::string timestamp = std::to_string(ts); - std::string osArch = i->second.osArch; - - std::unordered_map record = { { "id", memberId }, { "address", ipAddr }, { "last_updated", std::to_string(ts) } }; - tx.zadd("nodes-online:{" + controllerId + "}", memberId, ts) - .zadd("nodes-online2:{" + controllerId + "}", networkId + "-" + memberId, ts) - .zadd("network-nodes-online:{" + controllerId + "}:" + networkId, memberId, ts) - .zadd("active-networks:{" + controllerId + "}", networkId, ts) - .sadd("network-nodes-all:{" + controllerId + "}:" + networkId, memberId) - .hmset("member:{" + controllerId + "}:" + networkId + ":" + memberId, record.begin(), record.end()); - ++count; - Metrics::redis_node_checkin++; - } - - // expire records from all-nodes and network-nodes member list - uint64_t expireOld = OSUtils::now() - 300000; - - tx.zremrangebyscore("nodes-online:{" + controllerId + "}", sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); - tx.zremrangebyscore("nodes-online2:{" + controllerId + "}", sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); - tx.zremrangebyscore("active-networks:{" + controllerId + "}", sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); - { - std::shared_lock l(_networks_l); - for (const auto& it : _networks) { - uint64_t nwid_i = it.first; - char nwidTmp[64]; - OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); - tx.zremrangebyscore("network-nodes-online:{" + controllerId + "}:" + nwidTmp, sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); - } - } - tx.exec(); - fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), count); - - return count; } #endif // ZT_CONTROLLER_USE_LIBPQ diff --git a/controller/CentralDB.hpp b/controller/CentralDB.hpp index 156e82e9a..6a3755952 100644 --- a/controller/CentralDB.hpp +++ b/controller/CentralDB.hpp @@ -10,6 +10,7 @@ #include "DB.hpp" #include "NotificationListener.hpp" #include "PostgreSQL.hpp" +#include "StatusWriter.hpp" #include #include @@ -24,12 +25,14 @@ namespace ZeroTier { struct RedisConfig; struct PubSubConfig; struct PostgresNotifyConfig; +struct BigTableConfig; struct ControllerConfig { bool ssoEnabled; RedisConfig* redisConfig; PubSubConfig* pubSubConfig; PostgresNotifyConfig* postgresNotifyConfig; + BigTableConfig* bigTableConfig; }; class CentralDB : public DB { @@ -40,7 +43,19 @@ class CentralDB : public DB { LISTENER_MODE_PUBSUB = 2, }; - CentralDB(const Identity& myId, const char* path, int listenPort, CentralDB::ListenerMode mode, ControllerConfig* cc); + enum StatusWriterMode { + STATUS_WRITER_MODE_PGSQL = 0, + STATUS_WRITER_MODE_REDIS = 1, + STATUS_WRITER_MODE_BIGTABLE = 2, + }; + + CentralDB( + const Identity& myId, + const char* path, + int listenPort, + CentralDB::ListenerMode mode, + CentralDB::StatusWriterMode statusMode, + ControllerConfig* cc); virtual ~CentralDB(); virtual bool waitForReady(); @@ -49,7 +64,11 @@ class CentralDB : public DB { virtual void eraseNetwork(const uint64_t networkId); virtual void eraseMember(const uint64_t networkId, const uint64_t memberId); virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress); - virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress, const char* osArch); + virtual void nodeIsOnline( + const uint64_t networkId, + const uint64_t memberId, + const InetAddress& physicalAddress, + const char* osArch); virtual AuthInfo getSSOAuthInfo(const nlohmann::json& member, const std::string& redirectURL); virtual bool ready() @@ -82,9 +101,6 @@ class CentralDB : public DB { void commitThread(); void onlineNotificationThread(); - void onlineNotification_Postgres(); - void onlineNotification_Redis(); - uint64_t _doRedisUpdate(sw::redis::Transaction& tx, std::string& controllerId, std::unordered_map, NodeOnlineRecord, _PairHasher>& lastOnline); void configureSmee(); void notifyNewMember(const std::string& networkID, const std::string& memberID); @@ -92,6 +108,7 @@ class CentralDB : public DB { enum OverrideMode { ALLOW_PGBOUNCER_OVERRIDE = 0, NO_OVERRIDE = 1 }; ListenerMode _listenerMode; + StatusWriterMode _statusWriterMode; ControllerConfig* _controllerConfig; std::shared_ptr > _pool; @@ -105,6 +122,7 @@ class CentralDB : public DB { std::thread _heartbeatThread; std::shared_ptr _membersDbWatcher; std::shared_ptr _networksDbWatcher; + std::shared_ptr _statusWriter; std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS]; std::thread _onlineNotificationThread; diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index af96a40fd..fd1fd951d 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -61,7 +61,9 @@ class PostgresConnFactory : public ConnectionFactory { template class MemberNotificationReceiver : public pqxx::notification_receiver { public: - MemberNotificationReceiver(T* p, pqxx::connection& c, const std::string& channel) : pqxx::notification_receiver(c, channel), _psql(p) + MemberNotificationReceiver(T* p, pqxx::connection& c, const std::string& channel) + : pqxx::notification_receiver(c, channel) + , _psql(p) { fprintf(stderr, "initialize MemberNotificationReceiver\n"); } @@ -118,7 +120,9 @@ template class MemberNotificationReceiver : public pqxx::notificati template class NetworkNotificationReceiver : public pqxx::notification_receiver { public: - NetworkNotificationReceiver(T* p, pqxx::connection& c, const std::string& channel) : pqxx::notification_receiver(c, channel), _psql(p) + NetworkNotificationReceiver(T* p, pqxx::connection& c, const std::string& channel) + : pqxx::notification_receiver(c, channel) + , _psql(p) { fprintf(stderr, "initialize NetworkrNotificationReceiver\n"); } @@ -187,6 +191,7 @@ struct NodeOnlineRecord { uint64_t lastSeen; InetAddress physicalAddress; std::string osArch; + std::string version; }; /** @@ -194,7 +199,9 @@ struct NodeOnlineRecord { */ template class _notificationReceiver : public pqxx::notification_receiver { public: - _notificationReceiver(T* p, pqxx::connection& c, const std::string& channel) : pqxx::notification_receiver(c, channel), _listener(p) + _notificationReceiver(T* p, pqxx::connection& c, const std::string& channel) + : pqxx::notification_receiver(c, channel) + , _listener(p) { fprintf(stderr, "initialize PostgresMemberNotificationListener::_notificationReceiver\n"); } @@ -214,7 +221,11 @@ template class _notificationReceiver : public pqxx::notification_re class PostgresMemberListener : public NotificationListener { public: - PostgresMemberListener(DB* db, std::shared_ptr > pool, const std::string& channel, uint64_t timeout); + PostgresMemberListener( + DB* db, + std::shared_ptr > pool, + const std::string& channel, + uint64_t timeout); virtual ~PostgresMemberListener(); virtual void listen(); @@ -233,7 +244,11 @@ class PostgresMemberListener : public NotificationListener { class PostgresNetworkListener : public NotificationListener { public: - PostgresNetworkListener(DB* db, std::shared_ptr > pool, const std::string& channel, uint64_t timeout); + PostgresNetworkListener( + DB* db, + std::shared_ptr > pool, + const std::string& channel, + uint64_t timeout); virtual ~PostgresNetworkListener(); virtual void listen();