diff --git a/node/Bond.cpp b/node/Bond.cpp index e0f56e903..d633d7c21 100644 --- a/node/Bond.cpp +++ b/node/Bond.cpp @@ -40,6 +40,9 @@ std::map > > Bond::_interface bool Bond::linkAllowed(std::string& policyAlias, SharedPtr link) { + if (! link) { + return false; + } bool foundInDefinitions = false; if (_linkDefinitions.count(policyAlias)) { auto it = _linkDefinitions[policyAlias].begin(); @@ -152,7 +155,7 @@ void Bond::destroyBond(uint64_t peerId) _bonds.erase(peerId); } -SharedPtr Bond::getLinkBySocket(const std::string& policyAlias, uint64_t localSocket) +SharedPtr Bond::getLinkBySocket(const std::string& policyAlias, uint64_t localSocket, bool createIfNeeded = false) { Mutex::Lock _l(_links_m); char ifname[64] = { 0 }; @@ -160,10 +163,14 @@ SharedPtr Bond::getLinkBySocket(const std::string& policyAlias, uint64_t l std::string ifnameStr(ifname); auto search = _interfaceToLinkMap[policyAlias].find(ifnameStr); if (search == _interfaceToLinkMap[policyAlias].end()) { - // If the link wasn't already known, add a new entry - SharedPtr s = new Link(ifnameStr, 0, 0, true, ZT_BOND_SLAVE_MODE_SPARE, "", 0.0); - _interfaceToLinkMap[policyAlias].insert(std::pair >(ifnameStr, s)); - return s; + if (createIfNeeded) { + SharedPtr s = new Link(ifnameStr, 0, 0, true, ZT_BOND_SLAVE_MODE_SPARE, "", 0.0); + _interfaceToLinkMap[policyAlias].insert(std::pair >(ifnameStr, s)); + return s; + } + else { + return SharedPtr(); + } } else { return search->second; @@ -225,10 +232,12 @@ Bond::Bond(const RuntimeEnvironment* renv, SharedPtr originalBond, const S void Bond::nominatePathToBond(const SharedPtr& path, int64_t now) { Mutex::Lock _l(_paths_m); + debug("attempting to nominate link %s", pathToStr(path).c_str()); /** * Ensure the link is allowed and the path is not already present */ if (! 
RR->bc->linkAllowed(_policyAlias, getLink(path))) { + debug("link %s is not permitted according to user-specified rules", pathToStr(path).c_str()); return; } bool alreadyPresent = false; @@ -236,6 +245,7 @@ void Bond::nominatePathToBond(const SharedPtr& path, int64_t now) // Sanity check if (path.ptr() == _paths[i].p.ptr()) { alreadyPresent = true; + debug("link %s already exists", pathToStr(path).c_str()); break; } } @@ -254,20 +264,22 @@ void Bond::nominatePathToBond(const SharedPtr& path, int64_t now) // Determine if there are any other paths on this link bool bFoundCommonLink = false; SharedPtr commonLink = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket()); - for (unsigned int j = 0; j < ZT_MAX_PEER_NETWORK_PATHS; ++j) { - if (_paths[j].p && _paths[j].p.ptr() != _paths[i].p.ptr()) { - if (RR->bc->getLinkBySocket(_policyAlias, _paths[j].p->localSocket()) == commonLink) { - bFoundCommonLink = true; - _paths[j].onlyPathOnLink = false; + if (commonLink) { + for (unsigned int j = 0; j < ZT_MAX_PEER_NETWORK_PATHS; ++j) { + if (_paths[j].p && _paths[j].p.ptr() != _paths[i].p.ptr()) { + if (RR->bc->getLinkBySocket(_policyAlias, _paths[j].p->localSocket(), true) == commonLink) { /* createIfNeeded=true: add path j's interface to the link map if it is not there yet, then compare links */ + bFoundCommonLink = true; + _paths[j].onlyPathOnLink = false; + } } } + _paths[i].ipvPref = sl->ipvPref(); + _paths[i].mode = sl->mode(); + _paths[i].enabled = sl->enabled(); + _paths[i].onlyPathOnLink = ! bFoundCommonLink; } - _paths[i].ipvPref = sl->ipvPref(); - _paths[i].mode = sl->mode(); - _paths[i].enabled = sl->enabled(); - _paths[i].onlyPathOnLink = ! 
bFoundCommonLink; } - log("nominate link %s", pathToStr(path).c_str()); + log("nominated link %s", pathToStr(path).c_str()); break; } } @@ -553,7 +565,6 @@ bool Bond::assignFlowToBondedPath(SharedPtr& flow, int64_t now) } flow->assignPath(_abPathIdx, now); } - SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[flow->assignedPath].p->localSocket()); debug("assign out-flow %04x to link %s (%lu / %lu flows)", flow->id, pathToStr(_paths[flow->assignedPath].p).c_str(), _paths[flow->assignedPath].assignedFlowCount, (unsigned long)_flows.size()); return true; } @@ -640,22 +651,24 @@ void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr& return; } SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[pathIdx].p->localSocket()); - if (remoteUtility > _localUtility) { - _paths[pathIdx].p->address().toString(pathStr); - debug("peer suggests alternate link %s/%s, remote utility (%d) greater than local utility (%d), switching to suggested link\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility); - _negotiatedPathIdx = pathIdx; - } - if (remoteUtility < _localUtility) { - debug("peer suggests alternate link %s/%s, remote utility (%d) less than local utility (%d), not switching\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility); - } - if (remoteUtility == _localUtility) { - debug("peer suggests alternate link %s/%s, remote utility (%d) equal to local utility (%d)\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility); - if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) { - debug("agree with peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr); + if (link) { + if (remoteUtility > _localUtility) { + _paths[pathIdx].p->address().toString(pathStr); + debug("peer suggests alternate link %s/%s, remote utility (%d) greater than local utility (%d), switching to suggested link\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility); _negotiatedPathIdx = 
pathIdx; } - else { - debug("ignore petition from peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr); + if (remoteUtility < _localUtility) { + debug("peer suggests alternate link %s/%s, remote utility (%d) less than local utility (%d), not switching\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility); + } + if (remoteUtility == _localUtility) { + debug("peer suggests alternate link %s/%s, remote utility (%d) equal to local utility (%d)\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility); + if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) { + debug("agree with peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr); + _negotiatedPathIdx = pathIdx; + } + else { + debug("ignore petition from peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr); + } } } } @@ -827,10 +840,17 @@ void Bond::curateBond(int64_t now, bool rebuildBond) } /** - * Remove expired links from bond + * Remove expired or invalid links from bond */ + SharedPtr link = getLink(_paths[i].p); + if (! 
link) { + log("link is no longer valid, removing from bond"); + _paths[i] = NominatedPath(); + _paths[i].p = SharedPtr(); + continue; + } if ((now - _paths[i].p->_lastIn) > (ZT_PEER_EXPIRED_PATH_TRIAL_PERIOD)) { - log("link %s has expired, removing from bond", pathToStr(_paths[i].p).c_str()); + log("link (%s) has expired or is invalid, removing from bond", pathToStr(_paths[i].p).c_str()); _paths[i] = NominatedPath(); _paths[i].p = SharedPtr(); continue; @@ -920,7 +940,9 @@ void Bond::curateBond(int64_t now, bool rebuildBond) for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { if (_paths[i].p) { SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket()); - linkMap[link].push_back(i); + if (link) { + linkMap[link].push_back(i); + } } } // Re-form bond from link<->path map @@ -1007,13 +1029,17 @@ void Bond::estimatePathQuality(int64_t now) for (unsigned int i = 0; i < _numBondedPaths; ++i) { if (_paths[i].p && _paths[i].allowed()) { SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket()); - totUserSpecifiedLinkSpeed += link->speed(); + if (link) { + totUserSpecifiedLinkSpeed += link->speed(); + } } } for (unsigned int i = 0; i < _numBondedPaths; ++i) { if (_paths[i].p && _paths[i].allowed()) { SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket()); - link->setRelativeSpeed((uint8_t)round(((float)link->speed() / (float)totUserSpecifiedLinkSpeed) * 255)); + if (link) { + link->setRelativeSpeed((uint8_t)round(((float)link->speed() / (float)totUserSpecifiedLinkSpeed) * 255)); + } } } } @@ -1283,21 +1309,23 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) continue; } SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket()); - if (_paths[i].eligible && link->primary()) { - if (! 
_paths[i].preferred()) { - // Found path on primary link, take note in case we don't find a preferred path - nonPreferredPathIdx = i; - bFoundPrimaryLink = true; - } - if (_paths[i].preferred()) { - _abPathIdx = i; - bFoundPrimaryLink = true; - if (_paths[_abPathIdx].p) { - SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[_abPathIdx].p->localSocket()); - if (link) { - log("found preferred primary link %s", pathToStr(_paths[_abPathIdx].p).c_str()); + if (link) { + if (_paths[i].eligible && link->primary()) { + if (! _paths[i].preferred()) { + // Found path on primary link, take note in case we don't find a preferred path + nonPreferredPathIdx = i; + bFoundPrimaryLink = true; + } + if (_paths[i].preferred()) { + _abPathIdx = i; + bFoundPrimaryLink = true; + if (_paths[_abPathIdx].p) { + SharedPtr abLink = RR->bc->getLinkBySocket(_policyAlias, _paths[_abPathIdx].p->localSocket()); + if (abLink) { + log("found preferred primary link %s", pathToStr(_paths[_abPathIdx].p).c_str()); + } + break; // Found preferred path on primary link } - break; // Found preferred path on primary link } } } @@ -1307,13 +1335,12 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) _abPathIdx = nonPreferredPathIdx; } if (_abPathIdx == ZT_MAX_PEER_NETWORK_PATHS) { - log("user-designated primary link is not yet ready"); + log("user-designated primary link is not available"); // TODO: Should wait for some time (failover interval?) and then switch to spare link } } else if (! userHasSpecifiedPrimaryLink()) { - log("user did not specify a primary link, select first available link"); for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { if (_paths[i].p && _paths[i].eligible) { _abPathIdx = i; @@ -1369,7 +1396,9 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) continue; } SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket()); - + if (! 
link) { + continue; + } int failoverScoreHandicap = _paths[i].failoverScore; if (_paths[i].preferred()) { failoverScoreHandicap += ZT_BOND_FAILOVER_HANDICAP_PREFERRED; @@ -1432,7 +1461,11 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) if (! _paths[i].eligible) { failoverScoreHandicap = -10000; } - if (getLink(_paths[i].p)->primary() && _abLinkSelectMethod != ZT_BOND_RESELECTION_POLICY_OPTIMIZE) { + SharedPtr link = getLink(_paths[i].p); + if (! link) { + continue; + } + if (link->primary() && _abLinkSelectMethod != ZT_BOND_RESELECTION_POLICY_OPTIMIZE) { // If using "optimize" primary re-select mode, ignore user link designations failoverScoreHandicap = ZT_BOND_FAILOVER_HANDICAP_PRIMARY; } @@ -1501,15 +1534,19 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) _lastActiveBackupPathChange = now; } if (_abLinkSelectMethod == ZT_BOND_RESELECTION_POLICY_ALWAYS) { - if (! getLink(_paths[_abPathIdx].p)->primary() && _paths[_abFailoverQueue.front()].p && getLink(_paths[_abFailoverQueue.front()].p)->primary()) { + SharedPtr abLink = getLink(_paths[_abPathIdx].p); + SharedPtr abFailoverLink = getLink(_paths[_abFailoverQueue.front()].p); + if (abLink && ! abLink->primary() && _paths[_abFailoverQueue.front()].p && abFailoverLink && abFailoverLink->primary()) { dequeueNextActiveBackupPath(now); log("switch back to available primary link %s (select mode: always)", pathToStr(_paths[_abPathIdx].p).c_str()); } } if (_abLinkSelectMethod == ZT_BOND_RESELECTION_POLICY_BETTER) { - if (! getLink(_paths[_abPathIdx].p)->primary()) { + SharedPtr abLink = getLink(_paths[_abPathIdx].p); + if (abLink && ! abLink->primary()) { // Active backup has switched to "better" primary link according to re-select policy. 
- if (_paths[_abFailoverQueue.front()].p && getLink(_paths[_abFailoverQueue.front()].p)->primary() && (_paths[_abFailoverQueue.front()].failoverScore > _paths[_abPathIdx].failoverScore)) { + SharedPtr abFailoverLink = getLink(_paths[_abFailoverQueue.front()].p); + if (_paths[_abFailoverQueue.front()].p && abFailoverLink && abFailoverLink->primary() && (_paths[_abFailoverQueue.front()].failoverScore > _paths[_abPathIdx].failoverScore)) { dequeueNextActiveBackupPath(now); log("switch back to user-defined primary link %s (select mode: better)", pathToStr(_paths[_abPathIdx].p).c_str()); } @@ -1709,8 +1746,12 @@ std::string Bond::pathToStr(const SharedPtr& path) char pathStr[64] = { 0 }; char fullPathStr[384] = { 0 }; path->address().toString(pathStr); - snprintf(fullPathStr, 384, "%.16llx-%s/%s", (unsigned long long)(path->localSocket()), getLink(path)->ifname().c_str(), pathStr); - return std::string(fullPathStr); + SharedPtr link = getLink(path); + if (link) { + std::string ifnameStr = std::string(link->ifname()); + snprintf(fullPathStr, 384, "%.16llx-%s/%s", (unsigned long long)(path->localSocket()), ifnameStr.c_str(), pathStr); + return std::string(fullPathStr); + } } return ""; #else @@ -1724,11 +1765,11 @@ void Bond::dumpPathStatus(int64_t now, int pathIdx) std::string aliveOrDead = _paths[pathIdx].alive ? std::string("alive") : std::string("dead"); std::string eligibleOrNot = _paths[pathIdx].eligible ? std::string("eligible") : std::string("ineligible"); std::string bondedOrNot = _paths[pathIdx].bonded ? 
std::string("bonded") : std::string("unbonded"); - log("path[%2d] --- %5s (in %7lld, out: %7lld), %10s, %8s, flows=%-6d lat=%-8.3f pdv=%-7.3f err=%-6.4f loss=%-6.4f alloc=%-3d --- (%s)", + log("path[%2u] --- %5s (in %7lld, out: %7lld), %10s, %8s, flows=%-6u lat=%-8.3f pdv=%-7.3f err=%-6.4f loss=%-6.4f alloc=%-3u --- (%s)", pathIdx, aliveOrDead.c_str(), static_cast(_paths[pathIdx].p->age(now)), - static_cast(now - _paths[pathIdx].p->_lastOut), + static_cast(_paths[pathIdx].p->_lastOut == 0 ? 0 : now - _paths[pathIdx].p->_lastOut), eligibleOrNot.c_str(), bondedOrNot.c_str(), _paths[pathIdx].assignedFlowCount, diff --git a/node/Bond.hpp b/node/Bond.hpp index d58df4489..a285381f2 100644 --- a/node/Bond.hpp +++ b/node/Bond.hpp @@ -458,9 +458,10 @@ class Bond { * * @param policyAlias Policy in use * @param localSocket Local source socket + * @param createIfNeeded Whether a Link object is created if the name wasn't previously in the link map * @return Physical link definition */ - static SharedPtr getLinkBySocket(const std::string& policyAlias, uint64_t localSocket); + static SharedPtr getLinkBySocket(const std::string& policyAlias, uint64_t localSocket, bool createIfNeeded); /** * Gets a reference to a physical link definition given its human-readable system name. @@ -1141,10 +1142,10 @@ class Bond { * */ void log(const char* fmt, ...) + { #ifdef __GNUC__ __attribute__((format(printf, 2, 3))) #endif - { #ifdef ZT_TRACE time_t rawtime; struct tm* timeinfo; @@ -1173,10 +1174,10 @@ class Bond { * */ void debug(const char* fmt, ...) 
+ { #ifdef __GNUC__ __attribute__((format(printf, 2, 3))) #endif - { #ifdef ZT_DEBUG time_t rawtime; struct tm* timeinfo; diff --git a/osdep/Phy.hpp b/osdep/Phy.hpp index c2422f7c3..28aad4bef 100644 --- a/osdep/Phy.hpp +++ b/osdep/Phy.hpp @@ -229,23 +229,33 @@ public: * @param s Socket object * @return Underlying OS-type (usually int or long) file descriptor associated with object */ - static inline ZT_PHY_SOCKFD_TYPE getDescriptor(PhySocket *s) throw() { return reinterpret_cast(s)->sock; } + static inline ZT_PHY_SOCKFD_TYPE getDescriptor(PhySocket* s) throw() + { + return reinterpret_cast(s)->sock; + } /** * @param s Socket object * @return Pointer to user object */ - static inline void** getuptr(PhySocket *s) throw() { return &(reinterpret_cast(s)->uptr); } + static inline void** getuptr(PhySocket* s) throw() + { + return &(reinterpret_cast(s)->uptr); + } /** * @param s Socket object * @param nameBuf Buffer to store name of interface which this Socket object is bound to * @param buflen Length of buffer to copy name into */ - static inline void getIfName(PhySocket *s, char *nameBuf, int buflen) + static inline void getIfName(PhySocket* s, char* nameBuf, int buflen) { + PhySocketImpl* sws = reinterpret_cast<PhySocketImpl*>(s); + if (! sws || sws->type == ZT_PHY_SOCKET_CLOSED) { /* test the pointer before reading the socket type; null or closed socket leaves nameBuf untouched */ + return; + } if (s) { - memcpy(nameBuf, reinterpret_cast(s)->ifname, buflen); + memcpy(nameBuf, reinterpret_cast(s)->ifname, buflen); } } @@ -254,10 +264,14 @@ public: * @param ifname Buffer containing name of interface that this Socket object is bound to * @param len Length of name of interface */ - static inline void setIfName(PhySocket *s, char *ifname, int len) + static inline void setIfName(PhySocket* s, char* ifname, int len) { + PhySocketImpl* sws = reinterpret_cast<PhySocketImpl*>(s); + if (! sws || sws->type == ZT_PHY_SOCKET_CLOSED) { /* test the pointer before reading the socket type; null or closed socket is not modified */ + return; + } if (s) { - memcpy(&(reinterpret_cast(s)->ifname), ifname, len); + memcpy(&(reinterpret_cast(s)->ifname), ifname, len); } } @@ -270,21 +284,27 @@ public: inline void whack() { #if 
defined(_WIN32) || defined(_WIN64) - ::send(_whackSendSocket,(const char *)this,1,0); + ::send(_whackSendSocket, (const char*)this, 1, 0); #else - (void)(::write(_whackSendSocket,(PhySocket *)this,1)); + (void)(::write(_whackSendSocket, (PhySocket*)this, 1)); #endif } /** * @return Number of open sockets */ - inline unsigned long count() const throw() { return _socks.size(); } + inline unsigned long count() const throw() + { + return _socks.size(); + } /** * @return Maximum number of sockets allowed */ - inline unsigned long maxCount() const throw() { return ZT_PHY_MAX_SOCKETS; } + inline unsigned long maxCount() const throw() + { + return ZT_PHY_MAX_SOCKETS; + } /** * Wrap a raw file descriptor in a PhySocket structure