diff --git a/nonfree/controller/BigTableStatusWriter.cpp b/nonfree/controller/BigTableStatusWriter.cpp index 59c36eb65..44180b65a 100644 --- a/nonfree/controller/BigTableStatusWriter.cpp +++ b/nonfree/controller/BigTableStatusWriter.cpp @@ -25,12 +25,10 @@ const std::string lastSeenColumn = "last_seen"; BigTableStatusWriter::BigTableStatusWriter( const std::string& project_id, const std::string& instance_id, - const std::string& table_id, - std::shared_ptr pubsubWriter) + const std::string& table_id) : _project_id(project_id) , _instance_id(instance_id) , _table_id(table_id) - , _pubsubWriter(pubsubWriter) , _table(nullptr) { _table = new cbt::Table(cbt::MakeDataConnection(), cbt::TableResource(_project_id, _instance_id, _table_id)); diff --git a/nonfree/controller/BigTableStatusWriter.hpp b/nonfree/controller/BigTableStatusWriter.hpp index 750b46685..ce687121f 100644 --- a/nonfree/controller/BigTableStatusWriter.hpp +++ b/nonfree/controller/BigTableStatusWriter.hpp @@ -14,11 +14,7 @@ class PubSubWriter; class BigTableStatusWriter : public StatusWriter { public: - BigTableStatusWriter( - const std::string& project_id, - const std::string& instance_id, - const std::string& table_id, - std::shared_ptr pubsubWriter); + BigTableStatusWriter(const std::string& project_id, const std::string& instance_id, const std::string& table_id); virtual ~BigTableStatusWriter(); virtual void updateNodeStatus( @@ -40,7 +36,6 @@ class BigTableStatusWriter : public StatusWriter { mutable std::mutex _lock; std::vector _pending; - std::shared_ptr _pubsubWriter; google::cloud::bigtable::Table* _table; }; diff --git a/nonfree/controller/CMakeLists.txt b/nonfree/controller/CMakeLists.txt index 4593f5daf..9a5070977 100644 --- a/nonfree/controller/CMakeLists.txt +++ b/nonfree/controller/CMakeLists.txt @@ -42,6 +42,8 @@ if (ZT1_CENTRAL_CONTROLLER) CentralDB.cpp CentralDB.hpp ControllerConfig.hpp + ControllerChangeNotifier.cpp + ControllerChangeNotifier.hpp NotificationListener.hpp PostgreSQL.cpp PostgreSQL.hpp diff --git a/nonfree/controller/CentralDB.cpp b/nonfree/controller/CentralDB.cpp index 8944594b2..759c4d09e 100644 --- a/nonfree/controller/CentralDB.cpp +++ b/nonfree/controller/CentralDB.cpp @@ -19,6 +19,7 @@ #include "../../node/SHA512.hpp" #include "../../version.h" #include "BigTableStatusWriter.hpp" +#include "ControllerChangeNotifier.hpp" #include "ControllerConfig.hpp" #include "CtlUtil.hpp" #include "EmbeddedNetworkController.hpp" @@ -168,6 +169,7 @@ CentralDB::CentralDB( std::make_shared(_myAddressStr, cc->pubSubConfig->project_id, this); _networksDbWatcher = std::make_shared(_myAddressStr, cc->pubSubConfig->project_id, this); + _changeNotifier = std::make_shared(_myAddressStr, cc->pubSubConfig->project_id); } else { throw std::runtime_error( @@ -209,12 +211,8 @@ CentralDB::CentralDB( "CentralDB: BigTable status mode selected but no PubSub configuration provided"); } - pubsubWriter = std::make_shared( - cc->pubSubConfig->project_id, "ctl-member-status-update-stream", _myAddressStr); - _statusWriter = std::make_shared( - cc->bigTableConfig->project_id, cc->bigTableConfig->instance_id, cc->bigTableConfig->table_id, - pubsubWriter); + cc->bigTableConfig->project_id, cc->bigTableConfig->instance_id, cc->bigTableConfig->table_id); break; case STATUS_WRITER_MODE_PGSQL: default: @@ -223,6 +221,7 @@ CentralDB::CentralDB( break; } + // start background threads for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) { _commitThread[i] = std::thread(&CentralDB::commitThread, this); } @@ -935,7 +934,6 @@ void CentralDB::initializeMembers() config["ssoExempt"] = sso_exempt.value_or(false); config["authenticationExpiryTime"] = authentication_expiry_time.value_or(0); config["tags"] = json::parse(tags.value_or("[]")); - config["ipAssignments"] = json::array(); config["frontend"] = std::get<17>(row); Metrics::member_count++; @@ -1147,8 +1145,12 @@ void CentralDB::commitThread() target = config["remoteTraceTarget"]; } - pqxx::row nwrow = w.exec_params1("SELECT COUNT(id) FROM networks_ctl WHERE id = $1", networkId); + // get network and the frontend it is assigned to + // if network does not exist, skip member update + pqxx::row nwrow = w.exec_params1( + "SELECT COUNT(id), frontend FROM networks_ctl WHERE id = $1 GROUP BY frontend", networkId); int nwcount = nwrow[0].as(); + std::string frontend = nwrow[1].as(); if (nwcount != 1) { fprintf(stderr, "network %s does not exist. skipping member upsert\n", networkId.c_str()); @@ -1161,9 +1163,28 @@ void CentralDB::commitThread() "SELECT COUNT(device_id) FROM network_memberships_ctl WHERE device_id = $1 AND network_id = $2", memberId, networkId); int membercount = mrow[0].as(); - bool isNewMember = (membercount == 0); + std::string change_source = config["change_source"]; + if (! isNewMember && change_source != "controller" && frontend != change_source) { + // if it is not a new member and the change source is not the controller and doesn't match the + // frontend, don't apply the change. + continue; + } + + if (_listenerMode == LISTENER_MODE_PUBSUB) { + // Publish change to pubsub stream + + if (config["change_source"].is_null() || config["change_source"] == "controller") { + nlohmann::json oldMember; + nlohmann::json newMember = config; + if (! isNewMember) { + oldMember = _getNetworkMember(w, networkId, memberId); + } + _changeNotifier->notifyMemberChange(oldMember, newMember, frontend); + } + } + pqxx::result res = w.exec_params0( "INSERT INTO network_memberships_ctl (device_id, network_id, authorized, active_bridge, " "ip_assignments, " @@ -1251,30 +1272,42 @@ void CentralDB::commitThread() std::string id = config["id"]; + pqxx::row nwrow = w.exec_params1( + "SELECT COUNT(id), frontend FROM networks_ctl WHERE id = $1 GROUP BY frontend", id); + int nwcount = nwrow[0].as(); + std::string frontend = nwrow[1].as(); + bool isNewNetwork = (nwcount == 0); + + std::string change_source = config["change_source"]; + if (! isNewNetwork && change_source != "controller" && frontend != change_source) { + // if it is not a new network and the change source is not the controller and doesn't match the + // frontend, don't apply the change. + continue; + } + + if (_listenerMode == LISTENER_MODE_PUBSUB) { + // Publish change to pubsub stream + if (config["change_source"].is_null() || config["change_source"] == "controller") { + nlohmann::json oldNetwork; + nlohmann::json newNetwork = config; + if (! isNewNetwork) { + oldNetwork = _getNetwork(w, id); + } + _changeNotifier->notifyNetworkChange(oldNetwork, newNetwork, frontend); + } + } + pqxx::result res = w.exec_params0( - "INSERT INTO networks_ctl (id, name, configuration, controller_id, revision) " - "VALUES ($1, $2, $3, $4, $5) " + "INSERT INTO networks_ctl (id, name, configuration, controller_id, revision, frontend) " + "VALUES ($1, $2, $3, $4, $5, $6) " "ON CONFLICT (id) DO UPDATE SET " - "name = EXCLUDED.name, configuration = EXCLUDED.configuration, revision = EXCLUDED.revision+1", + "name = EXCLUDED.name, configuration = EXCLUDED.configuration, revision = EXCLUDED.revision+1, " + "frontend = EXCLUDED.frontend", id, OSUtils::jsonString(config["name"], ""), OSUtils::jsonDump(config, -1), _myAddressStr, - ((uint64_t)config["revision"])); + ((uint64_t)config["revision"]), change_source); w.commit(); - // res = w.exec_params0("DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", 0); - - // auto pool = config["ipAssignmentPools"]; - // bool err = false; - // for (auto i = pool.begin(); i != pool.end(); ++i) { - // std::string start = (*i)["ipRangeStart"]; - // std::string end = (*i)["ipRangeEnd"]; - - // res = w.exec_params0( - // "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) " - // "VALUES ($1, $2, $3)", - // id, start, end); - // } - const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL); if (nwidInt) { nlohmann::json nwOrig; @@ -1516,4 +1549,179 @@ void CentralDB::onlineNotificationThread() } } +nlohmann::json CentralDB::_getNetworkMember(pqxx::work& tx, const std::string networkID, const std::string memberID) +{ + nlohmann::json out; + + try { + pqxx::row row = tx.exec_params1( + "SELECT nm.device_id, nm.network_id, nm.authorized, nm.active_bridge, nm.ip_assignments, " + "nm.no_auto_assign_ips, " + "nm.sso_exempt, (EXTRACT(EPOCH FROM nm.authentication_expiry_time AT TIME ZONE 'UTC')*1000)::bigint, " + "(EXTRACT(EPOCH FROM nm.creation_time AT TIME ZONE 'UTC')*1000)::bigint, nm.identity, " + "(EXTRACT(EPOCH FROM nm.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, " + "(EXTRACT(EPOCH FROM nm.last_deauthorized_time AT TIME ZONE 'UTC')*1000)::bigint, " + "nm.remote_trace_level, nm.remote_trace_target, nm.revision, nm.capabilities, nm.tags, " + "nm.frontend " + "FROM network_memberships_ctl nm " + "INNER JOIN networks_ctl n " + " ON nm.network_id = n.id " + "WHERE nm.network_id = $1 AND nm.device_id = $2", + networkID, memberID); + + bool authorized = row[2].as(); + std::optional active_bridge = + row[3].is_null() ? std::optional() : std::optional(row[3].as()); + std::string ip_assignments = row[4].is_null() ? "{}" : row[4].as(); + std::optional no_auto_assign_ips = + row[5].is_null() ? std::optional() : std::optional(row[5].as()); + std::optional sso_exempt = + row[6].is_null() ? std::optional() : std::optional(row[6].as()); + std::optional authentication_expiry_time = + row[7].is_null() ? std::optional() : std::optional(row[7].as()); + std::optional creation_time = + row[8].is_null() ? std::optional() : std::optional(row[8].as()); + std::optional identity = + row[9].is_null() ? std::optional() : std::optional(row[9].as()); + std::optional last_authorized_time = + row[10].is_null() ? std::optional() : std::optional(row[10].as()); + std::optional last_deauthorized_time = + row[11].is_null() ? std::optional() : std::optional(row[11].as()); + std::optional remote_trace_level = + row[12].is_null() ? std::optional() : std::optional(row[12].as()); + std::optional remote_trace_target = + row[13].is_null() ? std::optional() : std::optional(row[13].as()); + std::optional revision = + row[14].is_null() ? std::optional() : std::optional(row[14].as()); + std::optional capabilities = + row[15].is_null() ? std::optional() : std::optional(row[15].as()); + std::optional tags = + row[16].is_null() ? std::optional() : std::optional(row[16].as()); + std::string frontend = row[17].is_null() ? "" : row[17].as(); + + out["objtype"] = "member"; + out["id"] = memberID; + out["nwid"] = networkID; + out["address"] = identity.value_or(""); + out["authorized"] = authorized; + out["activeBridge"] = active_bridge.value_or(false); + out["ipAssignments"] = json::array(); + if (ip_assignments != "{}" && ip_assignments != "[]") { + std::string tmp = ip_assignments.substr(1, ip_assignments.length() - 2); + std::vector addrs = split(tmp, ','); + for (auto it = addrs.begin(); it != addrs.end(); ++it) { + out["ipAssignments"].push_back(*it); + } + } + out["capabilities"] = json::parse(capabilities.value_or("[]")); + out["creationTime"] = creation_time.value_or(0); + out["lastAuthorizedTime"] = last_authorized_time.value_or(0); + out["lastDeauthorizedTime"] = last_deauthorized_time.value_or(0); + out["noAutoAssignIps"] = no_auto_assign_ips.value_or(false); + out["remoteTraceLevel"] = remote_trace_level.value_or(0); + out["remoteTraceTarget"] = remote_trace_target.value_or(nullptr); + out["revision"] = revision.value_or(0); + out["ssoExempt"] = sso_exempt.value_or(false); + out["authenticationExpiryTime"] = authentication_expiry_time.value_or(0); + out["tags"] = json::parse(tags.value_or("[]")); + out["frontend"] = frontend; + } + catch (std::exception& e) { + fprintf( + stderr, "ERROR: Error getting network member %s-%s: %s\n", networkID.c_str(), memberID.c_str(), e.what()); + return nlohmann::json(); + } + + return out; +} + +nlohmann::json CentralDB::_getNetwork(pqxx::work& tx, const std::string networkID) +{ + nlohmann::json out; + + try { + std::optional name; + std::string cfg; + std::optional creation_time; + std::optional last_modified; + std::optional revision; + std::string frontend; + + pqxx::row row = tx.exec_params1( + "SELECT id, name, configuration , (EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000)::bigint, " + "(EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000)::bigint, revision, frontend " + "FROM networks_ctl WHERE id = $1", + networkID); + + cfg = row[2].as(); + creation_time = row[3].is_null() ? std::optional() : std::optional(row[3].as()); + last_modified = row[4].is_null() ? std::optional() : std::optional(row[4].as()); + revision = row[5].is_null() ? std::optional() : std::optional(row[5].as()); + frontend = row[6].is_null() ? "" : row[6].as(); + + nlohmann::json cfgtmp = nlohmann::json::parse(cfg); + if (! cfgtmp.is_object()) { + fprintf(stderr, "ERROR: Network %s configuration is not a JSON object\n", networkID.c_str()); + return nlohmann::json(); + } + + out["objtype"] = "network"; + out["id"] = row[0].as(); + out["name"] = row[1].is_null() ? "" : row[1].as(); + out["creationTime"] = creation_time.value_or(0); + out["lastModified"] = last_modified.value_or(0); + out["revision"] = revision.value_or(0); + out["capabilities"] = cfgtmp["capabilities"].is_array() ? cfgtmp["capabilities"] : json::array(); + out["enableBroadcast"] = cfgtmp["enableBroadcast"].is_boolean() ? cfgtmp["enableBroadcast"].get() : false; + out["mtu"] = cfgtmp["mtu"].is_number() ? cfgtmp["mtu"].get() : 2800; + out["multicastLimit"] = cfgtmp["multicastLimit"].is_number() ? cfgtmp["multicastLimit"].get() : 64; + out["private"] = cfgtmp["private"].is_boolean() ? cfgtmp["private"].get() : true; + out["remoteTraceLevel"] = + cfgtmp["remoteTraceLevel"].is_number() ? cfgtmp["remoteTraceLevel"].get() : 0; + out["remoteTraceTarget"] = + cfgtmp["remoteTraceTarget"].is_string() ? cfgtmp["remoteTraceTarget"].get() : ""; + out["revision"] = revision.value_or(0); + out["rules"] = cfgtmp["rules"].is_array() ? cfgtmp["rules"] : json::array(); + out["tags"] = cfgtmp["tags"].is_array() ? cfgtmp["tags"] : json::array(); + if (cfgtmp["v4AssignMode"].is_object()) { + out["v4AssignMode"] = cfgtmp["v4AssignMode"]; + } + else { + out["v4AssignMode"] = json::object(); + out["v4AssignMode"]["zt"] = true; + } + if (cfgtmp["v6AssignMode"].is_object()) { + out["v6AssignMode"] = cfgtmp["v6AssignMode"]; + } + else { + out["v6AssignMode"] = json::object(); + out["v6AssignMode"]["zt"] = true; + out["v6AssignMode"]["6plane"] = true; + out["v6AssignMode"]["rfc4193"] = false; + } + out["ssoEnabled"] = cfgtmp["ssoEnabled"].is_boolean() ? cfgtmp["ssoEnabled"].get() : false; + out["objtype"] = "network"; + out["routes"] = cfgtmp["routes"].is_array() ? cfgtmp["routes"] : json::array(); + out["clientId"] = cfgtmp["clientId"].is_string() ? cfgtmp["clientId"].get() : ""; + out["authorizationEndpoint"] = + cfgtmp["authorizationEndpoint"].is_string() ? cfgtmp["authorizationEndpoint"].get() : nullptr; + out["provider"] = cfgtmp["ssoProvider"].is_string() ? cfgtmp["ssoProvider"].get() : ""; + if (! cfgtmp["dns"].is_object()) { + cfgtmp["dns"] = json::object(); + cfgtmp["dns"]["domain"] = ""; + cfgtmp["dns"]["servers"] = json::array(); + } + else { + out["dns"] = cfgtmp["dns"]; + } + out["ipAssignmentPools"] = cfgtmp["ipAssignmentPools"].is_array() ? cfgtmp["ipAssignmentPools"] : json::array(); + out["frontend"] = row[6].as(); + } + catch (std::exception& e) { + fprintf(stderr, "ERROR: Error getting network %s: %s\n", networkID.c_str(), e.what()); + return nlohmann::json(); + } + return out; +} + #endif // ZT_CONTROLLER_USE_LIBPQ diff --git a/nonfree/controller/CentralDB.hpp b/nonfree/controller/CentralDB.hpp index 0ffd1e562..932bd001f 100644 --- a/nonfree/controller/CentralDB.hpp +++ b/nonfree/controller/CentralDB.hpp @@ -23,6 +23,7 @@ struct SmeeClient; namespace ZeroTier { struct RedisConfig; struct ControllerConfig; +struct ControllerChangeNotifier; class CentralDB : public DB { public: @@ -94,6 +95,11 @@ class CentralDB : public DB { void configureSmee(); void notifyNewMember(const std::string& networkID, const std::string& memberID); + nlohmann::json _getNetworkMember(pqxx::work& tx, const std::string networkID, const std::string memberID); + + nlohmann::json _getNetwork(pqxx::work& tx, const std::string networkID); + + private: enum OverrideMode { ALLOW_PGBOUNCER_OVERRIDE = 0, NO_OVERRIDE = 1 }; ListenerMode _listenerMode; @@ -112,6 +118,7 @@ class CentralDB : public DB { std::shared_ptr _membersDbWatcher; std::shared_ptr _networksDbWatcher; std::shared_ptr _statusWriter; + std::shared_ptr _changeNotifier; std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS]; std::thread _onlineNotificationThread; diff --git a/nonfree/controller/ControllerChangeNotifier.cpp b/nonfree/controller/ControllerChangeNotifier.cpp new file mode 100644 index 000000000..74d52754f --- /dev/null +++ b/nonfree/controller/ControllerChangeNotifier.cpp @@ -0,0 +1,52 @@ +#include "ControllerChangeNotifier.hpp" + +#include "PubSubWriter.hpp" + +namespace ZeroTier { + +PubSubChangeNotifier::PubSubChangeNotifier(std::string controllerID, std::string project) + : ControllerChangeNotifier() + , _cv1networkChangeWriter(std::make_shared(project, "ctl-to-cv1-network-change-stream", controllerID)) + , _cv1memberChangeWriter(std::make_shared(project, "ctl-to-cv1-member-change-stream", controllerID)) + , _cv2networkChangeWriter(std::make_shared(project, "ctl-to-cv2-network-change-stream", controllerID)) + , _cv2memberChangeWriter(std::make_shared(project, "ctl-to-cv2-member-change-stream", controllerID)) +{ +} + +PubSubChangeNotifier::~PubSubChangeNotifier() +{ +} + +void PubSubChangeNotifier::notifyNetworkChange( + const nlohmann::json& oldNetwork, + const nlohmann::json& newNetwork, + const std::string& frontend) +{ + if (frontend == "cv1") { + _cv1networkChangeWriter->publishNetworkChange(oldNetwork, newNetwork); + } + else if (frontend == "cv2") { + _cv2networkChangeWriter->publishNetworkChange(oldNetwork, newNetwork); + } + else { + throw std::runtime_error("Unknown frontend: " + frontend); + } +} + +void PubSubChangeNotifier::notifyMemberChange( + const nlohmann::json& oldMember, + const nlohmann::json newMember, + const std::string& frontend) +{ + if (frontend == "cv1") { + _cv1memberChangeWriter->publishMemberChange(oldMember, newMember); + } + else if (frontend == "cv2") { + _cv2memberChangeWriter->publishMemberChange(oldMember, newMember); + } + else { + throw std::runtime_error("Unknown frontend: " + frontend); + } +} + +} // namespace ZeroTier \ No newline at end of file diff --git a/nonfree/controller/ControllerChangeNotifier.hpp b/nonfree/controller/ControllerChangeNotifier.hpp new file mode 100644 index 000000000..ba8b210e3 --- /dev/null +++ b/nonfree/controller/ControllerChangeNotifier.hpp @@ -0,0 +1,52 @@ +#ifndef CONTROLLERCHANGENOTIFIER_HPP +#define CONTROLLERCHANGENOTIFIER_HPP + +#include +#include +#include + +namespace ZeroTier { + +class PubSubWriter; + +class ControllerChangeNotifier { + public: + virtual ~ControllerChangeNotifier() = default; + + virtual void notifyNetworkChange( + const nlohmann::json& oldNetwork, + const nlohmann::json& newNetwork, + const std::string& frontend = "") = 0; + + virtual void notifyMemberChange( + const nlohmann::json& oldMember, + const nlohmann::json newMember, + const std::string& frontend = "") = 0; +}; + +class PubSubChangeNotifier : public ControllerChangeNotifier { + public: + PubSubChangeNotifier(std::string controllerID, std::string project); + virtual ~PubSubChangeNotifier(); + + virtual void notifyNetworkChange( + const nlohmann::json& oldNetwork, + const nlohmann::json& newNetwork, + const std::string& frontend = "") override; + + virtual void notifyMemberChange( + const nlohmann::json& oldMember, + const nlohmann::json newMember, + const std::string& frontend = "") override; + + private: + std::shared_ptr _cv1networkChangeWriter; + std::shared_ptr _cv1memberChangeWriter; + + std::shared_ptr _cv2networkChangeWriter; + std::shared_ptr _cv2memberChangeWriter; +}; + +} // namespace ZeroTier + +#endif // CONTROLLERCHANGENOTIFIER_HPP \ No newline at end of file diff --git a/nonfree/controller/PubSubListener.cpp b/nonfree/controller/PubSubListener.cpp index 31f3eeff1..c4e7fb58d 100644 --- a/nonfree/controller/PubSubListener.cpp +++ b/nonfree/controller/PubSubListener.cpp @@ -420,7 +420,7 @@ nlohmann::json toJson(const pbmessages::MemberChange_Member& mc, pbmessages::Mem } out["noAutoAssignIps"] = mc.no_auto_assign_ips(); - out["ssoExempt"] = mc.sso_exepmt(); + out["ssoExempt"] = mc.sso_exempt(); out["authenticationExpiryTime"] = mc.auth_expiry_time(); out["capabilities"] = OSUtils::jsonParse(mc.capabilities()); out["creationTime"] = mc.creation_time(); diff --git a/nonfree/controller/PubSubWriter.cpp b/nonfree/controller/PubSubWriter.cpp index 2fe3f52d5..c1bd7d568 100644 --- a/nonfree/controller/PubSubWriter.cpp +++ b/nonfree/controller/PubSubWriter.cpp @@ -1,5 +1,6 @@ #include "PubSubWriter.hpp" +#include "../../osdep/OSUtils.hpp" #include "CtlUtil.hpp" #include "member.pb.h" #include "member_status.pb.h" @@ -15,6 +16,12 @@ namespace pubsub = ::google::cloud::pubsub; namespace ZeroTier { + +pbmessages::NetworkChange +networkChangeFromJson(std::string controllerID, const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork); +pbmessages::MemberChange +memberChangeFromJson(std::string controllerID, const nlohmann::json& oldMember, const nlohmann::json& newMember); + PubSubWriter::PubSubWriter(std::string project, std::string topic, std::string controller_id) : _controller_id(controller_id) , _project(project) @@ -44,12 +51,9 @@ PubSubWriter::~PubSubWriter() { } -bool PubSubWriter::publishMessage(const std::string& payload, const std::string& frontend) +bool PubSubWriter::publishMessage(const std::string& payload) { std::vector > attributes; - if (! frontend.empty()) { - attributes.emplace_back("frontend", frontend); - } attributes.emplace_back("controller_id", _controller_id); auto msg = pubsub::MessageBuilder {}.SetData(payload).SetAttributes(attributes).Build(); @@ -63,30 +67,28 @@ bool PubSubWriter::publishMessage(const std::string& payload, const std::string& return true; } -bool PubSubWriter::publishNetworkChange(const nlohmann::json& networkJson, const std::string& frontend) +bool PubSubWriter::publishNetworkChange(const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork) { - pbmessages::NetworkChange nc; - // nc.mutable_new_()->CopyFrom(fromJson(networkJson)); + pbmessages::NetworkChange nc = networkChangeFromJson(_controller_id, oldNetwork, newNetwork); std::string payload; if (! nc.SerializeToString(&payload)) { fprintf(stderr, "Failed to serialize NetworkChange protobuf message\n"); return false; } - return publishMessage(payload, frontend); + return publishMessage(payload); } -bool PubSubWriter::publishMemberChange(const nlohmann::json& memberJson, const std::string& frontend) +bool PubSubWriter::publishMemberChange(const nlohmann::json& oldMember, const nlohmann::json& newMember) { - pbmessages::MemberChange mc; - // mc.mutable_new_()->CopyFrom(fromJson(memberJson)); + pbmessages::MemberChange mc = memberChangeFromJson(_controller_id, oldMember, newMember); std::string payload; if (! mc.SerializeToString(&payload)) { fprintf(stderr, "Failed to serialize MemberChange protobuf message\n"); return false; } - return publishMessage(payload, frontend); + return publishMessage(payload); } bool PubSubWriter::publishStatusChange( @@ -122,7 +124,158 @@ bool PubSubWriter::publishStatusChange( return false; } - return publishMessage(payload, frontend); + return publishMessage(payload); +} + +pbmessages::NetworkChange_Network* networkFromJson(const nlohmann::json& j) +{ + if (! j.is_object()) { + return nullptr; + } + + pbmessages::NetworkChange_Network* n = new pbmessages::NetworkChange_Network(); + n->set_network_id(j.value("id", "")); + n->set_name(j.value("name", "")); + n->set_capabilities(OSUtils::jsonDump(j.value("capabilities", "[]"), -1)); + n->set_creation_time(j.value("creationTime", 0)); + n->set_enable_broadcast(j.value("enableBroadcast", false)); + + for (const auto& p : j["ipAssignmentPools"]) { + if (p.is_object()) { + auto pool = n->add_assignment_pools(); + pool->set_start_ip(p.value("ipRangeStart", "")); + pool->set_end_ip(p.value("ipRangeEnd", "")); + } + } + + n->set_mtu(j.value("mtu", 2800)); + n->set_multicast_limit(j.value("multicastLimit", 32)); + n->set_is_private(j.value("private", true)); + n->set_remote_trace_level(j.value("remoteTraceLevel", 0)); + n->set_remote_trace_target(j.value("remoteTraceTarget", "")); + n->set_revision(j.value("revision", 0)); + + for (const auto& p : j["routes"]) { + if (p.is_object()) { + auto r = n->add_routes(); + r->set_target(p.value("target", "")); + r->set_via(p.value("via", "")); + } + } + + n->set_rules(""); + n->set_tags(OSUtils::jsonDump(j.value("tags", "[]"), -1)); + + pbmessages::NetworkChange_IPV4AssignMode* v4am = new pbmessages::NetworkChange_IPV4AssignMode(); + if (j["v4AssignMode"].is_object()) { + v4am->set_zt(j["v4AssignMode"].value("zt", false)); + } + n->set_allocated_ipv4_assign_mode(v4am); + + pbmessages::NetworkChange_IPV6AssignMode* v6am = new pbmessages::NetworkChange_IPV6AssignMode(); + if (j["v6AssignMode"].is_object()) { + v6am->set_zt(j["v6AssignMode"].value("zt", false)); + v6am->set_six_plane(j["v6AssignMode"].value("6plane", false)); + v6am->set_rfc4193(j["v6AssignMode"].value("rfc4193", false)); + } + n->set_allocated_ipv6_assign_mode(v6am); + + nlohmann::json jdns = j.value("dns", nullptr); + if (jdns.is_object()) { + pbmessages::NetworkChange_DNS* dns = new pbmessages::NetworkChange_DNS(); + dns->set_domain(jdns.value("domain", "")); + for (const auto& s : jdns["servers"]) { + if (s.is_string()) { + auto server = dns->add_nameservers(); + *server = s; + } + } + n->set_allocated_dns(dns); + } + + n->set_sso_enabled(j.value("ssoEnabled", false)); + if (j.value("ssoEnabled", false)) { + n->set_sso_provider(j.value("provider", "")); + n->set_sso_client_id(j.value("clientId", "")); + n->set_sso_authorization_endpoint(j.value("authorizationEndpoint", "")); + n->set_sso_issuer(j.value("issuer", "")); + n->set_sso_provider(j.value("provider", "")); + } + + n->set_rules_source(""); + + return n; +} + +pbmessages::NetworkChange +networkChangeFromJson(std::string controllerID, const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork) +{ + pbmessages::NetworkChange nc; + nc.set_allocated_old(networkFromJson(oldNetwork)); + nc.set_allocated_new_(networkFromJson(newNetwork)); + nc.set_change_source(pbmessages::NetworkChange_ChangeSource::NetworkChange_ChangeSource_CONTROLLER); + + pbmessages::NetworkChange_NetworkChangeMetadata* metadata = new pbmessages::NetworkChange_NetworkChangeMetadata(); + metadata->set_controller_id(controllerID); + metadata->set_trace_id(""); // TODO: generate a trace ID + nc.set_allocated_metadata(metadata); + + return nc; +} + +pbmessages::MemberChange_Member* memberFromJson(const nlohmann::json& j) +{ + if (! j.is_object()) { + return nullptr; + } + + pbmessages::MemberChange_Member* m = new pbmessages::MemberChange_Member(); + m->set_network_id(j.value("networkId", "")); + m->set_device_id(j.value("id", "")); + m->set_identity(j.value("identity", "")); + m->set_authorized(j.value("authorized", false)); + for (const auto& addr : j.value("ipAssignments", nlohmann::json::array())) { + if (addr.is_string()) { + auto a = m->add_ip_assignments(); + *a = addr; + } + } + m->set_active_bridge(j.value("activeBridge", false)); + m->set_tags(OSUtils::jsonDump(j.value("tags", "[]"), -1)); + m->set_capabilities(OSUtils::jsonDump(j.value("capabilities", "[]"), -1)); + m->set_creation_time(j.value("creationTime", 0)); + m->set_no_auto_assign_ips(j.value("noAutoAssignIps", false)); + m->set_revision(j.value("revision", 0)); + m->set_last_authorized_time(j.value("lastAuthorizedTime", 0)); + m->set_last_deauthorized_time(j.value("lastDeauthorizedTime", 0)); + m->set_last_authorized_credential_type(j.value("lastAuthorizedCredentialType", nullptr)); + m->set_last_authorized_credential(j.value("lastAuthorizedCredential", nullptr)); + m->set_version_major(j.value("versionMajor", 0)); + m->set_version_minor(j.value("versionMinor", 0)); + m->set_version_rev(j.value("versionRev", 0)); + m->set_version_protocol(j.value("versionProtocol", 0)); + m->set_remote_trace_level(j.value("remoteTraceLevel", 0)); + m->set_remote_trace_target(j.value("remoteTraceTarget", "")); + m->set_sso_exempt(j.value("ssoExempt", false)); + m->set_auth_expiry_time(j.value("authExpiryTime", 0)); + + return m; +} + +pbmessages::MemberChange +memberChangeFromJson(std::string controllerID, const nlohmann::json& oldMember, const nlohmann::json& newMember) +{ + pbmessages::MemberChange mc; + mc.set_allocated_old(memberFromJson(oldMember)); + mc.set_allocated_new_(memberFromJson(newMember)); + mc.set_change_source(pbmessages::MemberChange_ChangeSource::MemberChange_ChangeSource_CONTROLLER); + + pbmessages::MemberChange_MemberChangeMetadata* metadata = new pbmessages::MemberChange_MemberChangeMetadata(); + metadata->set_controller_id(controllerID); + metadata->set_trace_id(""); // TODO: generate a trace ID + mc.set_allocated_metadata(metadata); + + return mc; } } // namespace ZeroTier \ No newline at end of file diff --git a/nonfree/controller/PubSubWriter.hpp b/nonfree/controller/PubSubWriter.hpp index 170403d39..26a7bc9ee 100644 --- a/nonfree/controller/PubSubWriter.hpp +++ b/nonfree/controller/PubSubWriter.hpp @@ -13,8 +13,10 @@ class PubSubWriter { PubSubWriter(std::string project, std::string topic, std::string controller_id); virtual ~PubSubWriter(); - bool publishNetworkChange(const nlohmann::json& networkJson, const std::string& frontend = ""); - bool publishMemberChange(const nlohmann::json& memberJson, const std::string& frontend = ""); + bool publishNetworkChange(const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork); + + bool publishMemberChange(const nlohmann::json& oldMember, const nlohmann::json& newMember); + bool publishStatusChange( std::string frontend, std::string network_id, @@ -25,7 +27,7 @@ class PubSubWriter { int64_t last_seen); protected: - bool publishMessage(const std::string& payload, const std::string& frontend = ""); + bool publishMessage(const std::string& payload); private: std::string _controller_id; diff --git a/nonfree/controller/protobuf/member.proto b/nonfree/controller/protobuf/member.proto index a10e2839b..04a1add55 100644 --- a/nonfree/controller/protobuf/member.proto +++ b/nonfree/controller/protobuf/member.proto @@ -25,7 +25,7 @@ message MemberChange { int32 version_protocol = 19; // Protocol version of the member int32 remote_trace_level = 20; // Remote trace level optional string remote_trace_target = 21; // Remote trace target - bool sso_exepmt = 22; // Whether SSO is exempt + bool sso_exempt = 22; // Whether SSO is exempt uint64 auth_expiry_time = 23; // Authorization expiry time in milliseconds } message MemberChangeMetadata { diff --git a/nonfree/controller/protobuf/network.proto b/nonfree/controller/protobuf/network.proto index 1fe359dfe..41bbb73e8 100644 --- a/nonfree/controller/protobuf/network.proto +++ b/nonfree/controller/protobuf/network.proto @@ -27,7 +27,7 @@ message NetworkChange { bool zt = 1; // Whether ZeroTier is used for IPv4 assignment } - message IPv6AssignMode { + message IPV6AssignMode { bool six_plane = 1; // Whether 6plane is used for IPv6 assignment bool rfc4193 = 2; // Whether RFC 4193 is used for IPv6 assignment bool zt = 3; // Whether ZeroTier is used for IPv6 assignment @@ -50,7 +50,7 @@ message NetworkChange { string rules = 14; // JSON string of rules optional string tags = 15; // JSON string of tags IPV4AssignMode ipv4_assign_mode = 16; // IPv4 assignment mode - IPv6AssignMode ipv6_assign_mode = 17; // IPv6 assignment mode + IPV6AssignMode ipv6_assign_mode = 17; // IPv6 assignment mode optional DNS dns = 18; // DNS configuration bool sso_enabled = 19; // Whether Single Sign-On is enabled optional string sso_client_id = 20; // SSO client ID