/*
 * Copyright (c)2019 ZeroTier, Inc.
 *
 * Use of this software is governed by the Business Source License included
 * in the LICENSE.TXT file in the project's root directory.
 *
 * Change Date: 2026-01-01
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by version 2.0 of the Apache License.
 */
/****/

#include "DBMirrorSet.hpp"

#include <chrono>
#include <cstddef>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <thread>
#include <vector>

#include "opentelemetry/trace/provider.h"

namespace ZeroTier {

namespace {

// The sync checker sleeps in short slices so that it notices shutdown quickly:
// 120 slices * 500 ms = 1 minute between two sync passes.
constexpr int kSyncCheckSleepSlices = 120;
constexpr std::chrono::milliseconds kSyncCheckSleepSlice(500);

// NOTE(review): InetAddress::toString() is declared outside this file. It appears to
// emit "ip/port", which for the longest IPv6 textual forms would not fit into
// INET6_ADDRSTRLEN bytes, so a 64-byte buffer is used instead -- confirm against the
// buffer size InetAddress::toString() is declared with.
constexpr std::size_t kPhysicalAddressStrSize = 64;

// Returns the tracer shared by every span created in this file.
auto mirrorSetTracer()
{
    return opentelemetry::trace::Provider::GetTracerProvider()->GetTracer("db_mirror_set");
}

}   // namespace

/**
 * Creates the mirror set and starts the background sync checker thread.
 *
 * Roughly once a minute the thread snapshots the registered databases and, when more
 * than one is present, walks every network/member record of each database and saves
 * it (without notifying listeners) into every other database that either lacks the
 * record or holds it with a lower "revision".
 *
 * @param listener Receiver of the change notifications forwarded by this set
 */
DBMirrorSet::DBMirrorSet(DB::ChangeListener* listener) : _listener(listener), _running(true), _syncCheckerThread(), _dbs(), _dbs_l()
{
    _syncCheckerThread = std::thread([this]() {
        for (;;) {
            // 1 minute delay between checks, polled in slices so shutdown is not delayed
            for (int i = 0; i < kSyncCheckSleepSlices; ++i) {
                if (! _running) {
                    return;
                }
                std::this_thread::sleep_for(kSyncCheckSleepSlice);
            }

            auto tracer = mirrorSetTracer();
            auto span = tracer->StartSpan("db::syncChecker");
            auto scope = tracer->WithActiveSpan(span);

            // Work on a snapshot so no lock is held while the databases are walked.
            // Only _dbs is read here, so a shared lock is sufficient.
            std::vector<std::shared_ptr<DB> > dbs;
            {
                std::shared_lock l(_dbs_l);
                if (_dbs.size() <= 1) {
                    continue;   // no need to do this if there's only one DB, so skip the iteration
                }
                dbs = _dbs;
            }

            for (const auto& src : dbs) {
                src->each([&dbs, &src](uint64_t networkId, const nlohmann::json& network, uint64_t memberId, const nlohmann::json& member) {
                    try {
                        if (! network.is_object()) {
                            return;
                        }

                        if (memberId == 0) {
                            // Network record: push it to every other DB that is missing it or is behind.
                            for (const auto& dst : dbs) {
                                if (dst.get() == src.get()) {
                                    continue;
                                }
                                nlohmann::json nw2;
                                if ((! dst->get(networkId, nw2)) || ((nw2.is_object()) && (OSUtils::jsonInt(nw2["revision"], 0) < OSUtils::jsonInt(network["revision"], 0)))) {
                                    nw2 = network;
                                    dst->save(nw2, false);
                                }
                            }
                        }
                        else if (member.is_object()) {
                            // Member record: same rule, compared on the member's revision.
                            for (const auto& dst : dbs) {
                                if (dst.get() == src.get()) {
                                    continue;
                                }
                                nlohmann::json nw2, m2;
                                if ((! dst->get(networkId, nw2, memberId, m2)) || ((m2.is_object()) && (OSUtils::jsonInt(m2["revision"], 0) < OSUtils::jsonInt(member["revision"], 0)))) {
                                    m2 = member;
                                    dst->save(m2, false);
                                }
                            }
                        }
                    }
                    catch (...) {
                        // skip entries that generate JSON errors
                    }
                });
            }
        }
    });
}

/**
 * Signals the sync checker thread to stop and waits for it to exit.
 */
DBMirrorSet::~DBMirrorSet()
{
    _running = false;
    _syncCheckerThread.join();
}

/**
 * @param networkId Network to look for
 * @return True if any registered database reports having the network
 */
bool DBMirrorSet::hasNetwork(const uint64_t networkId) const
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::hasNetwork");
    auto scope = tracer->WithActiveSpan(span);

