diff --git a/controller/CMakeLists.txt b/controller/CMakeLists.txt index ea11b3952..670858caa 100644 --- a/controller/CMakeLists.txt +++ b/controller/CMakeLists.txt @@ -34,6 +34,7 @@ set(LINK_LIBS if (ZT1_CENTRAL_CONTROLLER) find_package(PostgreSQL REQUIRED) + find_package(protobuf REQUIRED) list(APPEND SRC_FILES CV1.cpp @@ -64,7 +65,7 @@ if (ZT1_CENTRAL_CONTROLLER) ${PostgreSQL_INCLUDE_DIRS} "${redis++_BUILD_DIR}/src" ${pqxx_INCLUDE_DIRS} - + "${CMAKE_CURRENT_BINARY_DIR}" ) list(APPEND LINK_LIBS @@ -78,6 +79,16 @@ endif() add_library(zerotier-controller STATIC ${SRC_FILES}) +if (ZT1_CENTRAL_CONTROLLER) + file(GLOB PROTO_FILES "${CMAKE_CURRENT_SOURCE_DIR}/protobuf/*.proto") + protobuf_generate( + TARGET zerotier-controller + LANGUAGE cpp + PROTOS ${PROTO_FILES} + APPEND_PATH + ) +endif() + target_include_directories(zerotier-controller PRIVATE ${INCLUDE_DIRS}) add_dependencies(zerotier-controller redis++::redis++) diff --git a/controller/CV1.cpp b/controller/CV1.cpp index e57136555..e739561bb 100644 --- a/controller/CV1.cpp +++ b/controller/CV1.cpp @@ -73,7 +73,8 @@ CV1::CV1(const Identity& myId, const char* path, int listenPort, RedisConfig* rc _myAddressStr = myId.address().toString(myAddress); _connString = std::string(path); auto f = std::make_shared(_connString); - _pool = std::make_shared >(15, 5, std::static_pointer_cast(f)); + _pool = + std::make_shared >(15, 5, std::static_pointer_cast(f)); memset(_ssoPsk, 0, sizeof(_ssoPsk)); char* const ssoPskHex = getenv("ZT_SSO_PSK"); @@ -100,7 +101,11 @@ CV1::CV1(const Identity& myId, const char* path, int listenPort, RedisConfig* rc txn.commit(); if (dbVersion < DB_MINIMUM_VERSION) { - fprintf(stderr, "Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION); + fprintf( + stderr, + "Central database schema version too low. This controller version requires a minimum schema version of " + "%d. Please upgrade your Central instance", + DB_MINIMUM_VERSION); exit(1); } _pool->unborrow(c); @@ -135,7 +140,9 @@ CV1::CV1(const Identity& myId, const char* path, int listenPort, RedisConfig* rc _readyLock.lock(); - fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt()); + fprintf( + stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, + ::_timestr(), (unsigned long long)_myAddress.toInt()); _waitNoticePrinted = true; initializeNetworks(); @@ -198,7 +205,8 @@ void CV1::configureSmee() if (scheme != NULL && host != NULL && port != NULL && ns != NULL && task_queue != NULL) { fprintf(stderr, "creating smee client\n"); - std::string hostPort = std::string(scheme) + std::string("://") + std::string(host) + std::string(":") + std::string(port); + std::string hostPort = + std::string(scheme) + std::string("://") + std::string(host) + std::string(":") + std::string(port); this->_smee = rustybits::smee_client_new(hostPort.c_str(), ns, task_queue); } else { @@ -328,7 +336,11 @@ void CV1::eraseMember(const uint64_t networkId, const uint64_t memberId) _memberChanged(tmp.first, nullJson, true); } -void CV1::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress, const char* osArch) +void CV1::nodeIsOnline( + const uint64_t networkId, + const uint64_t memberId, + const InetAddress& physicalAddress, + const char* osArch) { auto provider = opentelemetry::trace::Provider::GetTracerProvider(); auto tracer = provider->GetTracer("cv1"); @@ -393,15 +405,17 @@ AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& re std::string nonce = ""; // check if the member exists first. - pqxx::row count = w.exec_params1("SELECT count(id) FROM ztc_member WHERE id = $1 AND network_id = $2 AND deleted = false", memberId, networkId); + pqxx::row count = w.exec_params1( + "SELECT count(id) FROM ztc_member WHERE id = $1 AND network_id = $2 AND deleted = false", memberId, + networkId); if (count[0].as() == 1) { // get active nonce, if exists. pqxx::result r = w.exec_params( "SELECT nonce FROM ztc_sso_expiry " "WHERE network_id = $1 AND member_id = $2 " - "AND ((NOW() AT TIME ZONE 'UTC') <= authentication_expiry_time) AND ((NOW() AT TIME ZONE 'UTC') <= nonce_expiration)", - networkId, - memberId); + "AND ((NOW() AT TIME ZONE 'UTC') <= authentication_expiry_time) AND ((NOW() AT TIME ZONE 'UTC') <= " + "nonce_expiration)", + networkId, memberId); if (r.size() == 0) { // no active nonce. @@ -410,8 +424,7 @@ AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& re "SELECT nonce FROM ztc_sso_expiry " "WHERE network_id = $1 AND member_id = $2 " "AND authentication_expiry_time IS NULL AND ((NOW() AT TIME ZONE 'UTC') <= nonce_expiration)", - networkId, - memberId); + networkId, memberId); if (r.size() == 1) { // we have an existing nonce. Use it @@ -429,10 +442,7 @@ AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& re "INSERT INTO ztc_sso_expiry " "(nonce, nonce_expiration, network_id, member_id) VALUES " "($1, TO_TIMESTAMP($2::double precision/1000), $3, $4)", - nonce, - OSUtils::now() + 300000, - networkId, - memberId); + nonce, OSUtils::now() + 300000, networkId, memberId); w.commit(); } @@ -478,7 +488,9 @@ AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& re sso_version = r.at(0)[4].as >().value_or(1); } else if (r.size() > 1) { - fprintf(stderr, "ERROR: More than one auth endpoint for an organization?!?!? NetworkID: %s\n", networkId.c_str()); + fprintf( + stderr, "ERROR: More than one auth endpoint for an organization?!?!? NetworkID: %s\n", + networkId.c_str()); } else { fprintf(stderr, "No client or auth endpoint?!?\n"); @@ -496,13 +508,10 @@ AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& re if (info.version == 0) { char url[2048] = { 0 }; OSUtils::ztsnprintf( - url, - sizeof(authenticationURL), - "%s?response_type=id_token&response_mode=form_post&scope=openid+email+profile&redirect_uri=%s&nonce=%s&state=%s&client_id=%s", - authorization_endpoint.c_str(), - url_encode(redirectURL).c_str(), - nonce.c_str(), - state_hex, + url, sizeof(authenticationURL), + "%s?response_type=id_token&response_mode=form_post&scope=openid+email+profile&redirect_uri=%s&" + "nonce=%s&state=%s&client_id=%s", + authorization_endpoint.c_str(), url_encode(redirectURL).c_str(), nonce.c_str(), state_hex, client_id.c_str()); info.authenticationURL = std::string(url); } @@ -516,18 +525,17 @@ AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& re #ifdef ZT_DEBUG fprintf( stderr, - "ssoClientID: %s\nissuerURL: %s\nssoNonce: %s\nssoState: %s\ncentralAuthURL: %s\nprovider: %s\n", - info.ssoClientID.c_str(), - info.issuerURL.c_str(), - info.ssoNonce.c_str(), - info.ssoState.c_str(), - info.centralAuthURL.c_str(), - provider.c_str()); + "ssoClientID: %s\nissuerURL: %s\nssoNonce: %s\nssoState: %s\ncentralAuthURL: %s\nprovider: " + "%s\n", + info.ssoClientID.c_str(), info.issuerURL.c_str(), info.ssoNonce.c_str(), info.ssoState.c_str(), + info.centralAuthURL.c_str(), provider.c_str()); #endif } } else { - fprintf(stderr, "client_id: %s\nauthorization_endpoint: %s\n", client_id.c_str(), authorization_endpoint.c_str()); + fprintf( + stderr, "client_id: %s\nauthorization_endpoint: %s\n", client_id.c_str(), + authorization_endpoint.c_str()); } } @@ -573,13 +581,18 @@ void CV1::initializeNetworks() char qbuf[2048] = { 0 }; sprintf( qbuf, - "SELECT n.id, (EXTRACT(EPOCH FROM n.creation_time AT TIME ZONE 'UTC')*1000)::bigint as creation_time, n.capabilities, " - "n.enable_broadcast, (EXTRACT(EPOCH FROM n.last_modified AT TIME ZONE 'UTC')*1000)::bigint AS last_modified, n.mtu, n.multicast_limit, n.name, n.private, n.remote_trace_level, " - "n.remote_trace_target, n.revision, n.rules, n.tags, n.v4_assign_mode, n.v6_assign_mode, n.sso_enabled, (CASE WHEN n.sso_enabled THEN noc.client_id ELSE NULL END) as client_id, " + "SELECT n.id, (EXTRACT(EPOCH FROM n.creation_time AT TIME ZONE 'UTC')*1000)::bigint as creation_time, " + "n.capabilities, " + "n.enable_broadcast, (EXTRACT(EPOCH FROM n.last_modified AT TIME ZONE 'UTC')*1000)::bigint AS " + "last_modified, n.mtu, n.multicast_limit, n.name, n.private, n.remote_trace_level, " + "n.remote_trace_target, n.revision, n.rules, n.tags, n.v4_assign_mode, n.v6_assign_mode, n.sso_enabled, " + "(CASE WHEN n.sso_enabled THEN noc.client_id ELSE NULL END) as client_id, " "(CASE WHEN n.sso_enabled THEN oc.authorization_endpoint ELSE NULL END) as authorization_endpoint, " "(CASE WHEN n.sso_enabled THEN oc.provider ELSE NULL END) as provider, d.domain, d.servers, " - "ARRAY(SELECT CONCAT(host(ip_range_start),'|', host(ip_range_end)) FROM ztc_network_assignment_pool WHERE network_id = n.id) AS assignment_pool, " - "ARRAY(SELECT CONCAT(host(address),'/',bits::text,'|',COALESCE(host(via), 'NULL'))FROM ztc_network_route WHERE network_id = n.id) AS routes " + "ARRAY(SELECT CONCAT(host(ip_range_start),'|', host(ip_range_end)) FROM ztc_network_assignment_pool WHERE " + "network_id = n.id) AS assignment_pool, " + "ARRAY(SELECT CONCAT(host(address),'/',bits::text,'|',COALESCE(host(via), 'NULL'))FROM ztc_network_route " + "WHERE network_id = n.id) AS routes " "FROM ztc_network n " "LEFT OUTER JOIN ztc_org o " " ON o.owner_id = n.owner_id " @@ -813,7 +826,9 @@ void CV1::initializeNetworks() if (++this->_ready == 2) { if (_waitNoticePrinted) { - fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(), (unsigned long long)_myAddress.toInt()); + fprintf( + stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(), + (unsigned long long)_myAddress.toInt()); } _readyLock.unlock(); } @@ -899,12 +914,14 @@ void CV1::initializeMembers() " FROM ztc_sso_expiry e " " INNER JOIN ztc_network n1 " " ON n1.id = e.network_id AND n1.deleted = TRUE " - " WHERE e.network_id = m.network_id AND e.member_id = m.id AND n.sso_enabled = TRUE AND e.authentication_expiry_time IS NOT NULL " + " WHERE e.network_id = m.network_id AND e.member_id = m.id AND n.sso_enabled = TRUE AND " + "e.authentication_expiry_time IS NOT NULL " " ORDER BY e.authentication_expiry_time DESC LIMIT 1 " " ) " " ELSE NULL " " END) AS authentication_expiry_time, " - "ARRAY(SELECT DISTINCT address FROM ztc_member_ip_assignment WHERE member_id = m.id AND network_id = m.network_id) AS assigned_addresses " + "ARRAY(SELECT DISTINCT address FROM ztc_member_ip_assignment WHERE member_id = m.id AND network_id = " + "m.network_id) AS assigned_addresses " "FROM ztc_member m " "INNER JOIN ztc_network n " " ON n.id = m.network_id " @@ -1089,7 +1106,9 @@ void CV1::initializeMembers() if (++this->_ready == 2) { if (_waitNoticePrinted) { - fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(), (unsigned long long)_myAddress.toInt()); + fprintf( + stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(), + (unsigned long long)_myAddress.toInt()); } _readyLock.unlock(); } @@ -1150,13 +1169,17 @@ void CV1::heartbeat() pqxx::work w { *c->c }; pqxx::result res = w.exec0( - "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis, redis_member_status) " + "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, " + "v_rev, v_build, host_port, use_redis, redis_member_status) " "VALUES (" - + w.quote(controllerId) + ", " + w.quote(hostname) + ", TO_TIMESTAMP(" + now + "::double precision/1000), " + w.quote(publicIdentity) + ", " + major + ", " + minor + ", " + rev + ", " + build + ", " + host_port + ", " - + use_redis + ", " + redis_mem_status + + w.quote(controllerId) + ", " + w.quote(hostname) + ", TO_TIMESTAMP(" + now + + "::double precision/1000), " + w.quote(publicIdentity) + ", " + major + ", " + minor + ", " + rev + + ", " + build + ", " + host_port + ", " + use_redis + ", " + redis_mem_status + ") " - "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " - "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " + "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = " + "EXCLUDED.last_alive, " + "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = " + "EXCLUDED.v_minor, " "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, " "use_redis = EXCLUDED.use_redis, redis_member_status = EXCLUDED.redis_member_status"); w.commit(); @@ -1200,7 +1223,8 @@ void CV1::membersDbWatcher() } if (_run == 1) { - fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + fprintf( + stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); exit(9); } fprintf(stderr, "Exited membersDbWatcher\n"); @@ -1306,7 +1330,9 @@ void CV1::networksDbWatcher() } if (_run == 1) { - fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + fprintf( + stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", + _myAddressStr.c_str()); exit(8); } fprintf(stderr, "Exited networksDbWatcher\n"); @@ -1383,7 +1409,9 @@ void CV1::_networksWatcher_Redis() } } catch (std::exception& e) { - fprintf(stderr, "json parse error in networkWatcher_Redis: what: %s json: %s\n", e.what(), a.second.c_str()); + fprintf( + stderr, "json parse error in networkWatcher_Redis: what: %s json: %s\n", e.what(), + a.second.c_str()); } } if (_rc->clusterMode) { @@ -1468,7 +1496,8 @@ void CV1::commitThread() continue; } - pqxx::row mrow = w.exec_params1("SELECT COUNT(id) FROM ztc_member WHERE id = $1 AND network_id = $2", memberId, networkId); + pqxx::row mrow = w.exec_params1( + "SELECT COUNT(id) FROM ztc_member WHERE id = $1 AND network_id = $2", memberId, networkId); int membercount = mrow[0].as(); bool isNewMember = false; @@ -1478,27 +1507,17 @@ void CV1::commitThread() pqxx::result res = w.exec_params0( "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, " "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, " - "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) " + "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, " + "v_proto) " "VALUES ($1, $2, $3, $4, $5, $6, " "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), " "$9, $10, $11, $12, $13, $14, $15, $16, $17)", - memberId, - networkId, - (bool)config["activeBridge"], - (bool)config["authorized"], - OSUtils::jsonDump(config["capabilities"], -1), - OSUtils::jsonString(config["identity"], ""), - (uint64_t)config["lastAuthorizedTime"], - (uint64_t)config["lastDeauthorizedTime"], - (bool)config["noAutoAssignIps"], - (int)config["remoteTraceLevel"], - target, - (uint64_t)config["revision"], - OSUtils::jsonDump(config["tags"], -1), - (int)config["vMajor"], - (int)config["vMinor"], - (int)config["vRev"], - (int)config["vProto"]); + memberId, networkId, (bool)config["activeBridge"], (bool)config["authorized"], + OSUtils::jsonDump(config["capabilities"], -1), OSUtils::jsonString(config["identity"], ""), + (uint64_t)config["lastAuthorizedTime"], (uint64_t)config["lastDeauthorizedTime"], + (bool)config["noAutoAssignIps"], (int)config["remoteTraceLevel"], target, + (uint64_t)config["revision"], OSUtils::jsonDump(config["tags"], -1), (int)config["vMajor"], + (int)config["vMinor"], (int)config["vRev"], (int)config["vProto"]); } else { // existing member @@ -1510,27 +1529,18 @@ void CV1::commitThread() "no_auto_assign_ips = $9, remote_trace_level = $10, remote_trace_target= $11, " "revision = $12, tags = $13, v_major = $14, v_minor = $15, v_rev = $16, v_proto = $17 " "WHERE id = $1 AND network_id = $2", - memberId, - networkId, - (bool)config["activeBridge"], - (bool)config["authorized"], - OSUtils::jsonDump(config["capabilities"], -1), - OSUtils::jsonString(config["identity"], ""), - (uint64_t)config["lastAuthorizedTime"], - (uint64_t)config["lastDeauthorizedTime"], - (bool)config["noAutoAssignIps"], - (int)config["remoteTraceLevel"], - target, - (uint64_t)config["revision"], - OSUtils::jsonDump(config["tags"], -1), - (int)config["vMajor"], - (int)config["vMinor"], - (int)config["vRev"], - (int)config["vProto"]); + memberId, networkId, (bool)config["activeBridge"], (bool)config["authorized"], + OSUtils::jsonDump(config["capabilities"], -1), OSUtils::jsonString(config["identity"], ""), + (uint64_t)config["lastAuthorizedTime"], (uint64_t)config["lastDeauthorizedTime"], + (bool)config["noAutoAssignIps"], (int)config["remoteTraceLevel"], target, + (uint64_t)config["revision"], OSUtils::jsonDump(config["tags"], -1), (int)config["vMajor"], + (int)config["vMinor"], (int)config["vRev"], (int)config["vProto"]); } if (! isNewMember) { - pqxx::result res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", memberId, networkId); + pqxx::result res = w.exec_params0( + "DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", memberId, + networkId); } std::vector assignments; @@ -1542,7 +1552,10 @@ void CV1::commitThread() continue; } - pqxx::result res = w.exec_params0("INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) ON CONFLICT (network_id, member_id, address) DO NOTHING", memberId, networkId, addr); + pqxx::result res = w.exec_params0( + "INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) " + "ON CONFLICT (network_id, member_id, address) DO NOTHING", + memberId, networkId, addr); assignments.push_back(addr); } @@ -1586,11 +1599,15 @@ void CV1::commitThread() _memberChanged(memOrig, memNew, qitem.second); } else { - fprintf(stderr, "%s: Can't notify of change. Error parsing nwid or memberid: %llu-%llu\n", _myAddressStr.c_str(), (unsigned long long)nwidInt, (unsigned long long)memberidInt); + fprintf( + stderr, "%s: Can't notify of change. Error parsing nwid or memberid: %llu-%llu\n", + _myAddressStr.c_str(), (unsigned long long)nwidInt, (unsigned long long)memberidInt); } } catch (std::exception& e) { - fprintf(stderr, "%s ERROR: Error updating member %s-%s: %s\n", _myAddressStr.c_str(), networkId.c_str(), memberId.c_str(), e.what()); + fprintf( + stderr, "%s ERROR: Error updating member %s-%s: %s\n", _myAddressStr.c_str(), networkId.c_str(), + memberId.c_str(), e.what()); mspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); } } @@ -1619,12 +1636,14 @@ void CV1::commitThread() // did not previously exist. If the record already exists owner_id is left // unchanged, so owner_id should be left out of the update clause. pqxx::result res = w.exec_params0( - "INSERT INTO ztc_network (id, creation_time, owner_id, controller_id, capabilities, enable_broadcast, " + "INSERT INTO ztc_network (id, creation_time, owner_id, controller_id, capabilities, " + "enable_broadcast, " "last_modified, mtu, multicast_limit, name, private, " "remote_trace_level, remote_trace_target, rules, rules_source, " "tags, v4_assign_mode, v6_assign_mode, sso_enabled) VALUES (" "$1, TO_TIMESTAMP($5::double precision/1000), " - "(SELECT user_id AS owner_id FROM ztc_global_permissions WHERE authorize = true AND del = true AND modify = true AND read = true LIMIT 1)," + "(SELECT user_id AS owner_id FROM ztc_global_permissions WHERE authorize = true AND del = true " + "AND modify = true AND read = true LIMIT 1)," "$2, $3, $4, TO_TIMESTAMP($5::double precision/1000), " "$6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) " "ON CONFLICT (id) DO UPDATE set controller_id = EXCLUDED.controller_id, " @@ -1636,25 +1655,15 @@ void CV1::commitThread() "rules_source = EXCLUDED.rules_source, tags = EXCLUDED.tags, " "v4_assign_mode = EXCLUDED.v4_assign_mode, v6_assign_mode = EXCLUDED.v6_assign_mode, " "sso_enabled = EXCLUDED.sso_enabled", - id, - _myAddressStr, - OSUtils::jsonDump(config["capabilities"], -1), - (bool)config["enableBroadcast"], - OSUtils::now(), - (int)config["mtu"], - (int)config["multicastLimit"], - OSUtils::jsonString(config["name"], ""), - (bool)config["private"], - (int)config["remoteTraceLevel"], - remoteTraceTarget, - OSUtils::jsonDump(config["rules"], -1), - rulesSource, - OSUtils::jsonDump(config["tags"], -1), - OSUtils::jsonDump(config["v4AssignMode"], -1), - OSUtils::jsonDump(config["v6AssignMode"], -1), + id, _myAddressStr, OSUtils::jsonDump(config["capabilities"], -1), + (bool)config["enableBroadcast"], OSUtils::now(), (int)config["mtu"], + (int)config["multicastLimit"], OSUtils::jsonString(config["name"], ""), (bool)config["private"], + (int)config["remoteTraceLevel"], remoteTraceTarget, OSUtils::jsonDump(config["rules"], -1), + rulesSource, OSUtils::jsonDump(config["tags"], -1), + OSUtils::jsonDump(config["v4AssignMode"], -1), OSUtils::jsonDump(config["v6AssignMode"], -1), OSUtils::jsonBool(config["ssoEnabled"], false)); - res = w.exec_params0("DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", 0); + res = w.exec_params0("DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", id); auto pool = config["ipAssignmentPools"]; bool err = false; @@ -1665,9 +1674,7 @@ void CV1::commitThread() res = w.exec_params0( "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) " "VALUES ($1, $2, $3)", - id, - start, - end); + id, start, end); } res = w.exec_params0("DELETE FROM ztc_network_route WHERE network_id = $1", id); @@ -1692,7 +1699,9 @@ void CV1::commitThread() via = (*i)["via"]; } - res = w.exec_params0("INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)", id, targetAddr, targetBits, (via == "NULL" ? NULL : via.c_str())); + res = w.exec_params0( + "INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)", + id, targetAddr, targetBits, (via == "NULL" ? NULL : via.c_str())); } if (err) { fprintf(stderr, "%s: route add error\n", _myAddressStr.c_str()); @@ -1715,7 +1724,10 @@ void CV1::commitThread() std::string s = servers.str(); - res = w.exec_params0("INSERT INTO ztc_network_dns (network_id, domain, servers) VALUES ($1, $2, $3) ON CONFLICT (network_id) DO UPDATE SET domain = EXCLUDED.domain, servers = EXCLUDED.servers", id, domain, s); + res = w.exec_params0( + "INSERT INTO ztc_network_dns (network_id, domain, servers) VALUES ($1, $2, $3) ON CONFLICT " + "(network_id) DO UPDATE SET domain = EXCLUDED.domain, servers = EXCLUDED.servers", + id, domain, s); w.commit(); @@ -1729,7 +1741,9 @@ void CV1::commitThread() _networkChanged(nwOrig, nwNew, qitem.second); } else { - fprintf(stderr, "%s: Can't notify network changed: %llu\n", _myAddressStr.c_str(), (unsigned long long)nwidInt); + fprintf( + stderr, "%s: Can't notify network changed: %llu\n", _myAddressStr.c_str(), + (unsigned long long)nwidInt); } } catch (std::exception& e) { @@ -1803,7 +1817,9 @@ void CV1::commitThread() std::string memberId = config["id"]; std::string networkId = config["nwid"]; - pqxx::result res = w.exec_params0("UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2", memberId, networkId); + pqxx::result res = w.exec_params0( + "UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2", + memberId, networkId); w.commit(); } @@ -1927,7 +1943,8 @@ void CV1::onlineNotification_Postgres() std::string memberId(memTmp); try { - pqxx::row r = w2.exec_params1("SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2", networkId, memberId); + pqxx::row r = w2.exec_params1( + "SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2", networkId, memberId); } catch (pqxx::unexpected_rows& e) { continue; @@ -1946,8 +1963,9 @@ void CV1::onlineNotification_Postgres() } std::stringstream memberUpdate; - memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated, os, arch) VALUES " - << "('" << networkId << "', '" << memberId << "', "; + memberUpdate + << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated, os, arch) VALUES " + << "('" << networkId << "', '" << memberId << "', "; if (ipAddr.empty()) { memberUpdate << "NULL, "; } @@ -1958,7 +1976,8 @@ void CV1::onlineNotification_Postgres() << "'" << os << "', " << "'" << arch << "'" << ") " - << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated, " + << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, " + "last_updated = EXCLUDED.last_updated, " << "os = EXCLUDED.os, arch = EXCLUDED.arch"; pipe.insert(memberUpdate.str()); @@ -1980,7 +1999,9 @@ void CV1::onlineNotification_Postgres() _pool->unborrow(c); ConnectionPoolStats stats = _pool->get_stats(); - fprintf(stderr, "%s pool stats: in use size: %llu, available size: %llu, total: %llu\n", _myAddressStr.c_str(), stats.borrowed_size, stats.pool_size, (stats.borrowed_size + stats.pool_size)); + fprintf( + stderr, "%s pool stats: in use size: %llu, available size: %llu, total: %llu\n", _myAddressStr.c_str(), + stats.borrowed_size, stats.pool_size, (stats.borrowed_size + stats.pool_size)); span->End(); @@ -1988,7 +2009,9 @@ void CV1::onlineNotification_Postgres() } fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str()); if (_run == 1) { - fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + fprintf( + stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", + _myAddressStr.c_str()); exit(6); } } @@ -2042,7 +2065,10 @@ void CV1::onlineNotification_Redis() } } -uint64_t CV1::_doRedisUpdate(sw::redis::Transaction& tx, std::string& controllerId, std::unordered_map, NodeOnlineRecord, _PairHasher>& lastOnline) +uint64_t CV1::_doRedisUpdate( + sw::redis::Transaction& tx, + std::string& controllerId, + std::unordered_map, NodeOnlineRecord, _PairHasher>& lastOnline) { auto provider = opentelemetry::trace::Provider::GetTracerProvider(); auto tracer = provider->GetTracer("cv1"); @@ -2079,7 +2105,11 @@ uint64_t CV1::_doRedisUpdate(sw::redis::Transaction& tx, std::string& controller arch = osArchSplit[1]; } - std::unordered_map record = { { "id", memberId }, { "address", ipAddr }, { "last_updated", std::to_string(ts) }, { "os", os }, { "arch", arch } }; + std::unordered_map record = { { "id", memberId }, + { "address", ipAddr }, + { "last_updated", std::to_string(ts) }, + { "os", os }, + { "arch", arch } }; tx.zadd("nodes-online:{" + controllerId + "}", memberId, ts) .zadd("nodes-online2:{" + controllerId + "}", networkId + "-" + memberId, ts) .zadd("network-nodes-online:{" + controllerId + "}:" + networkId, memberId, ts) @@ -2093,16 +2123,24 @@ uint64_t CV1::_doRedisUpdate(sw::redis::Transaction& tx, std::string& controller // expire records from all-nodes and network-nodes member list uint64_t expireOld = OSUtils::now() - 300000; - tx.zremrangebyscore("nodes-online:{" + controllerId + "}", sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); - tx.zremrangebyscore("nodes-online2:{" + controllerId + "}", sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); - tx.zremrangebyscore("active-networks:{" + controllerId + "}", sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); + tx.zremrangebyscore( + "nodes-online:{" + controllerId + "}", + sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); + tx.zremrangebyscore( + "nodes-online2:{" + controllerId + "}", + sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); + tx.zremrangebyscore( + "active-networks:{" + controllerId + "}", + sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); { std::shared_lock l(_networks_l); for (const auto& it : _networks) { uint64_t nwid_i = it.first; char nwidTmp[64]; OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); - tx.zremrangebyscore("network-nodes-online:{" + controllerId + "}:" + nwidTmp, sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); + tx.zremrangebyscore( + "network-nodes-online:{" + controllerId + "}:" + nwidTmp, + sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); } } tx.exec(); diff --git a/controller/CentralDB.cpp b/controller/CentralDB.cpp index 1b1e4d554..80cfa6982 100644 --- a/controller/CentralDB.cpp +++ b/controller/CentralDB.cpp @@ -184,9 +184,9 @@ CentralDB::CentralDB( case LISTENER_MODE_PUBSUB: if (cc->pubSubConfig != NULL) { _membersDbWatcher = - std::make_shared(_myAddressStr, cc->pubSubConfig->listen_timeout, this); + std::make_shared(_myAddressStr, cc->pubSubConfig->project, this); _networksDbWatcher = - std::make_shared(_myAddressStr, cc->pubSubConfig->listen_timeout, this); + std::make_shared(_myAddressStr, cc->pubSubConfig->project, this); } else { throw std::runtime_error( @@ -1250,7 +1250,6 @@ void CentralDB::commitThread() std::string id = config["id"]; - // network must already exist pqxx::result res = w.exec_params0( "INSERT INTO networks_ctl (id, name, configuration, controller_id, revision) " "VALUES ($1, $2, $3, $4, $5) " @@ -1261,19 +1260,19 @@ void CentralDB::commitThread() w.commit(); - res = w.exec_params0("DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", 0); + // res = w.exec_params0("DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", 0); - auto pool = config["ipAssignmentPools"]; - bool err = false; - for (auto i = pool.begin(); i != pool.end(); ++i) { - std::string start = (*i)["ipRangeStart"]; - std::string end = (*i)["ipRangeEnd"]; + // auto pool = config["ipAssignmentPools"]; + // bool err = false; + // for (auto i = pool.begin(); i != pool.end(); ++i) { + // std::string start = (*i)["ipRangeStart"]; + // std::string end = (*i)["ipRangeEnd"]; - res = w.exec_params0( - "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) " - "VALUES ($1, $2, $3)", - id, start, end); - } + // res = w.exec_params0( + // "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) " + // "VALUES ($1, $2, $3)", + // id, start, end); + // } const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL); if (nwidInt) { diff --git a/controller/PubSubListener.cpp b/controller/PubSubListener.cpp index a53343180..36eb2b14f 100644 --- a/controller/PubSubListener.cpp +++ b/controller/PubSubListener.cpp @@ -2,43 +2,101 @@ #include "PubSubListener.hpp" #include "DB.hpp" +#include "member.pb.h" +#include "network.pb.h" #include "opentelemetry/trace/provider.h" #include "rustybits.h" +#include +#include +#include +#include +#include +#include #include +namespace pubsub = ::google::cloud::pubsub; +namespace pubsub_admin = ::google::cloud::pubsub_admin; + namespace ZeroTier { -void listener_callback(void* user_ptr, const uint8_t* payload, uintptr_t length) +nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc); +nlohmann::json toJson(const pbmessages::MemberChange_Member& mc); + +PubSubListener::PubSubListener(std::string controller_id, std::string project, std::string topic) + : _controller_id(controller_id) + , _project(project) + , _topic(topic) + , _subscription_id("sub-" + controller_id + "-network-changes") + , _run(false) + , _adminClient(pubsub_admin::MakeSubscriptionAdminConnection()) + , _subscription(pubsub::Subscription(_project, _subscription_id)) { - if (! user_ptr || ! payload || length == 0) { - fprintf(stderr, "Invalid parameters in listener_callback\n"); - return; + GOOGLE_PROTOBUF_VERIFY_VERSION; + + google::pubsub::v1::Subscription request; + request.set_name(_subscription.FullName()); + request.set_topic(pubsub::Topic(project, topic).FullName()); + request.set_filter("(attributes.controller_id=\"" + _controller_id + "\")"); + auto sub = _adminClient.CreateSubscription(request); + if (! sub.ok()) { + fprintf(stderr, "Failed to create subscription: %s\n", sub.status().message().c_str()); + throw std::runtime_error("Failed to create subscription"); } - auto* listener = static_cast(user_ptr); - std::string payload_str(reinterpret_cast(payload), length); - listener->onNotification(payload_str); + if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) { + fprintf(stderr, "Subscription already exists\n"); + throw std::runtime_error("Subscription already exists"); + } + + _subscriber = std::make_shared(pubsub::MakeSubscriberConnection(_subscription)); + + _run = true; + _subscriberThread = std::thread(&PubSubListener::subscribe, this); } -PubSubNetworkListener::PubSubNetworkListener(std::string controller_id, uint64_t listen_timeout, DB* db) : _run(true), _controller_id(controller_id), _db(db), _listener(nullptr) +PubSubListener::~PubSubListener() +{ + _run = false; + _session.cancel(); + if (_subscriberThread.joinable()) { + _subscriberThread.join(); + } +} + +void PubSubListener::subscribe() +{ + while (_run) { + _session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) { + auto provider = opentelemetry::trace::Provider::GetTracerProvider(); + auto tracer = provider->GetTracer("PubSubListener"); + auto span = tracer->StartSpan("PubSubListener::onMessage"); + auto scope = tracer->WithActiveSpan(span); + span->SetAttribute("message_id", m.message_id()); + span->SetAttribute("ordering_key", m.ordering_key()); + span->SetAttribute("attributes", m.attributes().size()); + + fprintf(stderr, "Received message %s\n", m.message_id().c_str()); + onNotification(m.data()); + std::move(h).ack(); + span->SetStatus(opentelemetry::trace::StatusCode::kOk); + return true; + }); + auto status = _session.get(); + if (! status.ok() && _run) { + fprintf(stderr, "Error during Subscribe: %s\n", status.message().c_str()); + } + } +} + +PubSubNetworkListener::PubSubNetworkListener(std::string controller_id, std::string project, DB* db) + : PubSubListener(controller_id, project, "controller-network-change-stream") + , _db(db) { - _listener = rustybits::network_listener_new(_controller_id.c_str(), listen_timeout, listener_callback, this); - _listenThread = std::thread(&PubSubNetworkListener::listenThread, this); - _changeHandlerThread = std::thread(&PubSubNetworkListener::changeHandlerThread, this); } PubSubNetworkListener::~PubSubNetworkListener() { - _run = false; - if (_listenThread.joinable()) { - _listenThread.join(); - } - - if (_listener) { - rustybits::network_listener_delete(_listener); - _listener = nullptr; - } } void PubSubNetworkListener::onNotification(const std::string& payload) @@ -47,23 +105,26 @@ void PubSubNetworkListener::onNotification(const std::string& payload) auto tracer = provider->GetTracer("PubSubNetworkListener"); auto span = tracer->StartSpan("PubSubNetworkListener::onNotification"); auto scope = tracer->WithActiveSpan(span); - span->SetAttribute("payload", payload); - fprintf(stderr, "Network notification received: %s\n", payload.c_str()); + pbmessages::NetworkChange nc; + if (! nc.ParseFromString(payload)) { + fprintf(stderr, "Failed to parse NetworkChange protobuf message\n"); + span->SetAttribute("error", "Failed to parse NetworkChange protobuf message"); + span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf"); + return; + } + + fprintf(stderr, "Network notification received"); try { - nlohmann::json j = nlohmann::json::parse(payload); - nlohmann::json& ov_tmp = j["old"]; - nlohmann::json& nv_tmp = j["new"]; nlohmann::json oldConfig, newConfig; - if (ov_tmp.is_object()) { - // TODO: copy old configuration to oldConfig - // changing key names along the way + if (nc.has_old()) { + oldConfig = toJson(nc.old()); } - if (nv_tmp.is_object()) { - // TODO: copy new configuration to newConfig - // changing key names along the way + + if (nc.has_new_()) { + newConfig = toJson(nc.new_()); } if (oldConfig.is_object() && newConfig.is_object()) { @@ -106,41 +167,14 @@ void PubSubNetworkListener::onNotification(const std::string& payload) } } -void PubSubNetworkListener::listenThread() +PubSubMemberListener::PubSubMemberListener(std::string controller_id, std::string project, DB* db) + : PubSubListener(controller_id, project, "controller-member-change-stream") + , _db(db) { - if (_listener) { - while (_run) { - rustybits::network_listener_listen(_listener); - } - } -} - -void PubSubNetworkListener::changeHandlerThread() -{ - if (_listener) { - rustybits::network_listener_change_handler(_listener); - } -} - -PubSubMemberListener::PubSubMemberListener(std::string controller_id, uint64_t listen_timeout, DB* db) : _run(true), _controller_id(controller_id), _db(db), _listener(nullptr) -{ - _run = true; - _listener = rustybits::member_listener_new(_controller_id.c_str(), listen_timeout, listener_callback, this); - _listenThread = std::thread(&PubSubMemberListener::listenThread, this); - _changeHandlerThread = std::thread(&PubSubMemberListener::changeHandlerThread, this); } PubSubMemberListener::~PubSubMemberListener() { - _run = false; - if (_listenThread.joinable()) { - _listenThread.join(); - } - - if (_listener) { - rustybits::member_listener_delete(_listener); - _listener = nullptr; - } } void PubSubMemberListener::onNotification(const std::string& payload) @@ -149,22 +183,27 @@ void PubSubMemberListener::onNotification(const std::string& payload) auto tracer = provider->GetTracer("PubSubMemberListener"); auto span = tracer->StartSpan("PubSubMemberListener::onNotification"); auto scope = tracer->WithActiveSpan(span); - span->SetAttribute("payload", payload); - fprintf(stderr, "Member notification received: %s\n", payload.c_str()); + pbmessages::MemberChange mc; + if (! mc.ParseFromString(payload)) { + fprintf(stderr, "Failed to parse MemberChange protobuf message\n"); + span->SetAttribute("error", "Failed to parse MemberChange protobuf message"); + span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf"); + return; + } + + fprintf(stderr, "Member notification received"); try { nlohmann::json tmp; - nlohmann::json old_tmp = tmp["old"]; - nlohmann::json new_tmp = tmp["new"]; nlohmann::json oldConfig, newConfig; - if (old_tmp.is_object()) { - // TODO: copy old configuration to oldConfig + if (mc.has_old()) { + oldConfig = toJson(mc.old()); } - if (new_tmp.is_object()) { - // TODO: copy new configuration to newConfig + if (mc.has_new_()) { + newConfig = toJson(mc.new_()); } if (oldConfig.is_object() && newConfig.is_object()) { @@ -214,20 +253,143 @@ void PubSubMemberListener::onNotification(const std::string& payload) } } -void PubSubMemberListener::listenThread() +nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc) { - if (_listener) { - while (_run) { - rustybits::member_listener_listen(_listener); + nlohmann::json out; + + out["id"] = nc.network_id(); + out["name"] = nc.name(); + out["capabilities"] = OSUtils::jsonParse(nc.capabilities()); + out["mtu"] = nc.mtu(); + out["multicastLimit"] = nc.multicast_limit(); + out["private"] = nc.is_private(); + out["remoteTraceLevel"] = nc.remote_trace_level(); + if (nc.has_remote_trace_target()) { + out["remoteTraceTarget"] = nc.remote_trace_target(); + } + else { + out["remoteTraceTarget"] = ""; + } + out["rules"] = OSUtils::jsonParse(nc.rules()); + out["rulesSource"] = nc.rules_source(); + out["tags"] = OSUtils::jsonParse(nc.tags()); + + if (nc.has_ipv4_assign_mode()) { + nlohmann::json ipv4mode; + ipv4mode["zt"] = nc.ipv4_assign_mode().zt(); + out["ipv4AssignMode"] = ipv4mode; + } + if (nc.has_ipv6_assign_mode()) { + nlohmann::json ipv6mode; + ipv6mode["6plane"] = nc.ipv6_assign_mode().six_plane(); + ipv6mode["rfc4193"] = nc.ipv6_assign_mode().rfc4193(); + ipv6mode["zt"] = nc.ipv6_assign_mode().zt(); + out["ipv6AssignMode"] = ipv6mode; + } + + if (nc.assignment_pools_size() > 0) { + nlohmann::json pools = nlohmann::json::array(); + for (const auto& p : nc.assignment_pools()) { + nlohmann::json pool; + pool["ipRangeStart"] = p.start_ip(); + pool["ipRangeEnd"] = p.end_ip(); + pools.push_back(pool); + } + out["assignmentPools"] = pools; + } + + if (nc.routes_size() > 0) { + nlohmann::json routes = nlohmann::json::array(); + for (const auto& r : nc.routes()) { + nlohmann::json route; + route["target"] = r.target(); + if (r.has_via()) { + route["via"] = r.via(); + } + routes.push_back(route); + } + out["routes"] = routes; + } + + if (nc.has_dns()) { + nlohmann::json dns; + if (nc.dns().nameservers_size() > 0) { + nlohmann::json servers = nlohmann::json::array(); + for (const auto& s : nc.dns().nameservers()) { + servers.push_back(s); + } + dns["servers"] = servers; + } + dns["domain"] = nc.dns().domain(); + + out["dns"] = dns; + } + + out["ssoEnabled"] = nc.sso_enabled(); + nlohmann::json sso; + if (nc.sso_enabled()) { + sso = nlohmann::json::object(); + if (nc.has_sso_client_id()) { + sso["ssoClientId"] = nc.sso_client_id(); + } + + if (nc.has_sso_authorization_endpoint()) { + sso["ssoAuthorizationEndpoint"] = nc.sso_authorization_endpoint(); + } + + if (nc.has_sso_issuer()) { + sso["ssoIssuer"] = nc.sso_issuer(); + } + + if (nc.has_sso_provider()) { + sso["ssoProvider"] = nc.sso_provider(); } } + out["ssoConfig"] = sso; + + return out; } -void PubSubMemberListener::changeHandlerThread() +nlohmann::json toJson(const pbmessages::MemberChange_Member& mc) { - if (_listener) { - rustybits::member_listener_change_handler(_listener); + nlohmann::json out; + out["id"] = mc.device_id(); + out["nwid"] = mc.network_id(); + if (mc.has_remote_trace_target()) { + out["remoteTraceTarget"] = mc.remote_trace_target(); } + else { + out["remoteTraceTarget"] = ""; + } + out["authorized"] = mc.authorized(); + out["activeBridge"] = mc.active_bridge(); + + auto ipAssignments = mc.ip_assignments(); + if (ipAssignments.size() > 0) { + nlohmann::json assignments = nlohmann::json::array(); + for (const auto& ip : ipAssignments) { + assignments.push_back(ip); + } + out["ipAssignments"] = assignments; + } + + out["noAutoAssignIps"] = mc.no_auto_assign_ips(); + out["ssoExempt"] = mc.sso_exepmt(); + out["authenticationExpiryTime"] = mc.auth_expiry_time(); + out["capabilities"] = OSUtils::jsonParse(mc.capabilities()); + out["creationTime"] = mc.creation_time(); + out["identity"] = mc.identity(); + out["lastAuthorizedTime"] = mc.last_authorized_time(); + out["lastDeauthorizedTime"] = mc.last_deauthorized_time(); + out["remoteTraceLevel"] = mc.remote_trace_level(); + out["revision"] = mc.revision(); + out["tags"] = OSUtils::jsonParse(mc.tags()); + out["versionMajor"] = mc.version_major(); + out["versionMinor"] = mc.version_minor(); + out["versionRev"] = mc.version_rev(); + out["versionProtocol"] = mc.version_protocol(); + + return out; } } // namespace ZeroTier diff --git a/controller/PubSubListener.hpp b/controller/PubSubListener.hpp index 93b9e36d8..10d78ea94 100644 --- a/controller/PubSubListener.hpp +++ b/controller/PubSubListener.hpp @@ -6,6 +6,8 @@ #include "NotificationListener.hpp" #include "rustybits.h" +#include +#include #include #include #include @@ -15,6 +17,8 @@ class DB; struct PubSubConfig { const char* controller_id; + std::string project; + std::string topic; uint64_t listen_timeout; }; @@ -23,11 +27,25 @@ struct PubSubConfig { */ class PubSubListener : public NotificationListener { public: - virtual ~PubSubListener() - { - } + PubSubListener(std::string controller_id, std::string project, std::string topic); + virtual ~PubSubListener(); virtual void onNotification(const std::string& payload) = 0; + + protected: + std::string _controller_id; + std::string _project; + std::string _topic; + std::string _subscription_id; + + private: + void subscribe(); + bool _run = false; + google::cloud::pubsub_admin::SubscriptionAdminClient _adminClient; + google::cloud::pubsub::Subscription _subscription; + std::shared_ptr _subscriber; + google::cloud::future _session; + std::thread _subscriberThread; }; /** @@ -35,21 +53,13 @@ class PubSubListener : public NotificationListener { */ class PubSubNetworkListener : public PubSubListener { public: - PubSubNetworkListener(std::string controller_id, uint64_t listen_timeout, DB* db); + PubSubNetworkListener(std::string controller_id, std::string project, DB* db); virtual ~PubSubNetworkListener(); virtual void onNotification(const std::string& payload) override; private: - void listenThread(); - void changeHandlerThread(); - - bool _run = false; - std::string _controller_id; DB* _db; - const rustybits::NetworkListener* _listener; - std::thread _listenThread; - std::thread _changeHandlerThread; }; /** @@ -57,21 +67,13 @@ class PubSubNetworkListener : public PubSubListener { */ class PubSubMemberListener : public PubSubListener { public: - PubSubMemberListener(std::string controller_id, uint64_t listen_timeout, DB* db); + PubSubMemberListener(std::string controller_id, std::string project, DB* db); virtual ~PubSubMemberListener(); virtual void onNotification(const std::string& payload) override; private: - void listenThread(); - void changeHandlerThread(); - - bool _run = false; - std::string _controller_id; DB* _db; - const rustybits::MemberListener* _listener; - std::thread _listenThread; - std::thread _changeHandlerThread; }; } // namespace ZeroTier diff --git a/controller/protobuf/member.proto b/controller/protobuf/member.proto new file mode 100644 index 000000000..511f1a8d7 --- /dev/null +++ b/controller/protobuf/member.proto @@ -0,0 +1,39 @@ +syntax = "proto3"; + +package pbmessages; + +message MemberChange { + message Member { + string device_id = 1; + string network_id = 2; + string identity = 3; // Identity of the member + bool authorized = 4; // Whether the member is authorized + repeated string ip_assignments = 5; // List of IP assignments + bool active_bridge = 6; // Whether the member is an active bridge + string tags = 7; // JSON string of tags + string capabilities = 8; // JSON string of capabilities + uint64 creation_time = 9; // Unix timestamp in milliseconds + bool no_auto_assign_ips = 10; // Whether auto IP assignment is disabled + uint64 revision = 11; // Revision number + uint64 last_authorized_time = 12; // Last time the member was authorized + uint64 last_deauthorized_time = 13; // Last time the member was deauthorized + optional string last_authorized_credential_type = 14; // Type of credential used for last authorization + optional string last_authorized_credential = 15; // Credential used for last authorization + int32 version_major = 16; // Major version of the member + int32 version_minor = 17; // Minor version of the member + int32 version_rev = 18; // Patch version of the member + int32 version_protocol = 19; // Protocol version of the member + int32 remote_trace_level = 20; // Remote trace level + optional string remote_trace_target = 21; // Remote trace target + bool sso_exepmt = 22; // Whether SSO is exempt + uint64 auth_expiry_time = 23; // Authorization expiry time in milliseconds + } + message MemberChangeMetadata { + string trace_id = 1; + string controller_id = 2; + } + + optional Member old = 1; + optional Member new = 2; + optional MemberChangeMetadata metadata = 3; +} diff --git a/controller/protobuf/member_status.proto b/controller/protobuf/member_status.proto new file mode 100644 index 000000000..248390517 --- /dev/null +++ b/controller/protobuf/member_status.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +package pbmessages; + + + +message MemberStatus { + message MemberStatusMetadata { + string trace_id = 1; + string controller_id = 2; + } + + MemberStatusMetadata metadata = 1; + string network_id = 2; + string member_id = 3; + uint64 timestamp = 4; // Unix timestamp in milliseconds + optional string ip_address = 5; // Optional IP address of the member + optional string os = 6; + optional string arch = 7; + optional string version = 8; +} diff --git a/controller/protobuf/network.proto b/controller/protobuf/network.proto new file mode 100644 index 000000000..c2b6868f7 --- /dev/null +++ b/controller/protobuf/network.proto @@ -0,0 +1,66 @@ +syntax = "proto3"; + +package pbmessages; + +message NetworkChange { + message NetworkChangeMetadata { + string trace_id = 1; + string controller_id = 2; + } + + message IPRange { + string start_ip = 1; // Start of the IP range + string end_ip = 2; // End of the IP range + } + + message Route { + string target = 1; // Target IP or network + optional string via = 2; // Optional next hop IP + } + + message DNS { + string domain = 1; // Search domain + repeated string nameservers = 2; // List of nameservers + } + + message IPV4AssignMode { + bool zt = 1; // Whether ZeroTier is used for IPv4 assignment + } + + message IPv6AssignMode { + bool six_plane = 1; // Whether 6plane is used for IPv6 assignment + bool rfc4193 = 2; // Whether RFC 4193 is used for IPv6 assignment + bool zt = 3; // Whether ZeroTier is used for IPv6 assignment + } + + message Network { + string network_id = 1; + string capabilities = 2; // JSON string of capabilities + uint64 creation_time = 3; // Unix timestamp in milliseconds + bool enable_broadcast = 4; // Whether broadcast is enabled + repeated IPRange assignment_pools = 5; // List of IP ranges for assignment + uint32 mtu = 6; // Maximum Transmission Unit + uint32 multicast_limit = 7; // Limit for multicast messages + optional string name = 8; // Name of the network + bool is_private = 9; // Whether the network is private + uint32 remote_trace_level = 10; // Remote trace level + optional string remote_trace_target = 11; // Remote trace target + uint64 revision = 12; // Revision number + repeated Route routes = 13; // List of routes + string rules = 14; // JSON string of rules + optional string tags = 15; // JSON string of tags + IPV4AssignMode ipv4_assign_mode = 16; // IPv4 assignment mode + IPv6AssignMode ipv6_assign_mode = 17; // IPv6 assignment mode + optional DNS dns = 18; // DNS configuration + bool sso_enabled = 19; // Whether Single Sign-On is enabled + optional string sso_client_id = 20; // SSO client ID + optional string sso_authorization_endpoint = 21; // SSO authorization endpoint + optional string sso_issuer = 22; // SSO issuer + optional string sso_provider = 23; // SSO provider + string rules_source = 24; // source code for rules + } + + optional Network old = 1; + optional Network new = 2; + optional NetworkChangeMetadata metadata = 3; +}