diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 05d2de7b1..b8fd749a5 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -229,14 +229,12 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId) tmp.first["objtype"] = "_delete_network"; tmp.second = true; _commitQueue.post(tmp); - nlohmann::json nullJson; - _networkChanged(tmp.first, nullJson, true); } void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId) { char tmp2[24]; - std::pair tmp, nw; + std::pair tmp; Utils::hex(networkId, tmp2); tmp.first["nwid"] = tmp2; Utils::hex(memberId, tmp2); @@ -244,8 +242,6 @@ void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId) tmp.first["objtype"] = "_delete_member"; tmp.second = true; _commitQueue.post(tmp); - nlohmann::json nullJson; - _memberChanged(tmp.first, nullJson, true); } void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress) @@ -634,7 +630,7 @@ void PostgreSQL::heartbeat() }; PGresult *res = PQexecParams(conn, - "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis) " + "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port,use_redis) " "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) " "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " @@ -1405,15 +1401,6 @@ void PostgreSQL::commitThread() } void PostgreSQL::onlineNotificationThread() -{ - if (_rc != NULL) { - onlineNotification_Redis(); - } else { - onlineNotification_Postgres(); - } -} - -void PostgreSQL::onlineNotification_Postgres() { PGconn *conn = getPgConn(); if (PQstatus(conn) == CONNECTION_BAD) { @@ -1423,7 +1410,9 @@ void PostgreSQL::onlineNotification_Postgres() } _connected = 1; - nlohmann::json jtmp1, jtmp2; + //int64_t lastUpdatedNetworkStatus = 0; + std::unordered_map< std::pair,int64_t,_PairHasher > lastOnlineCumulative; + while (_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres."); @@ -1431,6 +1420,9 @@ void PostgreSQL::onlineNotification_Postgres() exit(5); } + // map used to send notifications to front end + std::unordered_map> updateMap; + std::unordered_map< std::pair,std::pair,_PairHasher > lastOnline; { std::lock_guard l(_lastOnline_l); @@ -1451,13 +1443,20 @@ void PostgreSQL::onlineNotification_Postgres() OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i); OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second); - if(!get(nwid_i, jtmp1, i->first.second, jtmp2)) { - continue; // skip non existent networks/members + auto found = _networks.find(nwid_i); + if (found == _networks.end()) { + continue; // skip members trying to join non-existant networks } std::string networkId(nwidTmp); std::string memberId(memTmp); + std::vector &members = updateMap[networkId]; + members.push_back(memberId); + + lastOnlineCumulative[i->first] = i->second.first; + + const char *qvals[2] = { networkId.c_str(), memberId.c_str() @@ -1527,107 +1526,6 @@ void PostgreSQL::onlineNotification_Postgres() } } -void PostgreSQL::onlineNotification_Redis() -{ - _connected = 1; - - char buf[11] = {0}; - std::string controllerId = std::string(_myAddress.toString(buf)); - - while (_run == 1) { - std::unordered_map< std::pair,std::pair,_PairHasher > lastOnline; - { - std::lock_guard l(_lastOnline_l); - lastOnline.swap(_lastOnline); - } - - if (_rc->clusterMode) { - auto tx = _cluster->redis(controllerId).transaction(true); - _doRedisUpdate(tx, controllerId, lastOnline); - } else { - auto tx = _redis->transaction(true); - _doRedisUpdate(tx, controllerId, lastOnline); - } - - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } -} - -void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &controllerId, - std::unordered_map< std::pair,std::pair,_PairHasher > &lastOnline) - -{ - nlohmann::json jtmp1, jtmp2; - for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { - uint64_t nwid_i = i->first.first; - uint64_t memberid_i = i->first.second; - char nwidTmp[64]; - char memTmp[64]; - char ipTmp[64]; - OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i); - OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", memberid_i); - - if (!get(nwid_i, jtmp1, memberid_i, jtmp2)){ - continue; // skip non existent members/networks - } - auto found = _networks.find(nwid_i); - if (found == _networks.end()) { - continue; // skip members trying to join non-existant networks - } - - std::string networkId(nwidTmp); - std::string memberId(memTmp); - - int64_t ts = i->second.first; - std::string ipAddr = i->second.second.toIpString(ipTmp); - std::string timestamp = std::to_string(ts); - - std::unordered_map record = { - {"id", memberId}, - {"address", ipAddr}, - {"last_updated", std::to_string(ts)} - }; - tx.zadd("nodes-online:{"+controllerId+"}", memberId, ts) - .zadd("network-nodes-online:{"+controllerId+"}:"+networkId, memberId, ts) - .sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId) - .hmset("network:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end()); - } - - tx.exec(); - - // expire records from all-nodes and network-nodes member list - uint64_t expireOld = OSUtils::now() - 300000; - - auto cursor = 0LL; - std::unordered_set keys; - // can't scan for keys in a transaction, so we need to fall back to _cluster or _redis - // to get all network-members keys - if(_rc->clusterMode) { - auto r = _cluster->redis(controllerId); - while(true) { - cursor = r.scan(cursor, "network-nodes-online:{"+controllerId+"}:*", INT_MAX, std::inserter(keys, keys.begin())); - if (cursor == 0) { - break; - } - } - } else { - while(true) { - cursor = _redis->scan(cursor, "network-nodes-online:"+controllerId+":*", INT_MAX, std::inserter(keys, keys.begin())); - if (cursor == 0) { - break; - } - } - } - - tx.zremrangebyscore("nodes-online:{"+controllerId+"}", sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); - - for(const auto &k : keys) { - tx.zremrangebyscore(k, sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); - } - - tx.exec(); -} - PGconn *PostgreSQL::getPgConn(OverrideMode m) { if (m == ALLOW_PGBOUNCER_OVERRIDE) { diff --git a/node/NetworkConfig.cpp b/node/NetworkConfig.cpp index e45a111d2..97985c7af 100644 --- a/node/NetworkConfig.cpp +++ b/node/NetworkConfig.cpp @@ -22,7 +22,7 @@ namespace ZeroTier { bool NetworkConfig::toDictionary(Dictionary &d,bool includeLegacy) const { Buffer *tmp = new Buffer(); - char tmp2[128]; + char tmp2[128] = {0}; try { d.clear(); @@ -84,7 +84,7 @@ bool NetworkConfig::toDictionary(Dictionary &d,b if (((int)lastrt < 32)||(lastrt == ZT_NETWORK_RULE_MATCH_ETHERTYPE)) { if (ets.length() > 0) ets.push_back(','); - char tmp2[16]; + char tmp2[16] = {0}; ets.append(Utils::hex((uint16_t)et,tmp2)); } et = 0; @@ -104,7 +104,7 @@ bool NetworkConfig::toDictionary(Dictionary &d,b if ((this->specialists[i] & ZT_NETWORKCONFIG_SPECIALIST_TYPE_ACTIVE_BRIDGE) != 0) { if (ab.length() > 0) ab.push_back(','); - char tmp2[16]; + char tmp2[16] = {0}; ab.append(Address(this->specialists[i]).toString(tmp2)); } } @@ -220,7 +220,7 @@ bool NetworkConfig::fromDictionary(const Dictionary