    std::shared_lock l(_dbs_l);
    for (const auto& d : _dbs) {
        if (d->hasNetwork(networkId)) {
            return true;
        }
    }
    return false;
}

/**
 * Fetches a network record from the first database whose get() succeeds.
 *
 * @param networkId Network to fetch
 * @param network Output passed through to each database's get()
 * @return True if some database returned the network
 */
bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network)
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::getNetwork");
    auto scope = tracer->WithActiveSpan(span);
    char networkIdStr[17];
    span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));

    std::shared_lock l(_dbs_l);
    for (const auto& d : _dbs) {
        if (d->get(networkId, network)) {
            return true;
        }
    }
    return false;
}

/**
 * Fetches a network and one of its members from the first database whose get() succeeds.
 *
 * @param networkId Network to fetch
 * @param network Output passed through to each database's get()
 * @param memberId Member to fetch
 * @param member Output passed through to each database's get()
 * @return True if some database returned the pair
 */
bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, const uint64_t memberId, nlohmann::json& member)
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::getNetworkAndMember");
    auto scope = tracer->WithActiveSpan(span);
    char networkIdStr[17];
    char memberIdStr[11];
    span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
    span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));

    std::shared_lock l(_dbs_l);
    for (const auto& d : _dbs) {
        if (d->get(networkId, network, memberId, member)) {
            return true;
        }
    }
    return false;
}

/**
 * Same as the network/member get(), additionally passing a summary-info output to the databases.
 *
 * @param networkId Network to fetch
 * @param network Output passed through to each database's get()
 * @param memberId Member to fetch
 * @param member Output passed through to each database's get()
 * @param info Output passed through to each database's get()
 * @return True if some database returned the data
 */
bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, const uint64_t memberId, nlohmann::json& member, DB::NetworkSummaryInfo& info)
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::getNetworkAndMemberWithSummary");
    auto scope = tracer->WithActiveSpan(span);
    char networkIdStr[17];
    char memberIdStr[11];
    span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
    span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));

    std::shared_lock l(_dbs_l);
    for (const auto& d : _dbs) {
        if (d->get(networkId, network, memberId, member, info)) {
            return true;
        }
    }
    return false;
}

/**
 * Fetches a network together with a list of members from the first database whose get() succeeds.
 *
 * @param networkId Network to fetch
 * @param network Output passed through to each database's get()
 * @param members Output passed through to each database's get()
 * @return True if some database returned the data
 */
bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, std::vector<nlohmann::json>& members)
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::getNetworkAndMembers");
    auto scope = tracer->WithActiveSpan(span);
    char networkIdStr[17];
    span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));

    std::shared_lock l(_dbs_l);
    for (const auto& d : _dbs) {
        if (d->get(networkId, network, members)) {
            return true;
        }
    }
    return false;
}

/**
 * Asks each database in turn for SSO auth info and returns the first result whose
 * `enabled` flag is set.
 *
 * @param member Member record passed through to the databases
 * @param redirectURL Redirect URL passed through to the databases
 * @return First enabled AuthInfo, or a default-constructed AuthInfo if none is enabled
 */
AuthInfo DBMirrorSet::getSSOAuthInfo(const nlohmann::json& member, const std::string& redirectURL)
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::getSSOAuthInfo");
    auto scope = tracer->WithActiveSpan(span);

    std::shared_lock l(_dbs_l);
    for (const auto& d : _dbs) {
        AuthInfo info = d->getSSOAuthInfo(member, redirectURL);
        if (info.enabled) {
            return info;
        }
    }
    return AuthInfo();
}

