diff --git a/ext/central-controller-docker/main-new.sh b/ext/central-controller-docker/main-new.sh index f40933ef2..aa0dba0fa 100755 --- a/ext/central-controller-docker/main-new.sh +++ b/ext/central-controller-docker/main-new.sh @@ -80,9 +80,9 @@ if [ "$ZT_USE_BIGTABLE" == "true" ]; then fi BIGTABLE_CONF=", \"bigtable\": { - \"project\": \"${ZT_BIGTABLE_PROJECT}\", - \"instance\": \"${ZT_BIGTABLE_INSTANCE}\", - \"table\": \"${ZT_BIGTABLE_TABLE}\" + \"project_id\": \"${ZT_BIGTABLE_PROJECT}\", + \"instance_id\": \"${ZT_BIGTABLE_INSTANCE}\", + \"table_id\": \"${ZT_BIGTABLE_TABLE}\" } " fi @@ -96,7 +96,7 @@ if [ "$ZT_USE_PUBSUB" == "true" ]; then fi PUBSUB_CONF=", \"pubsub\": { - \"project\": \"${ZT_CTL_PUBSUB_PROJECT}\" + \"project_id\": \"${ZT_PUBSUB_PROJECT}\" } " fi @@ -161,6 +161,10 @@ if [ -n "$ZT_TEMPORAL_HOST" ] && [ -n "$ZT_TEMPORAL_PORT" ]; then echo "Temporal is up" fi +cat /var/lib/zerotier-one/local.conf + +export GOOGLE_CLOUD_CPP_ENABLE_CLOG=yes +export LIBC_FATAL_STDERR_=1 export GLIBCXX_FORCE_NEW=1 export GLIBCPP_FORCE_NEW=1 export LD_PRELOAD="/opt/conda/envs/central_controller/lib/libjemalloc.so.2" diff --git a/nonfree/controller/BigTableStatusWriter.cpp b/nonfree/controller/BigTableStatusWriter.cpp index 61f75f324..8c525cc68 100644 --- a/nonfree/controller/BigTableStatusWriter.cpp +++ b/nonfree/controller/BigTableStatusWriter.cpp @@ -31,12 +31,22 @@ BigTableStatusWriter::BigTableStatusWriter( , _instance_id(instance_id) , _table_id(table_id) , _pubsubWriter(pubsubWriter) + , _table(nullptr) { + _table = new cbt::Table(cbt::MakeDataConnection(), cbt::TableResource(_project_id, _instance_id, _table_id)); + fprintf( + stderr, "BigTableStatusWriter for project %s instance %s table %s\n", project_id.c_str(), instance_id.c_str(), + table_id.c_str()); } BigTableStatusWriter::~BigTableStatusWriter() { writePending(); + + if (_table != nullptr) { + delete _table; + _table = nullptr; + } } void BigTableStatusWriter::updateNodeStatus( @@ -79,11 +89,8 @@ void BigTableStatusWriter::writePending() if (toWrite.empty()) { return; } + fprintf(stderr, "Writing %zu pending status entries to BigTable\n", toWrite.size()); - namespace cbt = google::cloud::bigtable; - cbt::Table table(cbt::MakeDataConnection(), cbt::TableResource(_project_id, _instance_id, _table_id)); - - cbt::BulkMutation bulk; for (const auto& entry : toWrite) { std::string row_key = entry.network_id + "#" + entry.node_id; cbt::SingleRowMutation m(row_key); @@ -91,26 +98,74 @@ void BigTableStatusWriter::writePending() m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, archColumn, entry.arch)); m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, versionColumn, entry.version)); char buf[64] = { 0 }; + std::string addressStr = entry.address.toString(buf); if (entry.address.ss_family == AF_INET) { - m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv4Column, entry.address.toString(buf))); + m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv4Column, std::move(addressStr))); } else if (entry.address.ss_family == AF_INET6) { - m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv6Column, entry.address.toString(buf))); + m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv6Column, std::move(addressStr))); } int64_t ts = entry.last_seen; m.emplace_back(cbt::SetCell(checkInColumnFamily, lastSeenColumn, std::move(ts))); - bulk.push_back(std::move(m)); - // TODO: Check performance on this. May need to bach these. - _pubsubWriter->publishStatusChange( - entry.target, entry.network_id, entry.node_id, entry.os, entry.arch, entry.version, entry.last_seen); + try { + auto status = _table->Apply(std::move(m)); + if (! status.ok()) { + fprintf(stderr, "Error writing to BigTable: %s\n", status.message().c_str()); + } + else { + _pubsubWriter->publishStatusChange( + entry.target, entry.network_id, entry.node_id, entry.os, entry.arch, entry.version, + entry.last_seen); + } + } + catch (const std::exception& e) { + fprintf(stderr, "Exception writing to BigTable: %s\n", e.what()); + span->SetAttribute("error", e.what()); + span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); + return; + } } + // cbt::BulkMutation bulk; + // for (const auto& entry : toWrite) { + // std::string row_key = entry.network_id + "#" + entry.node_id; + // cbt::SingleRowMutation m(row_key); + // m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, osColumn, entry.os)); + // m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, archColumn, entry.arch)); + // m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, versionColumn, entry.version)); + // char buf[64] = { 0 }; + // std::string addressStr = entry.address.toString(buf); + // if (entry.address.ss_family == AF_INET) { + // m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv4Column, std::move(addressStr))); + // } + // else if (entry.address.ss_family == AF_INET6) { + // m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv6Column, std::move(addressStr))); + // } + // int64_t ts = entry.last_seen; + // m.emplace_back(cbt::SetCell(checkInColumnFamily, lastSeenColumn, std::move(ts))); + // bulk.emplace_back(m); - std::vector failures = table.BulkApply(bulk); - for (auto const& r : failures) { - // Handle error (log it, retry, etc.) - std::cerr << "Error writing to BigTable: " << r.status() << "\n"; - } + // // TODO: Check performance on this. May need to bach these. + // _pubsubWriter->publishStatusChange( + // entry.target, entry.network_id, entry.node_id, entry.os, entry.arch, entry.version, entry.last_seen); + // } + + // fprintf(stderr, "Applying %zu mutations to BigTable\n", bulk.size()); + + // try { + // std::vector failures = table.BulkApply(std::move(bulk)); + // fprintf(stderr, "BigTable write completed with %zu failures\n", failures.size()); + // for (auto const& r : failures) { + // // Handle error (log it, retry, etc.) + // std::cerr << "Error writing to BigTable: " << r.status() << "\n"; + // } + // } + // catch (const std::exception& e) { + // fprintf(stderr, "Exception writing to BigTable: %s\n", e.what()); + // span->SetAttribute("error", e.what()); + // span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); + // return; + // } } } // namespace ZeroTier \ No newline at end of file diff --git a/nonfree/controller/BigTableStatusWriter.hpp b/nonfree/controller/BigTableStatusWriter.hpp index 00ab0647f..750b46685 100644 --- a/nonfree/controller/BigTableStatusWriter.hpp +++ b/nonfree/controller/BigTableStatusWriter.hpp @@ -3,6 +3,7 @@ #include "StatusWriter.hpp" +#include #include #include #include @@ -40,6 +41,7 @@ class BigTableStatusWriter : public StatusWriter { mutable std::mutex _lock; std::vector _pending; std::shared_ptr _pubsubWriter; + google::cloud::bigtable::Table* _table; }; } // namespace ZeroTier diff --git a/nonfree/controller/CentralDB.cpp b/nonfree/controller/CentralDB.cpp index a351b42ba..668e025eb 100644 --- a/nonfree/controller/CentralDB.cpp +++ b/nonfree/controller/CentralDB.cpp @@ -147,6 +147,7 @@ CentralDB::CentralDB( switch (listenMode) { case LISTENER_MODE_REDIS: + fprintf(stderr, "Using Redis for change listeners\n"); if (_cc->redisConfig != NULL) { if (_cc->redisConfig->clusterMode) { _membersDbWatcher = std::make_shared(_myAddressStr, _cluster, this); @@ -161,6 +162,7 @@ CentralDB::CentralDB( throw std::runtime_error("CentralDB: Redis listener mode selected but no Redis configuration provided"); } case LISTENER_MODE_PUBSUB: + fprintf(stderr, "Using PubSub for change listeners\n"); if (cc->pubSubConfig != NULL) { _membersDbWatcher = std::make_shared(_myAddressStr, cc->pubSubConfig->project_id, this); @@ -174,6 +176,7 @@ CentralDB::CentralDB( break; case LISTENER_MODE_PGSQL: default: + fprintf(stderr, "Using PostgreSQL for change listeners\n"); _membersDbWatcher = std::make_shared(this, _pool, "member_" + _myAddressStr, 5); _networksDbWatcher = std::make_shared(this, _pool, "network_" + _myAddressStr, 5); break; @@ -182,6 +185,7 @@ CentralDB::CentralDB( std::shared_ptr pubsubWriter; switch (statusMode) { case STATUS_WRITER_MODE_REDIS: + fprintf(stderr, "Using Redis for status writer\n"); if (_cc->redisConfig != NULL) { if (_cc->redisConfig->clusterMode) { _statusWriter = std::make_shared(_cluster, _myAddressStr); @@ -195,6 +199,7 @@ CentralDB::CentralDB( } break; case STATUS_WRITER_MODE_BIGTABLE: + fprintf(stderr, "Using BigTable for status writer\n"); if (cc->bigTableConfig == NULL) { throw std::runtime_error( "CentralDB: BigTable status mode selected but no BigTable configuration provided"); @@ -213,6 +218,7 @@ CentralDB::CentralDB( break; case STATUS_WRITER_MODE_PGSQL: default: + fprintf(stderr, "Using PostgreSQL for status writer\n"); _statusWriter = std::make_shared(_pool); break; } @@ -823,7 +829,7 @@ void CentralDB::initializeMembers() "(EXTRACT(EPOCH FROM nm.creation_time AT TIME ZONE 'UTC')*1000)::bigint, nm.identity, " "(EXTRACT(EPOCH FROM nm.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, " "(EXTRACT(EPOCH FROM nm.last_deauthorized_time AT TIME ZONE 'UTC')*1000)::bigint, " - "nm.remote_trace_level, nm.remote_trace_target, nm.revision, nm.capabilities, nm.tags " + "nm.remote_trace_level, nm.remote_trace_target, nm.revision, nm.capabilities, nm.tags, " "nm.frontend " "FROM network_memberships_ctl nm " "INNER JOIN networks_ctl n " @@ -1492,6 +1498,7 @@ void CentralDB::onlineNotificationThread() _statusWriter->updateNodeStatus( networkId, memberId, os, arch, "", i->second.physicalAddress, ts, frontend); + fprintf(stderr, "sent node status update\n"); } _statusWriter->writePending(); w.commit(); diff --git a/nonfree/controller/PubSubListener.cpp b/nonfree/controller/PubSubListener.cpp index f4f14db99..c6af9534e 100644 --- a/nonfree/controller/PubSubListener.cpp +++ b/nonfree/controller/PubSubListener.cpp @@ -30,11 +30,14 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s : _controller_id(controller_id) , _project(project) , _topic(topic) - , _subscription_id("sub-" + controller_id + "-network-changes") + , _subscription_id("sub-" + controller_id + "-" + topic) , _run(false) , _adminClient(pubsub_admin::MakeSubscriptionAdminConnection()) , _subscription(pubsub::Subscription(_project, _subscription_id)) { + fprintf( + stderr, "PubSubListener for controller %s project %s topic %s subscription %s\n", controller_id.c_str(), + project.c_str(), topic.c_str(), _subscription_id.c_str()); GOOGLE_PROTOBUF_VERIFY_VERSION; // If PUBSUB_EMULATOR_HOST is set, create the topic if it doesn't exist diff --git a/nonfree/controller/PubSubWriter.cpp b/nonfree/controller/PubSubWriter.cpp index 7eed8100c..25698f363 100644 --- a/nonfree/controller/PubSubWriter.cpp +++ b/nonfree/controller/PubSubWriter.cpp @@ -15,11 +15,14 @@ namespace pubsub = ::google::cloud::pubsub; namespace ZeroTier { -PubSubWriter::PubSubWriter(std::string controller_id, std::string project, std::string topic) +PubSubWriter::PubSubWriter(std::string project, std::string topic, std::string controller_id) : _controller_id(controller_id) , _project(project) , _topic(topic) { + fprintf( + stderr, "PubSubWriter for controller %s project %s topic %s\n", controller_id.c_str(), project.c_str(), + topic.c_str()); GOOGLE_PROTOBUF_VERIFY_VERSION; // If PUBSUB_EMULATOR_HOST is set, create the topic if it doesn't exist diff --git a/nonfree/controller/PubSubWriter.hpp b/nonfree/controller/PubSubWriter.hpp index 10265a851..170403d39 100644 --- a/nonfree/controller/PubSubWriter.hpp +++ b/nonfree/controller/PubSubWriter.hpp @@ -10,7 +10,7 @@ namespace ZeroTier { class PubSubWriter { public: - PubSubWriter(std::string controller_id, std::string project, std::string topic); + PubSubWriter(std::string project, std::string topic, std::string controller_id); virtual ~PubSubWriter(); bool publishNetworkChange(const nlohmann::json& networkJson, const std::string& frontend = ""); diff --git a/tmp/local.conf b/tmp/local.conf new file mode 100644 index 000000000..9555b24d3 --- /dev/null +++ b/tmp/local.conf @@ -0,0 +1,33 @@ +{ + "settings": { + "controllerDbPath": "postgres:host=postgres port=5432 dbname=central user=testuser password=test_password application_name=controller-203e4472cf sslmode=prefer sslcert= sslkey= sslrootcert=", + "portMappingEnabled": true, + "softwareUpdate": "disable", + "interfacePrefixBlacklist": [ + "inot", + "nat64" + ], + "lowBandwidthMode": false, + "ssoRedirectURL": "", + "allowManagementFrom": ["127.0.0.1", "::1", "10.0.0.0/8"], + "otel": { + "exporterEndpoint": "jaeger:4317", + "exporterSampleRate": 1 + } + , "redis": null + }, + "controller": { + "listenMode": "pubsub", + "statusMode": "bigtable" + , "redis": null + , "bigtable": { + "project_id": "arbitrary-project", + "instance_id": "arbitrary-instance", + "table_id": "member_status" + } + + , "pubsub": { + "project_id": "arbitrary-project" + } + } +}