From 251b06d812357ee9ea0bd9aa8b7eeb868dad6248 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Thu, 23 Jul 2020 09:38:50 -0700 Subject: [PATCH 1/4] revert redis for member status --- controller/PostgreSQL.cpp | 258 +++++++++++++++++++------------------- 1 file changed, 129 insertions(+), 129 deletions(-) diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 0f5fde77b..d89b77814 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -273,18 +273,18 @@ void PostgreSQL::initializeNetworks(PGconn *conn) std::string setKey = "networks:{" + _myAddressStr + "}"; - if (_rc != NULL) { - try { - if (_rc->clusterMode) { - _cluster->del(setKey); - } else { - _redis->del(setKey); - } - } catch (sw::redis::Error &e) { - // del can throw an error if the key doesn't exist - // swallow it and move along - } - } + // if (_rc != NULL) { + // try { + // if (_rc->clusterMode) { + // _cluster->del(setKey); + // } else { + // _redis->del(setKey); + // } + // } catch (sw::redis::Error &e) { + // // del can throw an error if the key doesn't exist + // // swallow it and move along + // } + // } std::unordered_set networkSet; @@ -437,17 +437,17 @@ void PostgreSQL::initializeNetworks(PGconn *conn) PQclear(res); - if(!networkSet.empty()) { - if (_rc && _rc->clusterMode) { - auto tx = _cluster->transaction(_myAddressStr, true); - tx.sadd(setKey, networkSet.begin(), networkSet.end()); - tx.exec(); - } else if (_rc && !_rc->clusterMode) { - auto tx = _redis->transaction(true); - tx.sadd(setKey, networkSet.begin(), networkSet.end()); - tx.exec(); - } - } + // if(!networkSet.empty()) { + // if (_rc && _rc->clusterMode) { + // auto tx = _cluster->transaction(_myAddressStr, true); + // tx.sadd(setKey, networkSet.begin(), networkSet.end()); + // tx.exec(); + // } else if (_rc && !_rc->clusterMode) { + // auto tx = _redis->transaction(true); + // tx.sadd(setKey, networkSet.begin(), networkSet.end()); + // tx.exec(); + // } + // } if (++this->_ready == 2) { if (_waitNoticePrinted) { @@ -471,36 +471,36 @@ void PostgreSQL::initializeMembers(PGconn *conn) fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); exit(1); } - std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:"; + // std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:"; - if (_rc != NULL) { - std::lock_guard l(_networks_l); - std::unordered_set deletes; - for ( auto it : _networks) { - uint64_t nwid_i = it.first; - char nwidTmp[64] = {0}; - OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); - std::string nwid(nwidTmp); - std::string key = setKeyBase + nwid; - deletes.insert(key); - } + // if (_rc != NULL) { + // std::lock_guard l(_networks_l); + // std::unordered_set deletes; + // for ( auto it : _networks) { + // uint64_t nwid_i = it.first; + // char nwidTmp[64] = {0}; + // OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); + // std::string nwid(nwidTmp); + // std::string key = setKeyBase + nwid; + // deletes.insert(key); + // } - if (!deletes.empty()) { - if (_rc->clusterMode) { - auto tx = _cluster->transaction(_myAddressStr, true); - for (std::string k : deletes) { - tx.del(k); - } - tx.exec(); - } else { - auto tx = _redis->transaction(true); - for (std::string k : deletes) { - tx.del(k); - } - tx.exec(); - } - } - } + // if (!deletes.empty()) { + // if (_rc->clusterMode) { + // auto tx = _cluster->transaction(_myAddressStr, true); + // for (std::string k : deletes) { + // tx.del(k); + // } + // tx.exec(); + // } else { + // auto tx = _redis->transaction(true); + // for (std::string k : deletes) { + // tx.del(k); + // } + // tx.exec(); + // } + // } + // } const char *params[1] = { _myAddressStr.c_str() @@ -540,7 +540,7 @@ void PostgreSQL::initializeMembers(PGconn *conn) std::string memberId(PQgetvalue(res, i, 0)); std::string networkId(PQgetvalue(res, i, 1)); - networkMembers.insert(std::pair(setKeyBase+networkId, memberId)); + // networkMembers.insert(std::pair(setKeyBase+networkId, memberId)); std::string ctime = PQgetvalue(res, i, 5); config["id"] = memberId; @@ -647,23 +647,23 @@ void PostgreSQL::initializeMembers(PGconn *conn) PQclear(res); - if (!networkMembers.empty()) { - if (_rc != NULL) { - if (_rc->clusterMode) { - auto tx = _cluster->transaction(_myAddressStr, true); - for (auto it : networkMembers) { - tx.sadd(it.first, it.second); - } - tx.exec(); - } else { - auto tx = _redis->transaction(true); - for (auto it : networkMembers) { - tx.sadd(it.first, it.second); - } - tx.exec(); - } - } - } + // if (!networkMembers.empty()) { + // if (_rc != NULL) { + // if (_rc->clusterMode) { + // auto tx = _cluster->transaction(_myAddressStr, true); + // for (auto it : networkMembers) { + // tx.sadd(it.first, it.second); + // } + // tx.exec(); + // } else { + // auto tx = _redis->transaction(true); + // for (auto it : networkMembers) { + // tx.sadd(it.first, it.second); + // } + // tx.exec(); + // } + // } + // } if (++this->_ready == 2) { if (_waitNoticePrinted) { fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); @@ -717,7 +717,7 @@ void PostgreSQL::heartbeat() std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); std::string now = std::to_string(ts); std::string host_port = std::to_string(_listenPort); - std::string use_redis = (_rc != NULL) ? "true" : "false"; + std::string use_redis = "false"; // (_rc != NULL) ? "true" : "false"; const char *values[10] = { controllerId, hostname, @@ -750,13 +750,13 @@ void PostgreSQL::heartbeat() } PQclear(res); } - if (_rc != NULL) { - if (_rc->clusterMode) { - _cluster->zadd("controllers", controllerId, ts); - } else { - _redis->zadd("controllers", controllerId, ts); - } - } + // if (_rc != NULL) { + // if (_rc->clusterMode) { + // _cluster->zadd("controllers", controllerId, ts); + // } else { + // _redis->zadd("controllers", controllerId, ts); + // } + // } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } @@ -1445,20 +1445,20 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); } - if (_rc != NULL) { - try { - std::string id = (*config)["id"]; - std::string controllerId = _myAddressStr.c_str(); - std::string key = "networks:{" + controllerId + "}"; - if (_rc->clusterMode) { - _cluster->sadd(key, id); - } else { - _redis->sadd(key, id); - } - } catch (sw::redis::Error &e) { - fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); - } - } + // if (_rc != NULL) { + // try { + // std::string id = (*config)["id"]; + // std::string controllerId = _myAddressStr.c_str(); + // std::string key = "networks:{" + controllerId + "}"; + // if (_rc->clusterMode) { + // _cluster->sadd(key, id); + // } else { + // _redis->sadd(key, id); + // } + // } catch (sw::redis::Error &e) { + // fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + // } + // } } else if (objtype == "_delete_network") { try { std::string networkId = (*config)["nwid"]; @@ -1482,22 +1482,22 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what()); } - if (_rc != NULL) { - try { - std::string id = (*config)["id"]; - std::string controllerId = _myAddressStr.c_str(); - std::string key = "networks:{" + controllerId + "}"; - if (_rc->clusterMode) { - _cluster->srem(key, id); - _cluster->del("network-nodes-online:{"+controllerId+"}:"+id); - } else { - _redis->srem(key, id); - _redis->del("network-nodes-online:{"+controllerId+"}:"+id); - } - } catch (sw::redis::Error &e) { - fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); - } - } + // if (_rc != NULL) { + // try { + // std::string id = (*config)["id"]; + // std::string controllerId = _myAddressStr.c_str(); + // std::string key = "networks:{" + controllerId + "}"; + // if (_rc->clusterMode) { + // _cluster->srem(key, id); + // _cluster->del("network-nodes-online:{"+controllerId+"}:"+id); + // } else { + // _redis->srem(key, id); + // _redis->del("network-nodes-online:{"+controllerId+"}:"+id); + // } + // } catch (sw::redis::Error &e) { + // fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + // } + // } } else if (objtype == "_delete_member") { try { std::string memberId = (*config)["id"]; @@ -1525,23 +1525,23 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what()); } - if (_rc != NULL) { - try { - std::string memberId = (*config)["id"]; - std::string networkId = (*config)["nwid"]; - std::string controllerId = _myAddressStr.c_str(); - std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; - if (_rc->clusterMode) { - _cluster->srem(key, memberId); - _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId); - } else { - _redis->srem(key, memberId); - _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId); - } - } catch (sw::redis::Error &e) { - fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); - } - } + // if (_rc != NULL) { + // try { + // std::string memberId = (*config)["id"]; + // std::string networkId = (*config)["nwid"]; + // std::string controllerId = _myAddressStr.c_str(); + // std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; + // if (_rc->clusterMode) { + // _cluster->srem(key, memberId); + // _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId); + // } else { + // _redis->srem(key, memberId); + // _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId); + // } + // } catch (sw::redis::Error &e) { + // fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); + // } + // } } else { fprintf(stderr, "ERROR: unknown objtype"); } @@ -1549,7 +1549,7 @@ void PostgreSQL::commitThread() fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what()); } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } PQfinish(conn); @@ -1564,11 +1564,11 @@ void PostgreSQL::onlineNotificationThread() { waitForReady(); - if (_rc != NULL) { - onlineNotification_Redis(); - } else { + // if (_rc != NULL) { + // onlineNotification_Redis(); + // } else { onlineNotification_Postgres(); - } + // } } void PostgreSQL::onlineNotification_Postgres() From 5b700fa4973029fb877b1c80b070f74e085ebd0c Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Mon, 27 Jul 2020 18:37:45 -0700 Subject: [PATCH 2/4] println for which notification stream the controller is listening to --- controller/PostgreSQL.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index d89b77814..0ea404559 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -795,6 +795,7 @@ void PostgreSQL::membersDbWatcher() void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { char buf[11] = {0}; std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); + fprintf(stderr, "Listening to member stream: %s\n", cmd.c_str()); PGresult *res = PQexec(conn, cmd.c_str()); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); @@ -836,7 +837,7 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { void PostgreSQL::_membersWatcher_Redis() { char buf[11] = {0}; std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}"; - + fprintf(stderr, "Listening to member stream: %s\n", key.c_str()); while (_run == 1) { try { json tmp; From 7f99c4a7793b8c0621b12f0f1ad57d62b0b3dbc9 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Fri, 21 Aug 2020 16:22:28 -0700 Subject: [PATCH 3/4] Sleep 10 seconds between writes to DB --- controller/PostgreSQL.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 0ea404559..1e25abbcd 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -1714,7 +1714,7 @@ void PostgreSQL::onlineNotification_Redis() fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what()); #endif } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::seconds(10)); } } From b1ddba0438bc3b33ebc0e28ac5c015fa63be1430 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Mon, 24 Aug 2020 18:56:49 -0700 Subject: [PATCH 4/4] Remove a few old comments --- node/Bond.hpp | 4 +++- service/OneService.cpp | 19 ++++++++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/node/Bond.hpp b/node/Bond.hpp index c8a89b616..b655f47ae 100644 --- a/node/Bond.hpp +++ b/node/Bond.hpp @@ -87,7 +87,9 @@ public: std::string policyAlias() { return _policyAlias; } /** - * Inform the bond about the path that its peer (owning object) just learned about + * Inform the bond about the path that its peer (owning object) just learned about. + * If the path is allowed to be used, it will be inducted into the bond on a trial + * period where link statistics will be collected to judge its quality. * * @param path Newly-learned Path which should now be handled by the Bond * @param now Current time diff --git a/service/OneService.cpp b/service/OneService.cpp index 076dbb59d..1c5dbc3f7 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -1630,7 +1630,6 @@ public: // Custom Policies json &customBondingPolicies = settings["policies"]; for (json::iterator policyItr = customBondingPolicies.begin(); policyItr != customBondingPolicies.end();++policyItr) { - //fprintf(stderr, "\n\n--- (%s)\n", policyItr.key().c_str()); // Custom Policy std::string customPolicyStr(policyItr.key()); json &customPolicy = policyItr.value(); @@ -1684,7 +1683,6 @@ public: // Policy-Specific link set json &links = customPolicy["links"]; for (json::iterator linkItr = links.begin(); linkItr != links.end();++linkItr) { - //fprintf(stderr, "\t--- link (%s)\n", linkItr.key().c_str()); std::string linkNameStr(linkItr.key()); json &link = linkItr.value(); @@ -1719,12 +1717,19 @@ public: } _node->bondController()->addCustomLink(customPolicyStr, new Link(linkNameStr,ipvPref,speed,linkMonitorInterval,upDelay,downDelay,enabled,linkMode,failoverToStr,alloc)); } - // TODO: This is dumb std::string linkSelectMethodStr(OSUtils::jsonString(customPolicy["activeReselect"],"optimize")); - if (linkSelectMethodStr == "always") { newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_ALWAYS); } - if (linkSelectMethodStr == "better") { newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_BETTER); } - if (linkSelectMethodStr == "failure") { newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_FAILURE); } - if (linkSelectMethodStr == "optimize") { newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE); } + if (linkSelectMethodStr == "always") { + newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_ALWAYS); + } + if (linkSelectMethodStr == "better") { + newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_BETTER); + } + if (linkSelectMethodStr == "failure") { + newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_FAILURE); + } + if (linkSelectMethodStr == "optimize") { + newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE); + } if (newTemplateBond->getLinkSelectMethod() < 0 || newTemplateBond->getLinkSelectMethod() > 3) { fprintf(stderr, "warning: invalid value (%s) for linkSelectMethod, assuming mode: always\n", linkSelectMethodStr.c_str()); }