diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index b8fd749a5..0f5fde77b 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -22,6 +22,7 @@ #include <libpq-fe.h> #include <sstream> +#include <climits> using json = nlohmann::json; @@ -113,11 +114,8 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R fprintf(stderr, "Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION); exit(1); } - PQclear(res); res = NULL; - PQfinish(conn); - conn = NULL; if (_rc != NULL) { sw::redis::ConnectionOptions opts; @@ -137,6 +135,16 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R } _readyLock.lock(); + + fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt()); + _waitNoticePrinted = true; + + initializeNetworks(conn); + initializeMembers(conn); + + PQfinish(conn); + conn = NULL; + _heartbeatThread = std::thread(&PostgreSQL::heartbeat, this); _membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this); _networksDbWatcher = std::thread(&PostgreSQL::networksDbWatcher, this); @@ -165,10 +173,6 @@ PostgreSQL::~PostgreSQL() bool PostgreSQL::waitForReady() { while (_ready < 2) { - if (!_waitNoticePrinted) { - _waitNoticePrinted = true; - fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt()); - } _readyLock.lock(); _readyLock.unlock(); } @@ -229,12 +233,15 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId) tmp.first["objtype"] = "_delete_network"; tmp.second = true; _commitQueue.post(tmp); + nlohmann::json nullJson; + _networkChanged(tmp.first, nullJson, true); } void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId) { char tmp2[24]; - std::pair<nlohmann::json,bool> tmp; + waitForReady(); + std::pair<nlohmann::json,bool> tmp, nw; Utils::hex(networkId, tmp2); tmp.first["nwid"] = tmp2; Utils::hex(memberId, tmp2); @@ -242,6 +249,8 @@ void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId) tmp.first["objtype"] = "_delete_member"; tmp.second = true; _commitQueue.post(tmp); + nlohmann::json nullJson; + _memberChanged(tmp.first, nullJson, true); } void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress) @@ -261,11 +270,30 @@ void PostgreSQL::initializeNetworks(PGconn *conn) fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); exit(1); } + + std::string setKey = "networks:{" + _myAddressStr + "}"; + + if (_rc != NULL) { + try { + if (_rc->clusterMode) { + _cluster->del(setKey); + } else { + _redis->del(setKey); + } + } catch (sw::redis::Error &e) { + // del can throw an error if the key doesn't exist + // swallow it and move along + } + } + + std::unordered_set<std::string> networkSet; const char *params[1] = { _myAddressStr.c_str() }; + fprintf(stderr, "Initializing Networks...\n"); + PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, " "enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, " "remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network " @@ -291,9 +319,12 @@ void PostgreSQL::initializeNetworks(PGconn *conn) const char *nwidparam[1] = { PQgetvalue(res, i, 0) }; + std::string nwid = PQgetvalue(res, i, 0); + + networkSet.insert(nwid); - config["id"] = PQgetvalue(res, i, 0); - config["nwid"] = PQgetvalue(res, i, 0); + config["id"] = nwid; + config["nwid"] = nwid; try { config["creationTime"] = std::stoull(PQgetvalue(res, i, 1)); } catch (std::exception &e) { @@ -406,14 +437,29 @@ void PostgreSQL::initializeNetworks(PGconn *conn) PQclear(res); + if(!networkSet.empty()) { + if (_rc && _rc->clusterMode) { + auto tx = _cluster->transaction(_myAddressStr, true); + tx.sadd(setKey, networkSet.begin(), networkSet.end()); + tx.exec(); + } else if (_rc && !_rc->clusterMode) { + auto tx = _redis->transaction(true); + tx.sadd(setKey, networkSet.begin(), networkSet.end()); + tx.exec(); + } + } + if (++this->_ready == 2) { if (_waitNoticePrinted) { fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); } _readyLock.unlock(); } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error initializing networks in Redis: %s\n", e.what()); + exit(-1); } catch (std::exception &e) { - fprintf(stderr, "ERROR: Error initializing networks: %s", e.what()); + fprintf(stderr, "ERROR: Error initializing networks: %s\n", e.what()); exit(-1); } } @@ -425,11 +471,44 @@ void PostgreSQL::initializeMembers(PGconn *conn) fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); exit(1); } + std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:"; + + if (_rc != NULL) { + std::lock_guard<std::mutex> l(_networks_l); + std::unordered_set<std::string> deletes; + for ( auto it : _networks) { + uint64_t nwid_i = it.first; + char nwidTmp[64] = {0}; + OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); + std::string nwid(nwidTmp); + std::string key = setKeyBase + nwid; + deletes.insert(key); + } + if (!deletes.empty()) { + if (_rc->clusterMode) { + auto tx = _cluster->transaction(_myAddressStr, true); + for (std::string k : deletes) { + tx.del(k); + } + tx.exec(); + } else { + auto tx = _redis->transaction(true); + for (std::string k : deletes) { + tx.del(k); + } + tx.exec(); + } + } + } + const char *params[1] = { _myAddressStr.c_str() }; + + std::unordered_map<std::string, std::string> networkMembers; + fprintf(stderr, "Initializing Members...\n"); PGresult *res = PQexecParams(conn, "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, " " EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, " @@ -460,6 +539,9 @@ void PostgreSQL::initializeMembers(PGconn *conn) std::string memberId(PQgetvalue(res, i, 0)); std::string networkId(PQgetvalue(res, i, 1)); + + networkMembers.insert(std::pair<std::string, std::string>(setKeyBase+networkId, memberId)); + std::string ctime = PQgetvalue(res, i, 5); config["id"] = memberId; config["nwid"] = networkId; @@ -565,12 +647,31 @@ void PostgreSQL::initializeMembers(PGconn *conn) PQclear(res); + if (!networkMembers.empty()) { + if (_rc != NULL) { + if (_rc->clusterMode) { + auto tx = _cluster->transaction(_myAddressStr, true); + for (auto it : networkMembers) { + tx.sadd(it.first, it.second); + } + tx.exec(); + } else { + auto tx = _redis->transaction(true); + for (auto it : networkMembers) { + tx.sadd(it.first, it.second); + } + tx.exec(); + } + } + } if (++this->_ready == 2) { if (_waitNoticePrinted) { fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); } _readyLock.unlock(); } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error initializing members (redis): %s\n", e.what()); } catch (std::exception &e) { fprintf(stderr, "ERROR: Error initializing members: %s\n", e.what()); exit(-1); @@ -608,12 +709,13 @@ void PostgreSQL::heartbeat() PQfinish(conn); exit(6); } + int64_t ts = OSUtils::now(); if (conn) { std::string major = std::to_string(ZEROTIER_ONE_VERSION_MAJOR); std::string minor = std::to_string(ZEROTIER_ONE_VERSION_MINOR); std::string rev = std::to_string(ZEROTIER_ONE_VERSION_REVISION); std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); - std::string now = std::to_string(OSUtils::now()); + std::string now = std::to_string(ts); std::string host_port = std::to_string(_listenPort); std::string use_redis = (_rc != NULL) ? "true" : "false"; const char *values[10] = { @@ -630,7 +732,7 @@ void PostgreSQL::heartbeat() }; PGresult *res = PQexecParams(conn, - "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port,use_redis) " + "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis) " "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) " "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " @@ -648,6 +750,13 @@ void PostgreSQL::heartbeat() } PQclear(res); } + if (_rc != NULL) { + if (_rc->clusterMode) { + _cluster->zadd("controllers", controllerId, ts); + } else { + _redis->zadd("controllers", controllerId, ts); + } + } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } @@ -666,8 +775,6 @@ void PostgreSQL::membersDbWatcher() exit(1); } - initializeMembers(conn); - if (_rc) { PQfinish(conn); conn = NULL; @@ -731,50 +838,54 @@ void PostgreSQL::_membersWatcher_Redis() { std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}"; while (_run == 1) { - json tmp; - std::unordered_map<std::string, ItemStream> result; - if (_rc->clusterMode) { - _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); - } else { - _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); - } - if (!result.empty()) { - for (auto element : result) { -#ifdef ZT_TRACE - fprintf(stdout, "Received notification from: %s\n", element.first.c_str()); -#endif - for (auto rec : element.second) { - std::string id = rec.first; - auto attrs = rec.second; -#ifdef ZT_TRACE - fprintf(stdout, "Record ID: %s\n", id.c_str()); - fprintf(stdout, "attrs len: %lu\n", attrs.size()); -#endif - for (auto a : attrs) { -#ifdef ZT_TRACE - fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str()); -#endif - try { - tmp = json::parse(a.second); - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - json oldConfig, newConfig; - if (ov.is_object()) oldConfig = ov; - if (nv.is_object()) newConfig = nv; - if (oldConfig.is_object()||newConfig.is_object()) { - _memberChanged(oldConfig,newConfig,(this->_ready >= 2)); + try { + json tmp; + std::unordered_map<std::string, ItemStream> result; + if (_rc->clusterMode) { + _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + } else { + _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + } + if (!result.empty()) { + for (auto element : result) { + #ifdef ZT_TRACE + fprintf(stdout, "Received notification from: %s\n", element.first.c_str()); + #endif + for (auto rec : element.second) { + std::string id = rec.first; + auto attrs = rec.second; + #ifdef ZT_TRACE + fprintf(stdout, "Record ID: %s\n", id.c_str()); + fprintf(stdout, "attrs len: %lu\n", attrs.size()); + #endif + for (auto a : attrs) { + #ifdef ZT_TRACE + fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str()); + #endif + try { + tmp = json::parse(a.second); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _memberChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (...) { + fprintf(stderr, "json parse error in networkWatcher_Redis\n"); } - } catch (...) { - fprintf(stderr, "json parse error in networkWatcher_Redis\n"); } - } - if (_rc->clusterMode) { - _cluster->xdel(key, id); - } else { - _redis->xdel(key, id); + if (_rc->clusterMode) { + _cluster->xdel(key, id); + } else { + _redis->xdel(key, id); + } } } } + } catch (sw::redis::Error &e) { + fprintf(stderr, "Error in Redis members watcher: %s\n", e.what()); } } fprintf(stderr, "membersWatcher ended\n"); @@ -789,8 +900,6 @@ void PostgreSQL::networksDbWatcher() exit(1); } - initializeNetworks(conn); - if (_rc) { PQfinish(conn); conn = NULL; @@ -852,51 +961,55 @@ void PostgreSQL::_networksWatcher_Redis() { std::string key = "network-stream:{" + std::string(_myAddress.toString(buf)) + "}"; while (_run == 1) { - json tmp; - std::unordered_map<std::string, ItemStream> result; - if (_rc->clusterMode) { - _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); - } else { - _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); - } - - if (!result.empty()) { - for (auto element : result) { + try { + json tmp; + std::unordered_map<std::string, ItemStream> result; + if (_rc->clusterMode) { + _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + } else { + _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + } + + if (!result.empty()) { + for (auto element : result) { #ifdef ZT_TRACE - fprintf(stdout, "Received notification from: %s\n", element.first.c_str()); + fprintf(stdout, "Received notification from: %s\n", element.first.c_str()); #endif - for (auto rec : element.second) { - std::string id = rec.first; - auto attrs = rec.second; + for (auto rec : element.second) { + std::string id = rec.first; + auto attrs = rec.second; #ifdef ZT_TRACE - fprintf(stdout, "Record ID: %s\n", id.c_str()); - fprintf(stdout, "attrs len: %lu\n", attrs.size()); + fprintf(stdout, "Record ID: %s\n", id.c_str()); + fprintf(stdout, "attrs len: %lu\n", attrs.size()); #endif - for (auto a : attrs) { + for (auto a : attrs) { #ifdef ZT_TRACE - fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str()); + fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str()); #endif - try { - tmp = json::parse(a.second); - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - json oldConfig, newConfig; - if (ov.is_object()) oldConfig = ov; - if (nv.is_object()) newConfig = nv; - if (oldConfig.is_object()||newConfig.is_object()) { - _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); + try { + tmp = json::parse(a.second); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (...) { + fprintf(stderr, "json parse error in networkWatcher_Redis\n"); } - } catch (...) { - fprintf(stderr, "json parse error in networkWatcher_Redis\n"); } - } - if (_rc->clusterMode) { - _cluster->xdel(key, id); - } else { - _redis->xdel(key, id); + if (_rc->clusterMode) { + _cluster->xdel(key, id); + } else { + _redis->xdel(key, id); + } } } } + } catch (sw::redis::Error &e) { + fprintf(stderr, "Error in Redis networks watcher: %s\n", e.what()); } } fprintf(stderr, "networksWatcher ended\n"); @@ -1332,6 +1445,20 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); } + if (_rc != NULL) { + try { + std::string id = (*config)["id"]; + std::string controllerId = _myAddressStr.c_str(); + std::string key = "networks:{" + controllerId + "}"; + if (_rc->clusterMode) { + _cluster->sadd(key, id); + } else { + _redis->sadd(key, id); + } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + } + } } else if (objtype == "_delete_network") { try { std::string networkId = (*config)["nwid"]; @@ -1355,6 +1482,22 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what()); } + if (_rc != NULL) { + try { + std::string id = (*config)["id"]; + std::string controllerId = _myAddressStr.c_str(); + std::string key = "networks:{" + controllerId + "}"; + if (_rc->clusterMode) { + _cluster->srem(key, id); + _cluster->del("network-nodes-online:{"+controllerId+"}:"+id); + } else { + _redis->srem(key, id); + _redis->del("network-nodes-online:{"+controllerId+"}:"+id); + } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + } + } } else if (objtype == "_delete_member") { try { std::string memberId = (*config)["id"]; @@ -1382,6 +1525,23 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what()); } + if (_rc != NULL) { + try { + std::string memberId = (*config)["id"]; + std::string networkId = (*config)["nwid"]; + std::string controllerId = _myAddressStr.c_str(); + std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; + if (_rc->clusterMode) { + _cluster->srem(key, memberId); + _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId); + } else { + _redis->srem(key, memberId); + _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId); + } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); + } + } } else { fprintf(stderr, "ERROR: unknown objtype"); } @@ -1401,6 +1561,17 @@ void PostgreSQL::commitThread() } void PostgreSQL::onlineNotificationThread() +{ + waitForReady(); + + if (_rc != NULL) { + onlineNotification_Redis(); + } else { + onlineNotification_Postgres(); + } +} + +void PostgreSQL::onlineNotification_Postgres() { PGconn *conn = getPgConn(); if (PQstatus(conn) == CONNECTION_BAD) { @@ -1410,9 +1581,7 @@ void PostgreSQL::onlineNotificationThread() } _connected = 1; - //int64_t lastUpdatedNetworkStatus = 0; - std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative; - + nlohmann::json jtmp1, jtmp2; while (_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres."); @@ -1420,9 +1589,6 @@ void PostgreSQL::onlineNotificationThread() exit(5); } - // map used to send notifications to front end - std::unordered_map<std::string, std::vector<std::string>> updateMap; - std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline; { std::lock_guard<std::mutex> l(_lastOnline_l); @@ -1443,20 +1609,13 @@ void PostgreSQL::onlineNotificationThread() OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i); OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second); - auto found = _networks.find(nwid_i); - if (found == _networks.end()) { - continue; // skip members trying to join non-existant networks + if(!get(nwid_i, jtmp1, i->first.second, jtmp2)) { + continue; // skip non existent networks/members } std::string networkId(nwidTmp); std::string memberId(memTmp); - std::vector<std::string> &members = updateMap[networkId]; - members.push_back(memberId); - - lastOnlineCumulative[i->first] = i->second.first; - - const char *qvals[2] = { networkId.c_str(), memberId.c_str() @@ -1516,7 +1675,7 @@ void PostgreSQL::onlineNotificationThread() PQclear(res); } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str()); PQfinish(conn); @@ -1526,6 +1685,95 @@ void PostgreSQL::onlineNotificationThread() } } +void PostgreSQL::onlineNotification_Redis() +{ + _connected = 1; + + char buf[11] = {0}; + std::string controllerId = std::string(_myAddress.toString(buf)); + + while (_run == 1) { + std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline; + { + std::lock_guard<std::mutex> l(_lastOnline_l); + lastOnline.swap(_lastOnline); + } + try { + if (!lastOnline.empty()) { + if (_rc->clusterMode) { + auto tx = _cluster->transaction(controllerId, true); + _doRedisUpdate(tx, controllerId, lastOnline); + } else { + auto tx = _redis->transaction(true); + _doRedisUpdate(tx, controllerId, lastOnline); + } + } + } catch (sw::redis::Error &e) { +#ifdef ZT_TRACE + fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what()); +#endif + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } +} + +void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &controllerId, + std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > &lastOnline) + +{ + nlohmann::json jtmp1, jtmp2; + for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { + uint64_t nwid_i = i->first.first; + uint64_t memberid_i = i->first.second; + char nwidTmp[64]; + char memTmp[64]; + char ipTmp[64]; + OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i); + OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", memberid_i); + + if (!get(nwid_i, jtmp1, memberid_i, jtmp2)){ + continue; // skip non existent members/networks + } + + std::string networkId(nwidTmp); + std::string memberId(memTmp); + + int64_t ts = i->second.first; + std::string ipAddr = i->second.second.toIpString(ipTmp); + std::string timestamp = std::to_string(ts); + + std::unordered_map<std::string, std::string> record = { + {"id", memberId}, + {"address", ipAddr}, + {"last_updated", std::to_string(ts)} + }; + tx.zadd("nodes-online:{"+controllerId+"}", memberId, ts) + .zadd("nodes-online2:{"+controllerId+"}", networkId+"-"+memberId, ts) + .zadd("network-nodes-online:{"+controllerId+"}:"+networkId, memberId, ts) + .zadd("active-networks:{"+controllerId+"}", networkId, ts) + .sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId) + .hmset("member:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end()); + } + + // expire records from all-nodes and network-nodes member list + uint64_t expireOld = OSUtils::now() - 300000; + + tx.zremrangebyscore("nodes-online:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN)); + tx.zremrangebyscore("nodes-online2:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN)); + tx.zremrangebyscore("active-networks:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN)); + { + std::lock_guard<std::mutex> l(_networks_l); + for (const auto &it : _networks) { + uint64_t nwid_i = it.first; + char nwidTmp[64]; + OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i); + tx.zremrangebyscore("network-nodes-online:{"+controllerId+"}:"+nwidTmp, + sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN)); + } + } + tx.exec(); +} + PGconn *PostgreSQL::getPgConn(OverrideMode m) { if (m == ALLOW_PGBOUNCER_OVERRIDE) { diff --git a/ext/redis-plus-plus-1.1.1/install/macos/lib/libredis++.a b/ext/redis-plus-plus-1.1.1/install/macos/lib/libredis++.a new file mode 100644 index 000000000..af027a63e Binary files /dev/null and b/ext/redis-plus-plus-1.1.1/install/macos/lib/libredis++.a differ