/**
 * Passes the same output set to networks() of every registered database.
 *
 * @param networks Set handed to each database in turn
 */
void DBMirrorSet::networks(std::set<uint64_t>& networks)
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::networks");
    auto scope = tracer->WithActiveSpan(span);

    std::shared_lock l(_dbs_l);
    for (const auto& d : _dbs) {
        d->networks(networks);
    }
}

/**
 * Calls waitForReady() on every registered database.
 *
 * @return True if at least one database's waitForReady() returned true
 */
bool DBMirrorSet::waitForReady()
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::waitForReady");
    auto scope = tracer->WithActiveSpan(span);

    bool r = false;
    std::shared_lock l(_dbs_l);
    for (const auto& d : _dbs) {
        // Deliberately not short-circuited: every database is waited on.
        r |= d->waitForReady();
    }
    return r;
}

/**
 * @return True only if every registered database reports ready (true when none are registered)
 */
bool DBMirrorSet::isReady()
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::isReady");
    auto scope = tracer->WithActiveSpan(span);

    std::shared_lock l(_dbs_l);
    for (const auto& d : _dbs) {
        if (! d->isReady()) {
            return false;
        }
    }
    return true;
}

/**
 * Saves a record into the registered databases.
 *
 * With notifyListeners set, databases are tried in order and the call returns as soon
 * as one save() reports true; the remaining databases are not written by this call.
 * NOTE(review): presumably the resulting change notification mirrors the record to the
 * other databases via onNetworkUpdate()/onNetworkMemberUpdate() -- confirm.
 *
 * Without notifyListeners, the record is saved into every database.
 *
 * @param record Record to save
 * @param notifyListeners Forwarded to each database's save()
 * @return True if a database's save() returned true
 */
bool DBMirrorSet::save(nlohmann::json& record, bool notifyListeners)
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::save");
    auto scope = tracer->WithActiveSpan(span);

    // Snapshot under a shared lock (only _dbs is read) so saves run without holding it.
    std::vector<std::shared_ptr<DB> > dbs;
    {
        std::shared_lock l(_dbs_l);
        dbs = _dbs;
    }

    if (notifyListeners) {
        for (const auto& d : dbs) {
            if (d->save(record, true)) {
                return true;
            }
        }
        return false;
    }

    bool modified = false;
    for (const auto& d : dbs) {
        modified |= d->save(record, false);
    }
    return modified;
}

/**
 * Erases a network from every registered database.
 *
 * @param networkId Network to erase
 */
void DBMirrorSet::eraseNetwork(const uint64_t networkId)
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::eraseNetwork");
    auto scope = tracer->WithActiveSpan(span);
    char networkIdStr[17];
    span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));

    // NOTE(review): exclusive lock although _dbs is only read here; possibly intended to
    // serialize erasures against other operations -- confirm before relaxing.
    std::unique_lock l(_dbs_l);
    for (const auto& d : _dbs) {
        d->eraseNetwork(networkId);
    }
}

/**
 * Erases a network member from every registered database.
 *
 * @param networkId Network the member belongs to
 * @param memberId Member to erase
 */
void DBMirrorSet::eraseMember(const uint64_t networkId, const uint64_t memberId)
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::eraseMember");
    auto scope = tracer->WithActiveSpan(span);
    char networkIdStr[17];
    char memberIdStr[11];
    span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
    span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));

    // NOTE(review): exclusive lock although _dbs is only read here; possibly intended to
    // serialize erasures against other operations -- confirm before relaxing.
    std::unique_lock l(_dbs_l);
    for (const auto& d : _dbs) {
        d->eraseMember(networkId, memberId);
    }
}

/**
 * Forwards a node-online report to every registered database.
 *
 * @param networkId Network the node is a member of
 * @param memberId Member that is online
 * @param physicalAddress Physical address forwarded to the databases
 * @param osArch OS/architecture string forwarded to the databases
 */
