Merge changes from dev into multipath

This commit is contained in:
Joseph Henry 2020-05-26 17:57:09 -07:00
parent 58d567c331
commit 7ed960297b
3 changed files with 28 additions and 123 deletions

View file

@ -229,14 +229,12 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId)
tmp.first["objtype"] = "_delete_network";
tmp.second = true;
_commitQueue.post(tmp);
nlohmann::json nullJson;
_networkChanged(tmp.first, nullJson, true);
}
void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
{
char tmp2[24];
std::pair<nlohmann::json,bool> tmp, nw;
std::pair<nlohmann::json,bool> tmp;
Utils::hex(networkId, tmp2);
tmp.first["nwid"] = tmp2;
Utils::hex(memberId, tmp2);
@ -244,8 +242,6 @@ void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
tmp.first["objtype"] = "_delete_member";
tmp.second = true;
_commitQueue.post(tmp);
nlohmann::json nullJson;
_memberChanged(tmp.first, nullJson, true);
}
void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress)
@ -634,7 +630,7 @@ void PostgreSQL::heartbeat()
};
PGresult *res = PQexecParams(conn,
"INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis) "
"INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port,use_redis) "
"VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) "
"ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
@ -1405,15 +1401,6 @@ void PostgreSQL::commitThread()
}
void PostgreSQL::onlineNotificationThread()
{
if (_rc != NULL) {
onlineNotification_Redis();
} else {
onlineNotification_Postgres();
}
}
void PostgreSQL::onlineNotification_Postgres()
{
PGconn *conn = getPgConn();
if (PQstatus(conn) == CONNECTION_BAD) {
@ -1423,7 +1410,9 @@ void PostgreSQL::onlineNotification_Postgres()
}
_connected = 1;
nlohmann::json jtmp1, jtmp2;
//int64_t lastUpdatedNetworkStatus = 0;
std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
while (_run == 1) {
if (PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
@ -1431,6 +1420,9 @@ void PostgreSQL::onlineNotification_Postgres()
exit(5);
}
// map used to send notifications to front end
std::unordered_map<std::string, std::vector<std::string>> updateMap;
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
{
std::lock_guard<std::mutex> l(_lastOnline_l);
@ -1451,13 +1443,20 @@ void PostgreSQL::onlineNotification_Postgres()
OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i);
OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second);
if(!get(nwid_i, jtmp1, i->first.second, jtmp2)) {
continue; // skip non existent networks/members
auto found = _networks.find(nwid_i);
if (found == _networks.end()) {
continue; // skip members trying to join non-existant networks
}
std::string networkId(nwidTmp);
std::string memberId(memTmp);
std::vector<std::string> &members = updateMap[networkId];
members.push_back(memberId);
lastOnlineCumulative[i->first] = i->second.first;
const char *qvals[2] = {
networkId.c_str(),
memberId.c_str()
@ -1527,107 +1526,6 @@ void PostgreSQL::onlineNotification_Postgres()
}
}
void PostgreSQL::onlineNotification_Redis()
{
_connected = 1;
char buf[11] = {0};
std::string controllerId = std::string(_myAddress.toString(buf));
while (_run == 1) {
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
{
std::lock_guard<std::mutex> l(_lastOnline_l);
lastOnline.swap(_lastOnline);
}
if (_rc->clusterMode) {
auto tx = _cluster->redis(controllerId).transaction(true);
_doRedisUpdate(tx, controllerId, lastOnline);
} else {
auto tx = _redis->transaction(true);
_doRedisUpdate(tx, controllerId, lastOnline);
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &controllerId,
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > &lastOnline)
{
nlohmann::json jtmp1, jtmp2;
for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
uint64_t nwid_i = i->first.first;
uint64_t memberid_i = i->first.second;
char nwidTmp[64];
char memTmp[64];
char ipTmp[64];
OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i);
OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", memberid_i);
if (!get(nwid_i, jtmp1, memberid_i, jtmp2)){
continue; // skip non existent members/networks
}
auto found = _networks.find(nwid_i);
if (found == _networks.end()) {
continue; // skip members trying to join non-existant networks
}
std::string networkId(nwidTmp);
std::string memberId(memTmp);
int64_t ts = i->second.first;
std::string ipAddr = i->second.second.toIpString(ipTmp);
std::string timestamp = std::to_string(ts);
std::unordered_map<std::string, std::string> record = {
{"id", memberId},
{"address", ipAddr},
{"last_updated", std::to_string(ts)}
};
tx.zadd("nodes-online:{"+controllerId+"}", memberId, ts)
.zadd("network-nodes-online:{"+controllerId+"}:"+networkId, memberId, ts)
.sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId)
.hmset("network:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end());
}
tx.exec();
// expire records from all-nodes and network-nodes member list
uint64_t expireOld = OSUtils::now() - 300000;
auto cursor = 0LL;
std::unordered_set<std::string> keys;
// can't scan for keys in a transaction, so we need to fall back to _cluster or _redis
// to get all network-members keys
if(_rc->clusterMode) {
auto r = _cluster->redis(controllerId);
while(true) {
cursor = r.scan(cursor, "network-nodes-online:{"+controllerId+"}:*", INT_MAX, std::inserter(keys, keys.begin()));
if (cursor == 0) {
break;
}
}
} else {
while(true) {
cursor = _redis->scan(cursor, "network-nodes-online:"+controllerId+":*", INT_MAX, std::inserter(keys, keys.begin()));
if (cursor == 0) {
break;
}
}
}
tx.zremrangebyscore("nodes-online:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
for(const auto &k : keys) {
tx.zremrangebyscore(k, sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
}
tx.exec();
}
PGconn *PostgreSQL::getPgConn(OverrideMode m)
{
if (m == ALLOW_PGBOUNCER_OVERRIDE) {

View file

@ -22,7 +22,7 @@ namespace ZeroTier {
bool NetworkConfig::toDictionary(Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY> &d,bool includeLegacy) const
{
Buffer<ZT_NETWORKCONFIG_DICT_CAPACITY> *tmp = new Buffer<ZT_NETWORKCONFIG_DICT_CAPACITY>();
char tmp2[128];
char tmp2[128] = {0};
try {
d.clear();
@ -84,7 +84,7 @@ bool NetworkConfig::toDictionary(Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY> &d,b
if (((int)lastrt < 32)||(lastrt == ZT_NETWORK_RULE_MATCH_ETHERTYPE)) {
if (ets.length() > 0)
ets.push_back(',');
char tmp2[16];
char tmp2[16] = {0};
ets.append(Utils::hex((uint16_t)et,tmp2));
}
et = 0;
@ -104,7 +104,7 @@ bool NetworkConfig::toDictionary(Dictionary<ZT_NETWORKCONFIG_DICT_CAPACITY> &d,b
if ((this->specialists[i] & ZT_NETWORKCONFIG_SPECIALIST_TYPE_ACTIVE_BRIDGE) != 0) {
if (ab.length() > 0)
ab.push_back(',');
char tmp2[16];
char tmp2[16] = {0};
ab.append(Address(this->specialists[i]).toString(tmp2));
}
}
@ -220,7 +220,7 @@ bool NetworkConfig::fromDictionary(const Dictionary<ZT_NETWORKCONFIG_DICT_CAPACI
if (d.getUI(ZT_NETWORKCONFIG_DICT_KEY_VERSION,0) < 6) {
#ifdef ZT_SUPPORT_OLD_STYLE_NETCONF
char tmp2[1024];
char tmp2[1024] = {0};
// Decode legacy fields if version is old
if (d.getB(ZT_NETWORKCONFIG_DICT_KEY_ENABLE_BROADCAST_OLD))

View file

@ -226,9 +226,16 @@ public:
capabilityCount(0),
tagCount(0),
certificateOfOwnershipCount(0),
capabilities(),
tags(),
certificatesOfOwnership(),
type(ZT_NETWORK_TYPE_PRIVATE)
{
name[0] = 0;
memset(specialists, 0, sizeof(uint64_t)*ZT_MAX_NETWORK_SPECIALISTS);
memset(routes, 0, sizeof(ZT_VirtualNetworkRoute)*ZT_MAX_NETWORK_ROUTES);
memset(staticIps, 0, sizeof(InetAddress)*ZT_MAX_ZT_ASSIGNED_ADDRESSES);
memset(rules, 0, sizeof(ZT_VirtualNetworkRule)*ZT_MAX_NETWORK_RULES);
}
/**