diff --git a/controller/BigTableStatusWriter.cpp b/controller/BigTableStatusWriter.cpp new file mode 100644 index 000000000..4406b2884 --- /dev/null +++ b/controller/BigTableStatusWriter.cpp @@ -0,0 +1,98 @@ +#include "BigTableStatusWriter.hpp" + +#include +#include +#include + +namespace cbt = google::cloud::bigtable; + +namespace ZeroTier { + +const std::string nodeInfoColumnFamily = "node_info"; +const std::string checkInColumnFamily = "check_in"; + +const std::string osColumn = "os"; +const std::string archColumn = "arch"; +const std::string versionColumn = "version"; +const std::string ipv4Column = "ipv4"; +const std::string ipv6Column = "ipv6"; +const std::string lastSeenColumn = "last_seen"; + +BigTableStatusWriter::BigTableStatusWriter( + const std::string& project_id, + const std::string& instance_id, + const std::string& table_id) + : _project_id(project_id) + , _instance_id(instance_id) + , _table_id(table_id) +{ +} + +BigTableStatusWriter::~BigTableStatusWriter() +{ + writePending(); +} + +void BigTableStatusWriter::updateNodeStatus( + const std::string& network_id, + const std::string& node_id, + const std::string& os, + const std::string& arch, + const std::string& version, + const InetAddress& address, + int64_t last_seen) +{ + std::lock_guard l(_lock); + _pending.push_back({ network_id, node_id, os, arch, version, address, last_seen }); + if (_pending.size() >= 100) { + writePending(); + } +} + +size_t BigTableStatusWriter::queueLength() const +{ + std::lock_guard l(_lock); + return _pending.size(); +} + +void BigTableStatusWriter::writePending() +{ + std::vector toWrite; + { + std::lock_guard l(_lock); + toWrite.swap(_pending); + } + if (toWrite.empty()) { + return; + } + + namespace cbt = google::cloud::bigtable; + cbt::Table table(cbt::MakeDataConnection(), cbt::TableResource(_project_id, _instance_id, _table_id)); + + cbt::BulkMutation bulk; + for (const auto& entry : toWrite) { + std::string row_key = entry.network_id + "#" + entry.node_id; + cbt::SingleRowMutation m(row_key); + m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, osColumn, entry.os)); + m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, archColumn, entry.arch)); + m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, versionColumn, entry.version)); + char buf[64] = { 0 }; + if (entry.address.ss_family == AF_INET) { + m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv4Column, entry.address.toString(buf))); + } + else if (entry.address.ss_family == AF_INET6) { + m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv6Column, entry.address.toString(buf))); + } + int64_t ts = entry.last_seen; + m.emplace_back(cbt::SetCell(checkInColumnFamily, lastSeenColumn, std::move(ts))); + bulk.push_back(std::move(m)); + } + + std::vector failures = table.BulkApply(bulk); + for (auto const& r : failures) { + // Handle error (log it, retry, etc.) + std::cerr << "Error writing to BigTable: " << r.status() << "\n"; + } +} + +} // namespace ZeroTier \ No newline at end of file diff --git a/controller/BigTableStatusWriter.hpp b/controller/BigTableStatusWriter.hpp new file mode 100644 index 000000000..85c1e0b4d --- /dev/null +++ b/controller/BigTableStatusWriter.hpp @@ -0,0 +1,37 @@ +#ifndef BIGTABLESTATUSWRITER_HPP +#define BIGTABLESTATUSWRITER_HPP + +#include "StatusWriter.hpp" + +#include +#include + +namespace ZeroTier { +class BigTableStatusWriter : public StatusWriter { + public: + BigTableStatusWriter(const std::string& project_id, const std::string& instance_id, const std::string& table_id); + virtual ~BigTableStatusWriter(); + + virtual void updateNodeStatus( + const std::string& network_id, + const std::string& node_id, + const std::string& os, + const std::string& arch, + const std::string& version, + const InetAddress& address, + int64_t last_seen) override; + virtual size_t queueLength() const override; + virtual void writePending() override; + + private: + const std::string _project_id; + const std::string _instance_id; + const std::string _table_id; + + mutable std::mutex _lock; + std::vector _pending; +}; + +} // namespace ZeroTier + +#endif \ No newline at end of file diff --git a/controller/CMakeLists.txt b/controller/CMakeLists.txt index 482956606..ea11b3952 100644 --- a/controller/CMakeLists.txt +++ b/controller/CMakeLists.txt @@ -49,18 +49,30 @@ if (ZT1_CENTRAL_CONTROLLER) PubSubListener.hpp Redis.hpp RedisListener.cpp - RedisListener.hpp) + RedisListener.hpp + StatusWriter.cpp + StatusWriter.hpp + BigTableStatusWriter.cpp + BigTableStatusWriter.hpp + PostgresStatusWriter.cpp + PostgresStatusWriter.hpp + RedisStatusWriter.cpp + RedisStatusWriter.hpp + ) list(APPEND INCLUDE_DIRS ${PostgreSQL_INCLUDE_DIRS} "${redis++_BUILD_DIR}/src" ${pqxx_INCLUDE_DIRS} + ) list(APPEND LINK_LIBS redis++::redis++_static pqxx ${PostgreSQL_LIBRARIES} + google-cloud-cpp::bigtable + google-cloud-cpp::pubsub ) endif() diff --git a/controller/PostgresStatusWriter.cpp b/controller/PostgresStatusWriter.cpp new file mode 100644 index 000000000..3e5902845 --- /dev/null +++ b/controller/PostgresStatusWriter.cpp @@ -0,0 +1,99 @@ +#include "PostgresStatusWriter.hpp" + +#include "../node/Metrics.hpp" + +#include +#include + +namespace ZeroTier { + +PostgresStatusWriter::PostgresStatusWriter(std::shared_ptr > pool) : _pool(pool) +{ +} + +PostgresStatusWriter::~PostgresStatusWriter() +{ + writePending(); +} + +void PostgresStatusWriter::updateNodeStatus( + const std::string& network_id, + const std::string& node_id, + const std::string& os, + const std::string& arch, + const std::string& version, + const InetAddress& address, + int64_t last_seen) +{ + std::lock_guard l(_lock); + _pending.push_back({ network_id, node_id, os, arch, version, address, last_seen }); +} + +size_t PostgresStatusWriter::queueLength() const +{ + std::lock_guard l(_lock); + return _pending.size(); +} + +void PostgresStatusWriter::writePending() +{ + std::vector toWrite; + { + std::lock_guard l(_lock); + toWrite.swap(_pending); + } + if (toWrite.empty()) { + return; + } + + try { + auto conn = _pool->borrow(); + pqxx::work w(*conn->c); + + pqxx::pipeline pipe(w); + for (const auto& entry : toWrite) { + char iptmp[64] = { 0 }; + nlohmann::json record = { + { entry.address.toIpString(iptmp), entry.last_seen }, + }; + + try { + // check if the member exists first. + // + // exec_params1 will throw pqxx::unexpected_rows if not exactly one row is returned. If that's the + // case, skip this record and move on. + w.exec_params1( + "SELECT device_id, network_id FROM network_memberships_ctl WHERE network_id = $1 AND device_id = " + "$2", + entry.network_id, entry.node_id); + } + catch (pqxx::unexpected_rows& e) { + continue; + } + + std::string insert_statement = + "INSERT INTO network_memberships_ctl (device_id, network_id, last_seen, os, arch) " + "VALUES ('" + + w.esc(entry.node_id) + "', '" + w.esc(entry.network_id) + "', '" + w.esc(record.dump()) + + "'::JSONB, " + "'" + + w.esc(entry.os) + "', '" + w.esc(entry.arch) + + "') " + "ON CONFLICT (device_id, network_id) DO UPDATE SET os = EXCLUDED.os, arch = EXCLUDED.arch, " + "last_seen = network_memberships_ctl.last_seen || EXCLUDED.last_seen"; + + pipe.insert(insert_statement); + Metrics::pgsql_node_checkin++; + } + + pipe.complete(); + w.commit(); + _pool->unborrow(conn); + } + catch (const std::exception& e) { + // Log the error + fprintf(stderr, "Error writing to Postgres: %s\n", e.what()); + } +} + +} // namespace ZeroTier \ No newline at end of file diff --git a/controller/PostgresStatusWriter.hpp b/controller/PostgresStatusWriter.hpp new file mode 100644 index 000000000..375fcf5f7 --- /dev/null +++ b/controller/PostgresStatusWriter.hpp @@ -0,0 +1,39 @@ +#ifndef POSTGRES_STATUS_WRITER_HPP +#define POSTGRES_STATUS_WRITER_HPP + +#include "PostgreSQL.hpp" +#include "StatusWriter.hpp" + +#include +#include +#include +#include + +namespace ZeroTier { + +class PostgresStatusWriter : public StatusWriter { + public: + PostgresStatusWriter(std::shared_ptr > pool); + virtual ~PostgresStatusWriter(); + + virtual void updateNodeStatus( + const std::string& network_id, + const std::string& node_id, + const std::string& os, + const std::string& arch, + const std::string& version, + const InetAddress& address, + int64_t last_seen) override; + virtual size_t queueLength() const override; + virtual void writePending() override; + + private: + std::shared_ptr > _pool; + + mutable std::mutex _lock; + std::vector _pending; +}; + +} // namespace ZeroTier + +#endif // POSTGRES_STATUS_WRITER_HPP \ No newline at end of file diff --git a/controller/RedisStatusWriter.cpp b/controller/RedisStatusWriter.cpp new file mode 100644 index 000000000..c222d26a2 --- /dev/null +++ b/controller/RedisStatusWriter.cpp @@ -0,0 +1,122 @@ +#include "RedisStatusWriter.hpp" + +#include "../node/Metrics.hpp" +#include "../osdep/OSUtils.hpp" + +#include +#include + +namespace ZeroTier { + +RedisStatusWriter::RedisStatusWriter(std::shared_ptr redis, std::string controller_id) + : _redis(redis) + , _mode(REDIS_MODE_STANDALONE) +{ +} + +RedisStatusWriter::RedisStatusWriter(std::shared_ptr cluster, std::string controller_id) + : _cluster(cluster) + , _mode(REDIS_MODE_CLUSTER) +{ +} + +RedisStatusWriter::~RedisStatusWriter() +{ + writePending(); +} + +void RedisStatusWriter::updateNodeStatus( + const std::string& network_id, + const std::string& node_id, + const std::string& os, + const std::string& arch, + const std::string& version, + const InetAddress& address, + int64_t last_seen) +{ + std::lock_guard l(_lock); + _pending.push_back({ network_id, node_id, os, arch, version, address, last_seen }); +} + +size_t RedisStatusWriter::queueLength() const +{ + std::lock_guard l(_lock); + return _pending.size(); +} + +void RedisStatusWriter::writePending() +{ + try { + if (_mode == REDIS_MODE_STANDALONE) { + auto tx = _redis->transaction(true, false); + _doWritePending(tx); + } + else if (_mode == REDIS_MODE_CLUSTER) { + auto tx = _cluster->transaction(_controller_id, true, false); + _doWritePending(tx); + } + } + catch (const sw::redis::Error& e) { + // Log the error + fprintf(stderr, "Error writing to Redis: %s\n", e.what()); + } +} + +void RedisStatusWriter::_doWritePending(sw::redis::Transaction& tx) +{ + std::vector toWrite; + { + std::lock_guard l(_lock); + toWrite.swap(_pending); + } + if (toWrite.empty()) { + return; + } + + std::set networksUpdated; + uint64_t updateCount = 0; + for (const auto& entry : _pending) { + char iptmp[64] = { 0 }; + std::string ipAddr = entry.address.toIpString(iptmp); + std::unordered_map record = { + { "id", entry.node_id }, { "address", ipAddr }, { "last_updated", std::to_string(entry.last_seen) }, + { "os", entry.os }, { "arch", entry.arch }, { "version", entry.version } + }; + + tx.zadd("nodes-online:{" + _controller_id + "}", entry.node_id, entry.last_seen) + .zadd("nodes-online2:{" + _controller_id + "}", entry.network_id + "-" + entry.node_id, entry.last_seen) + .zadd("network-nodes-online:{" + _controller_id + "}:" + entry.network_id, entry.node_id, entry.last_seen) + .zadd("active-networks:{" + _controller_id + "}", entry.network_id, entry.last_seen) + .sadd("network-nodes-all:{" + _controller_id + "}:" + entry.network_id, entry.node_id) + .hmset( + "member:{" + _controller_id + "}:" + entry.network_id + ":" + entry.node_id, record.begin(), + record.end()); + networksUpdated.insert(entry.network_id); + ++updateCount; + Metrics::redis_node_checkin++; + } + + // expire records from all-nodes and network-nodes member list + uint64_t expireOld = OSUtils::now() - 300000; + + tx.zremrangebyscore( + "nodes-online:{" + _controller_id + "}", + sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); + tx.zremrangebyscore( + "nodes-online2:{" + _controller_id + "}", + sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); + tx.zremrangebyscore( + "active-networks:{" + _controller_id + "}", + sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); + + for (const auto& nwid : networksUpdated) { + tx.zremrangebyscore( + "network-nodes-online:{" + _controller_id + "}:" + nwid, + sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); + } + + fprintf(stderr, "%s: Updated online status of %d members\n", _controller_id.c_str(), updateCount); + tx.exec(); +} + +} // namespace ZeroTier \ No newline at end of file diff --git a/controller/RedisStatusWriter.hpp b/controller/RedisStatusWriter.hpp new file mode 100644 index 000000000..ac0ffd866 --- /dev/null +++ b/controller/RedisStatusWriter.hpp @@ -0,0 +1,46 @@ +#ifndef REDIS_STATUS_WRITER_HPP +#define REDIS_STATUS_WRITER_HPP + +#include "Redis.hpp" +#include "StatusWriter.hpp" + +#include +#include +#include + +namespace ZeroTier { + +class RedisStatusWriter : public StatusWriter { + public: + RedisStatusWriter(std::shared_ptr redis, std::string controller_id); + RedisStatusWriter(std::shared_ptr cluster, std::string controller_id); + virtual ~RedisStatusWriter(); + + virtual void updateNodeStatus( + const std::string& network_id, + const std::string& node_id, + const std::string& os, + const std::string& arch, + const std::string& version, + const InetAddress& address, + int64_t last_seen) override; + virtual size_t queueLength() const override; + virtual void writePending() override; + + private: + void _doWritePending(sw::redis::Transaction& tx); + + std::string _controller_id; + + enum RedisMode { REDIS_MODE_STANDALONE, REDIS_MODE_CLUSTER }; + std::shared_ptr _redis; + std::shared_ptr _cluster; + RedisMode _mode = REDIS_MODE_STANDALONE; + + mutable std::mutex _lock; + std::vector _pending; +}; + +} // namespace ZeroTier + +#endif // REDIS_STATUS_WRITER_HPP \ No newline at end of file diff --git a/controller/StatusWriter.cpp b/controller/StatusWriter.cpp new file mode 100644 index 000000000..b61ec1251 --- /dev/null +++ b/controller/StatusWriter.cpp @@ -0,0 +1 @@ +#include "StatusWriter.hpp" diff --git a/controller/StatusWriter.hpp b/controller/StatusWriter.hpp new file mode 100644 index 000000000..37b2dd06e --- /dev/null +++ b/controller/StatusWriter.hpp @@ -0,0 +1,43 @@ +#ifndef STATUS_WRITER_HPP +#define STATUS_WRITER_HPP + +#include "../node/InetAddress.hpp" + +#include + +namespace ZeroTier { + +/** + * Abstract interface for writing status information somewhere. + * + * Implementations might write to a database, a file, or something else. + */ +class StatusWriter { + public: + virtual ~StatusWriter() = 0; + + virtual void updateNodeStatus( + const std::string& network_id, + const std::string& node_id, + const std::string& os, + const std::string& arch, + const std::string& version, + const InetAddress& address, + int64_t last_seen) = 0; + virtual size_t queueLength() const = 0; + virtual void writePending() = 0; +}; + +struct PendingStatusEntry { + std::string network_id; + std::string node_id; + std::string os; + std::string arch; + std::string version; + InetAddress address; + int64_t last_seen; +}; + +} // namespace ZeroTier + +#endif // STATUS_WRITER_HPP