diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 4dd63a0a7..718fd20d4 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -273,18 +273,18 @@ void PostgreSQL::initializeNetworks(PGconn *conn) std::string setKey = "networks:{" + _myAddressStr + "}"; - if (_rc != NULL) { - try { - if (_rc->clusterMode) { - _cluster->del(setKey); - } else { - _redis->del(setKey); - } - } catch (sw::redis::Error &e) { - // del can throw an error if the key doesn't exist - // swallow it and move along - } - } + // if (_rc != NULL) { + // try { + // if (_rc->clusterMode) { + // _cluster->del(setKey); + // } else { + // _redis->del(setKey); + // } + // } catch (sw::redis::Error &e) { + // // del can throw an error if the key doesn't exist + // // swallow it and move along + // } + // } std::unordered_set networkSet; @@ -475,17 +475,17 @@ void PostgreSQL::initializeNetworks(PGconn *conn) PQclear(res); - if(!networkSet.empty()) { - if (_rc && _rc->clusterMode) { - auto tx = _cluster->transaction(_myAddressStr, true); - tx.sadd(setKey, networkSet.begin(), networkSet.end()); - tx.exec(); - } else if (_rc && !_rc->clusterMode) { - auto tx = _redis->transaction(true); - tx.sadd(setKey, networkSet.begin(), networkSet.end()); - tx.exec(); - } - } + // if(!networkSet.empty()) { + // if (_rc && _rc->clusterMode) { + // auto tx = _cluster->transaction(_myAddressStr, true); + // tx.sadd(setKey, networkSet.begin(), networkSet.end()); + // tx.exec(); + // } else if (_rc && !_rc->clusterMode) { + // auto tx = _redis->transaction(true); + // tx.sadd(setKey, networkSet.begin(), networkSet.end()); + // tx.exec(); + // } + // } if (++this->_ready == 2) { if (_waitNoticePrinted) { @@ -509,36 +509,36 @@ void PostgreSQL::initializeMembers(PGconn *conn) fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); exit(1); } - std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:"; + // std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:"; - if (_rc != NULL) { - std::lock_guard l(_networks_l); - std::unordered_set deletes; - for ( auto it : _networks) { - uint64_t nwid_i = it.first; - char nwidTmp[64] = {0}; - OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); - std::string nwid(nwidTmp); - std::string key = setKeyBase + nwid; - deletes.insert(key); - } + // if (_rc != NULL) { + // std::lock_guard l(_networks_l); + // std::unordered_set deletes; + // for ( auto it : _networks) { + // uint64_t nwid_i = it.first; + // char nwidTmp[64] = {0}; + // OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); + // std::string nwid(nwidTmp); + // std::string key = setKeyBase + nwid; + // deletes.insert(key); + // } - if (!deletes.empty()) { - if (_rc->clusterMode) { - auto tx = _cluster->transaction(_myAddressStr, true); - for (std::string k : deletes) { - tx.del(k); - } - tx.exec(); - } else { - auto tx = _redis->transaction(true); - for (std::string k : deletes) { - tx.del(k); - } - tx.exec(); - } - } - } + // if (!deletes.empty()) { + // if (_rc->clusterMode) { + // auto tx = _cluster->transaction(_myAddressStr, true); + // for (std::string k : deletes) { + // tx.del(k); + // } + // tx.exec(); + // } else { + // auto tx = _redis->transaction(true); + // for (std::string k : deletes) { + // tx.del(k); + // } + // tx.exec(); + // } + // } + // } const char *params[1] = { _myAddressStr.c_str() @@ -578,7 +578,7 @@ void PostgreSQL::initializeMembers(PGconn *conn) std::string memberId(PQgetvalue(res, i, 0)); std::string networkId(PQgetvalue(res, i, 1)); - networkMembers.insert(std::pair(setKeyBase+networkId, memberId)); + // networkMembers.insert(std::pair(setKeyBase+networkId, memberId)); std::string ctime = PQgetvalue(res, i, 5); config["id"] = memberId; @@ -685,23 +685,23 @@ void PostgreSQL::initializeMembers(PGconn *conn) PQclear(res); - if (!networkMembers.empty()) { - if (_rc != NULL) { - if (_rc->clusterMode) { - auto tx = _cluster->transaction(_myAddressStr, true); - for (auto it : networkMembers) { - tx.sadd(it.first, it.second); - } - tx.exec(); - } else { - auto tx = _redis->transaction(true); - for (auto it : networkMembers) { - tx.sadd(it.first, it.second); - } - tx.exec(); - } - } - } + // if (!networkMembers.empty()) { + // if (_rc != NULL) { + // if (_rc->clusterMode) { + // auto tx = _cluster->transaction(_myAddressStr, true); + // for (auto it : networkMembers) { + // tx.sadd(it.first, it.second); + // } + // tx.exec(); + // } else { + // auto tx = _redis->transaction(true); + // for (auto it : networkMembers) { + // tx.sadd(it.first, it.second); + // } + // tx.exec(); + // } + // } + // } if (++this->_ready == 2) { if (_waitNoticePrinted) { fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); @@ -755,7 +755,7 @@ void PostgreSQL::heartbeat() std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); std::string now = std::to_string(ts); std::string host_port = std::to_string(_listenPort); - std::string use_redis = (_rc != NULL) ? "true" : "false"; + std::string use_redis = "false"; // (_rc != NULL) ? "true" : "false"; const char *values[10] = { controllerId, hostname, @@ -788,13 +788,13 @@ void PostgreSQL::heartbeat() } PQclear(res); } - if (_rc != NULL) { - if (_rc->clusterMode) { - _cluster->zadd("controllers", controllerId, ts); - } else { - _redis->zadd("controllers", controllerId, ts); - } - } + // if (_rc != NULL) { + // if (_rc->clusterMode) { + // _cluster->zadd("controllers", controllerId, ts); + // } else { + // _redis->zadd("controllers", controllerId, ts); + // } + // } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } @@ -833,6 +833,7 @@ void PostgreSQL::membersDbWatcher() void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { char buf[11] = {0}; std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); + fprintf(stderr, "Listening to member stream: %s\n", cmd.c_str()); PGresult *res = PQexec(conn, cmd.c_str()); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); @@ -874,7 +875,7 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { void PostgreSQL::_membersWatcher_Redis() { char buf[11] = {0}; std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}"; - + fprintf(stderr, "Listening to member stream: %s\n", key.c_str()); while (_run == 1) { try { json tmp; @@ -1515,20 +1516,20 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); } - if (_rc != NULL) { - try { - std::string id = (*config)["id"]; - std::string controllerId = _myAddressStr.c_str(); - std::string key = "networks:{" + controllerId + "}"; - if (_rc->clusterMode) { - _cluster->sadd(key, id); - } else { - _redis->sadd(key, id); - } - } catch (sw::redis::Error &e) { - fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); - } - } + // if (_rc != NULL) { + // try { + // std::string id = (*config)["id"]; + // std::string controllerId = _myAddressStr.c_str(); + // std::string key = "networks:{" + controllerId + "}"; + // if (_rc->clusterMode) { + // _cluster->sadd(key, id); + // } else { + // _redis->sadd(key, id); + // } + // } catch (sw::redis::Error &e) { + // fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + // } + // } } else if (objtype == "_delete_network") { try { std::string networkId = (*config)["nwid"]; @@ -1552,22 +1553,22 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what()); } - if (_rc != NULL) { - try { - std::string id = (*config)["id"]; - std::string controllerId = _myAddressStr.c_str(); - std::string key = "networks:{" + controllerId + "}"; - if (_rc->clusterMode) { - _cluster->srem(key, id); - _cluster->del("network-nodes-online:{"+controllerId+"}:"+id); - } else { - _redis->srem(key, id); - _redis->del("network-nodes-online:{"+controllerId+"}:"+id); - } - } catch (sw::redis::Error &e) { - fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); - } - } + // if (_rc != NULL) { + // try { + // std::string id = (*config)["id"]; + // std::string controllerId = _myAddressStr.c_str(); + // std::string key = "networks:{" + controllerId + "}"; + // if (_rc->clusterMode) { + // _cluster->srem(key, id); + // _cluster->del("network-nodes-online:{"+controllerId+"}:"+id); + // } else { + // _redis->srem(key, id); + // _redis->del("network-nodes-online:{"+controllerId+"}:"+id); + // } + // } catch (sw::redis::Error &e) { + // fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + // } + // } } else if (objtype == "_delete_member") { try { std::string memberId = (*config)["id"]; @@ -1595,23 +1596,23 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what()); } - if (_rc != NULL) { - try { - std::string memberId = (*config)["id"]; - std::string networkId = (*config)["nwid"]; - std::string controllerId = _myAddressStr.c_str(); - std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; - if (_rc->clusterMode) { - _cluster->srem(key, memberId); - _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId); - } else { - _redis->srem(key, memberId); - _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId); - } - } catch (sw::redis::Error &e) { - fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); - } - } + // if (_rc != NULL) { + // try { + // std::string memberId = (*config)["id"]; + // std::string networkId = (*config)["nwid"]; + // std::string controllerId = _myAddressStr.c_str(); + // std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; + // if (_rc->clusterMode) { + // _cluster->srem(key, memberId); + // _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId); + // } else { + // _redis->srem(key, memberId); + // _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId); + // } + // } catch (sw::redis::Error &e) { + // fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); + // } + // } } else { fprintf(stderr, "ERROR: unknown objtype"); } @@ -1619,7 +1620,7 @@ void PostgreSQL::commitThread() fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what()); } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } PQfinish(conn); @@ -1634,11 +1635,11 @@ void PostgreSQL::onlineNotificationThread() { waitForReady(); - if (_rc != NULL) { - onlineNotification_Redis(); - } else { + // if (_rc != NULL) { + // onlineNotification_Redis(); + // } else { onlineNotification_Postgres(); - } + // } } void PostgreSQL::onlineNotification_Postgres() @@ -1783,7 +1784,7 @@ void PostgreSQL::onlineNotification_Redis() fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what()); #endif } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::seconds(10)); } } diff --git a/node/Bond.hpp b/node/Bond.hpp index abb78f6c4..763089427 100644 --- a/node/Bond.hpp +++ b/node/Bond.hpp @@ -87,7 +87,9 @@ public: std::string policyAlias() { return _policyAlias; } /** - * Inform the bond about the path that its peer (owning object) just learned about + * Inform the bond about the path that its peer (owning object) just learned about. + * If the path is allowed to be used, it will be inducted into the bond on a trial + * period where link statistics will be collected to judge its quality. * * @param path Newly-learned Path which should now be handled by the Bond * @param now Current time diff --git a/service/OneService.cpp b/service/OneService.cpp index 140a66103..8071e5c77 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -1630,7 +1630,6 @@ public: // Custom Policies json &customBondingPolicies = settings["policies"]; for (json::iterator policyItr = customBondingPolicies.begin(); policyItr != customBondingPolicies.end();++policyItr) { - //fprintf(stderr, "\n\n--- (%s)\n", policyItr.key().c_str()); // Custom Policy std::string customPolicyStr(policyItr.key()); json &customPolicy = policyItr.value(); @@ -1684,7 +1683,6 @@ public: // Policy-Specific link set json &links = customPolicy["links"]; for (json::iterator linkItr = links.begin(); linkItr != links.end();++linkItr) { - //fprintf(stderr, "\t--- link (%s)\n", linkItr.key().c_str()); std::string linkNameStr(linkItr.key()); json &link = linkItr.value(); @@ -1719,12 +1717,19 @@ public: } _node->bondController()->addCustomLink(customPolicyStr, new Link(linkNameStr,ipvPref,speed,linkMonitorInterval,upDelay,downDelay,enabled,linkMode,failoverToStr,alloc)); } - // TODO: This is dumb std::string linkSelectMethodStr(OSUtils::jsonString(customPolicy["activeReselect"],"optimize")); - if (linkSelectMethodStr == "always") { newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_ALWAYS); } - if (linkSelectMethodStr == "better") { newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_BETTER); } - if (linkSelectMethodStr == "failure") { newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_FAILURE); } - if (linkSelectMethodStr == "optimize") { newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE); } + if (linkSelectMethodStr == "always") { + newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_ALWAYS); + } + if (linkSelectMethodStr == "better") { + newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_BETTER); + } + if (linkSelectMethodStr == "failure") { + newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_FAILURE); + } + if (linkSelectMethodStr == "optimize") { + newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE); + } if (newTemplateBond->getLinkSelectMethod() < 0 || newTemplateBond->getLinkSelectMethod() > 3) { fprintf(stderr, "warning: invalid value (%s) for linkSelectMethod, assuming mode: always\n", linkSelectMethodStr.c_str()); }