diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index d625da829..2ccc16b8c 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -1447,7 +1447,9 @@ void EmbeddedNetworkController::_request( Utils::scopy(nc->centralAuthURL, sizeof(nc->centralAuthURL), info.centralAuthURL.c_str()); } if (!info.issuerURL.empty()) { + #ifdef ZT_DEBUG fprintf(stderr, "copying issuerURL to nc: %s\n", info.issuerURL.c_str()); + #endif Utils::scopy(nc->issuerURL, sizeof(nc->issuerURL), info.issuerURL.c_str()); } if (!info.ssoNonce.empty()) { diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 23064c596..0d3c61697 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -170,6 +170,7 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R , _rc(rc) , _redis(NULL) , _cluster(NULL) + , _redisMemberStatus(false) { char myAddress[64]; _myAddressStr = myId.address().toString(myAddress); @@ -189,6 +190,11 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R // it will be padded at the end with zeroes. If longer, it'll be truncated. Utils::unhex(ssoPskHex, _ssoPsk, sizeof(_ssoPsk)); } + const char *redisMemberStatus = getenv("ZT_REDIS_MEMBER_STATUS"); + if (redisMemberStatus && (strcmp(redisMemberStatus, "true") == 0)) { + _redisMemberStatus = true; + fprintf(stderr, "Using redis for member status\n"); + } auto c = _pool->borrow(); pqxx::work txn{*c->c}; @@ -477,6 +483,7 @@ AuthInfo PostgreSQL::getSSOAuthInfo(const nlohmann::json &member, const std::str info.ssoNonce = nonce; info.ssoState = std::string(state_hex) + "_" +networkId; info.centralAuthURL = redirectURL; +#ifdef ZT_DEBUG fprintf( stderr, "ssoClientID: %s\nissuerURL: %s\nssoNonce: %s\nssoState: %s\ncentralAuthURL: %s\n", @@ -485,6 +492,7 @@ AuthInfo PostgreSQL::getSSOAuthInfo(const nlohmann::json &member, const std::str info.ssoNonce.c_str(), info.ssoState.c_str(), info.centralAuthURL.c_str()); +#endif } } else { fprintf(stderr, "client_id: %s\nauthorization_endpoint: %s\n", client_id.c_str(), authorization_endpoint.c_str()); @@ -506,6 +514,21 @@ void PostgreSQL::initializeNetworks() fprintf(stderr, "Initializing Networks...\n"); + if (_redisMemberStatus) { + fprintf(stderr, "Init Redis for networks...\n"); + try { + if (_rc->clusterMode) { + _cluster->del(setKey); + } else { + _redis->del(setKey); + } + } catch (sw::redis::Error &e) { + // ignore. if this key doesn't exist, there's no reason to delete it + } + } + + std::unordered_set networkSet; + char qbuf[2048] = {0}; sprintf(qbuf, "SELECT n.id, (EXTRACT(EPOCH FROM n.creation_time AT TIME ZONE 'UTC')*1000)::bigint as creation_time, n.capabilities, " "n.enable_broadcast, (EXTRACT(EPOCH FROM n.last_modified AT TIME ZONE 'UTC')*1000)::bigint AS last_modified, n.mtu, n.multicast_limit, n.name, n.private, n.remote_trace_level, " @@ -522,7 +545,8 @@ void PostgreSQL::initializeNetworks() auto c = _pool->borrow(); auto c2 = _pool->borrow(); pqxx::work w{*c->c}; - + + fprintf(stderr, "Load networks from psql...\n"); auto stream = pqxx::stream_from::query(w, qbuf); std::tuple< @@ -610,6 +634,8 @@ void PostgreSQL::initializeNetworks() config["clientId"] = clientId.value_or(""); config["authorizationEndpoint"] = authorizationEndpoint.value_or(""); + networkSet.insert(nwid); + if (dnsDomain.has_value()) { std::string serverList = dnsServers.value(); json obj; @@ -673,6 +699,23 @@ void PostgreSQL::initializeNetworks() w.commit(); _pool->unborrow(c2); _pool->unborrow(c); + fprintf(stderr, "done.\n"); + + if (!networkSet.empty()) { + if (_redisMemberStatus) { + fprintf(stderr, "adding networks to redis...\n"); + if (_rc->clusterMode) { + auto tx = _cluster->transaction(_myAddressStr, true); + tx.sadd(setKey, networkSet.begin(), networkSet.end()); + tx.exec(); + } else { + auto tx = _redis->transaction(true); + tx.sadd(setKey, networkSet.begin(), networkSet.end()); + tx.exec(); + } + fprintf(stderr, "done.\n"); + } + } if (++this->_ready == 2) { if (_waitNoticePrinted) { @@ -680,11 +723,14 @@ void PostgreSQL::initializeNetworks() } _readyLock.unlock(); } + fprintf(stderr, "network init done.\n"); } catch (sw::redis::Error &e) { fprintf(stderr, "ERROR: Error initializing networks in Redis: %s\n", e.what()); + std::this_thread::sleep_for(std::chrono::milliseconds(5000)); exit(-1); } catch (std::exception &e) { fprintf(stderr, "ERROR: Error initializing networks: %s\n", e.what()); + std::this_thread::sleep_for(std::chrono::milliseconds(5000)); exit(-1); } } @@ -697,6 +743,42 @@ void PostgreSQL::initializeMembers() std::unordered_map networkMembers; fprintf(stderr, "Initializing Members...\n"); + std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:"; + + if (_redisMemberStatus) { + fprintf(stderr, "Initialize Redis for members...\n"); + std::lock_guard l(_networks_l); + std::unordered_set deletes; + for ( auto it : _networks) { + uint64_t nwid_i = it.first; + char nwidTmp[64] = {0}; + OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); + std::string nwid(nwidTmp); + std::string key = setKeyBase + nwid; + deletes.insert(key); + } + + if (!deletes.empty()) { + try { + if (_rc->clusterMode) { + auto tx = _cluster->transaction(_myAddressStr, true); + for (std::string k : deletes) { + tx.del(k); + } + tx.exec(); + } else { + auto tx = _redis->transaction(true); + for (std::string k : deletes) { + tx.del(k); + } + tx.exec(); + } + } catch (sw::redis::Error &e) { + // ignore + } + } + } + char qbuf[2048]; sprintf(qbuf, "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, (EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000)::bigint, m.identity, " " (EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, " @@ -717,7 +799,8 @@ void PostgreSQL::initializeMembers() auto c = _pool->borrow(); auto c2 = _pool->borrow(); pqxx::work w{*c->c}; - + + fprintf(stderr, "Load members from psql...\n"); auto stream = pqxx::stream_from::query(w, qbuf); std::tuple< @@ -776,6 +859,8 @@ void PostgreSQL::initializeMembers() std::optional authenticationExpiryTime = std::get<19>(row); std::string assignedAddresses = std::get<20>(row); + networkMembers.insert(std::pair(setKeyBase+networkId, memberId)); + config["id"] = memberId; config["address"] = memberId; config["nwid"] = networkId; @@ -830,6 +915,29 @@ void PostgreSQL::initializeMembers() w.commit(); _pool->unborrow(c2); _pool->unborrow(c); + fprintf(stderr, "done.\n"); + + if (!networkMembers.empty()) { + if (_redisMemberStatus) { + fprintf(stderr, "Load member data into redis...\n"); + if (_rc->clusterMode) { + auto tx = _cluster->transaction(_myAddressStr, true); + for (auto it : networkMembers) { + tx.sadd(it.first, it.second); + } + tx.exec(); + } else { + auto tx = _redis->transaction(true); + for (auto it : networkMembers) { + tx.sadd(it.first, it.second); + } + tx.exec(); + } + fprintf(stderr, "done.\n"); + } + } + + fprintf(stderr, "Done loading members...\n"); if (++this->_ready == 2) { if (_waitNoticePrinted) { @@ -879,15 +987,16 @@ void PostgreSQL::heartbeat() std::string now = std::to_string(ts); std::string host_port = std::to_string(_listenPort); std::string use_redis = (_rc != NULL) ? "true" : "false"; + std::string redis_mem_status = (_redisMemberStatus) ? "true" : "false"; try { - pqxx::result res = w.exec0("INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis) " + pqxx::result res = w.exec0("INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis, redis_member_status) " "VALUES ("+w.quote(controllerId)+", "+w.quote(hostname)+", TO_TIMESTAMP("+now+"::double precision/1000), "+ - w.quote(publicIdentity)+", "+major+", "+minor+", "+rev+", "+build+", "+host_port+", "+use_redis+") " + w.quote(publicIdentity)+", "+major+", "+minor+", "+rev+", "+build+", "+host_port+", "+use_redis+", "+redis_mem_status+") " "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, " - "use_redis = EXCLUDED.use_redis"); + "use_redis = EXCLUDED.use_redis, redis_member_status = EXCLUDED.redis_member_status"); } catch (std::exception &e) { fprintf(stderr, "Heartbeat update failed: %s\n", e.what()); w.abort(); @@ -899,6 +1008,14 @@ void PostgreSQL::heartbeat() } _pool->unborrow(c); + if (_redisMemberStatus) { + if (_rc->clusterMode) { + _cluster->zadd("controllers", "controllerId", ts); + } else { + _redis->zadd("controllers", "controllerId", ts); + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } fprintf(stderr, "Exited heartbeat thread\n"); @@ -1241,7 +1358,7 @@ void PostgreSQL::commitThread() "$1, TO_TIMESTAMP($5::double precision/1000), " "(SELECT user_id AS owner_id FROM ztc_global_permissions WHERE authorize = true AND del = true AND modify = true AND read = true LIMIT 1)," "$2, $3, $4, TO_TIMESTAMP($5::double precision/1000), " - "$6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, 17) " + "$6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) " "ON CONFLICT (id) DO UPDATE set controller_id = EXCLUDED.controller_id, " "capabilities = EXCLUDED.capabilities, enable_broadcast = EXCLUDED.enable_broadcast, " "last_modified = EXCLUDED.last_modified, mtu = EXCLUDED.mtu, " @@ -1347,6 +1464,20 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "%s ERROR: Error updating network: %s\n", _myAddressStr.c_str(), e.what()); } + if (_redisMemberStatus) { + try { + std::string id = config["id"]; + std::string controllerId = _myAddressStr.c_str(); + std::string key = "networks:{" + controllerId + "}"; + if (_rc->clusterMode) { + _cluster->sadd(key, id); + } else { + _redis->sadd(key, id); + } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + } + } } else if (objtype == "_delete_network") { // fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str()); try { @@ -1361,6 +1492,22 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "%s ERROR: Error deleting network: %s\n", _myAddressStr.c_str(), e.what()); } + if (_redisMemberStatus) { + try { + std::string id = config["id"]; + std::string controllerId = _myAddressStr.c_str(); + std::string key = "networks:{" + controllerId + "}"; + if (_rc->clusterMode) { + _cluster->srem(key, id); + _cluster->del("network-nodes-online:{"+controllerId+"}:"+id); + } else { + _redis->srem(key, id); + _redis->del("network-nodes-online:{"+controllerId+"}:"+id); + } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + } + } } else if (objtype == "_delete_member") { // fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str()); @@ -1378,6 +1525,23 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "%s ERROR: Error deleting member: %s\n", _myAddressStr.c_str(), e.what()); } + if (_redisMemberStatus) { + try { + std::string memberId = config["id"]; + std::string networkId = config["nwid"]; + std::string controllerId = _myAddressStr.c_str(); + std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; + if (_rc->clusterMode) { + _cluster->srem(key, memberId); + _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId); + } else { + _redis->srem(key, memberId); + _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId); + } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); + } + } } else { fprintf(stderr, "%s ERROR: unknown objtype\n", _myAddressStr.c_str()); } @@ -1394,10 +1558,23 @@ void PostgreSQL::commitThread() void PostgreSQL::onlineNotificationThread() { - waitForReady(); - onlineNotification_Postgres(); + waitForReady(); + if (_redisMemberStatus) { + onlineNotification_Redis(); + } else { + onlineNotification_Postgres(); + } } +/** + * ONLY UNCOMMENT FOR TEMPORARY DB MAINTENANCE + * + * This define temproarly turns off writing to the member status table + * so it can be reindexed when the indexes get too large. + */ + +// #define DISABLE_MEMBER_STATUS 1 + void PostgreSQL::onlineNotification_Postgres() { _connected = 1; @@ -1413,7 +1590,8 @@ void PostgreSQL::onlineNotification_Postgres() std::lock_guard l(_lastOnline_l); lastOnline.swap(_lastOnline); } - + +#ifndef DISABLE_MEMBER_STATUS pqxx::work w(*c->c); pqxx::work w2(*c2->c); @@ -1472,6 +1650,7 @@ void PostgreSQL::onlineNotification_Postgres() pipe.complete(); w.commit(); fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), updateCount); +#endif } catch (std::exception &e) { fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what()); } @@ -1515,9 +1694,7 @@ void PostgreSQL::onlineNotification_Redis() } } } catch (sw::redis::Error &e) { -#ifdef REDIS_TRACE fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what()); -#endif } std::this_thread::sleep_for(std::chrono::seconds(10)); } @@ -1528,6 +1705,7 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control { nlohmann::json jtmp1, jtmp2; + uint64_t count = 0; for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { uint64_t nwid_i = i->first.first; uint64_t memberid_i = i->first.second; @@ -1559,14 +1737,21 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control .zadd("active-networks:{"+controllerId+"}", networkId, ts) .sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId) .hmset("member:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end()); + ++count; } // expire records from all-nodes and network-nodes member list uint64_t expireOld = OSUtils::now() - 300000; - tx.zremrangebyscore("nodes-online:{"+controllerId+"}", sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); - tx.zremrangebyscore("nodes-online2:{"+controllerId+"}", sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); - tx.zremrangebyscore("active-networks:{"+controllerId+"}", sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); + tx.zremrangebyscore("nodes-online:{"+controllerId+"}", + sw::redis::RightBoundedInterval(expireOld, + sw::redis::BoundType::LEFT_OPEN)); + tx.zremrangebyscore("nodes-online2:{"+controllerId+"}", + sw::redis::RightBoundedInterval(expireOld, + sw::redis::BoundType::LEFT_OPEN)); + tx.zremrangebyscore("active-networks:{"+controllerId+"}", + sw::redis::RightBoundedInterval(expireOld, + sw::redis::BoundType::LEFT_OPEN)); { std::lock_guard l(_networks_l); for (const auto &it : _networks) { @@ -1578,6 +1763,7 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control } } tx.exec(); + fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), count); } diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index acbc65a67..dfda7f698 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -174,6 +174,7 @@ private: RedisConfig *_rc; std::shared_ptr _redis; std::shared_ptr _cluster; + bool _redisMemberStatus; }; } // namespace ZeroTier diff --git a/make-linux.mk b/make-linux.mk index e902659c1..145bebe52 100644 --- a/make-linux.mk +++ b/make-linux.mk @@ -498,17 +498,15 @@ snap-uninstall: FORCE snap remove zerotier snap-build-remote: FORCE - snapcraft remote-build --build-on=amd64,arm64,s390x,ppc64el,armhf,i386 + cd pkg && snapcraft remote-build --build-on=amd64,arm64,s390x,ppc64el,armhf,i386 snap-upload-beta: FORCE + snapcraft login --with-file=snapcraft-login-data + pushd pkg for SNAPFILE in ./*.snap; do\ - snapcraft upload --release=beta,edge,candidate $${SNAPFILE};\ - done - -snap-upload-stable: FORCE - for SNAPFILE in ./*.snap; do\ - snapcraft upload --release=stable $${SNAPFILE};\ + snapcraft upload --release=stable,beta,edge,candidate $${SNAPFILE};\ done + popd synology-pkg: FORCE cd pkg/synology ; ./build.sh build diff --git a/pkg/snap/snapcraft.yaml b/pkg/snap/snapcraft.yaml index 91bd3da74..57a2ed8e1 100644 --- a/pkg/snap/snapcraft.yaml +++ b/pkg/snap/snapcraft.yaml @@ -64,12 +64,14 @@ layout: parts: one: - source: ./ - plugin: dump - organize: - zerotier-one : usr/sbin/zerotier-one - zerotier-cli : usr/sbin/zerotier-cli - zerotier-idtool : usr/sbin/zerotier-idtool + plugin: make + source: https://github.com/zerotier/zerotierone.git + source-branch: "master" + build-packages: + - build-essential + - libc++-dev + make-parameters: + - ZT_SSO_SUPPORTED=0 filesets: binaries: - usr/sbin/zerotier-one