void DBMirrorSet::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress, const char* osArch)
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::nodeIsOnline");
    auto scope = tracer->WithActiveSpan(span);
    char networkIdStr[17];
    char memberIdStr[11];
    char phyAddressStr[kPhysicalAddressStrSize];
    span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
    span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
    span->SetAttribute("physical_address", physicalAddress.toString(phyAddressStr));
    span->SetAttribute("os_arch", osArch);

    std::shared_lock l(_dbs_l);
    for (const auto& d : _dbs) {
        d->nodeIsOnline(networkId, memberId, physicalAddress, osArch);
    }
}

/**
 * Convenience overload that reports the node online with osArch "unknown/unknown".
 *
 * @param networkId Network the node is a member of
 * @param memberId Member that is online
 * @param physicalAddress Physical address forwarded to the databases
 */
void DBMirrorSet::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress)
{
    this->nodeIsOnline(networkId, memberId, physicalAddress, "unknown/unknown");
}

/**
 * Handles a network change reported by one database: saves a copy of the record
 * (without notifying listeners) into every other database, then forwards the update
 * to the listener with this mirror set as the source.
 *
 * @param db Address of the database that reported the change; it is skipped when mirroring
 * @param networkId Network that changed
 * @param network New network record
 */
void DBMirrorSet::onNetworkUpdate(const void* db, uint64_t networkId, const nlohmann::json& network)
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::onNetworkUpdate");
    auto scope = tracer->WithActiveSpan(span);
    char networkIdStr[17];
    span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));

    nlohmann::json record(network);   // save() takes a mutable reference, so work on a copy

    // NOTE(review): the exclusive lock is held for the mirroring loop AND the listener
    // callback below, as in the original; a listener that re-enters this object would
    // block on _dbs_l -- confirm this ordering is intended before narrowing the scope.
    std::unique_lock l(_dbs_l);
    for (const auto& d : _dbs) {
        if (d.get() != db) {
            d->save(record, false);
        }
    }
    _listener->onNetworkUpdate(this, networkId, network);
}

/**
 * Handles a member change reported by one database: saves a copy of the record
 * (without notifying listeners) into every other database, then forwards the update
 * to the listener with this mirror set as the source.
 *
 * @param db Address of the database that reported the change; it is skipped when mirroring
 * @param networkId Network the member belongs to
 * @param memberId Member that changed
 * @param member New member record
 */
void DBMirrorSet::onNetworkMemberUpdate(const void* db, uint64_t networkId, uint64_t memberId, const nlohmann::json& member)
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::onNetworkMemberUpdate");
    auto scope = tracer->WithActiveSpan(span);
    char networkIdStr[17];
    char memberIdStr[11];
    span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
    span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));

    nlohmann::json record(member);   // save() takes a mutable reference, so work on a copy

    // NOTE(review): the exclusive lock is held for the mirroring loop AND the listener
    // callback below, as in the original; a listener that re-enters this object would
    // block on _dbs_l -- confirm this ordering is intended before narrowing the scope.
    std::unique_lock l(_dbs_l);
    for (const auto& d : _dbs) {
        if (d.get() != db) {
            d->save(record, false);
        }
    }
    _listener->onNetworkMemberUpdate(this, networkId, memberId, member);
}

/**
 * Forwards a member deauthorization to the listener with this mirror set as the source.
 * The databases themselves are not touched here.
 *
 * @param db Address of the database that reported the event (unused)
 * @param networkId Network the member belongs to
 * @param memberId Member that was deauthorized
 */
void DBMirrorSet::onNetworkMemberDeauthorize(const void* db, uint64_t networkId, uint64_t memberId)
{
    auto tracer = mirrorSetTracer();
    auto span = tracer->StartSpan("db_mirror_set::onNetworkMemberDeauthorize");
    auto scope = tracer->WithActiveSpan(span);
    char networkIdStr[17];
    char memberIdStr[11];
    span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
    span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));

    _listener->onNetworkMemberDeauthorize(this, networkId, memberId);
}

}   // namespace ZeroTier