diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h index a9c28badd..b8b23825f 100644 --- a/include/ZeroTierOne.h +++ b/include/ZeroTierOne.h @@ -1328,9 +1328,9 @@ typedef struct uint8_t scope; /** - * Percentage of traffic allocated to this path (0-255) + * Relative quality value */ - uint8_t allocation; + float relativeQuality; /** * Name of physical interface this path resides on @@ -1355,7 +1355,7 @@ typedef struct uint8_t eligible; /** - * The speed of this link (as given to bonding layer) + * The capacity of this link (as given to bonding layer) */ uint32_t linkSpeed; diff --git a/node/Bond.cpp b/node/Bond.cpp index e7c9e164d..5ab164bb5 100644 --- a/node/Bond.cpp +++ b/node/Bond.cpp @@ -140,12 +140,13 @@ SharedPtr Bond::createBond(const RuntimeEnvironment* renv, const SharedPtr if (it->second->isUserSpecified() && it->second->userHasSpecifiedFailoverInstructions()) { bond->_userHasSpecifiedFailoverInstructions = true; } - if (it->second->isUserSpecified() && (it->second->speed() > 0)) { - bond->_userHasSpecifiedLinkSpeeds = true; + if (it->second->isUserSpecified() && (it->second->capacity() > 0)) { + bond->_userHasSpecifiedLinkCapacities = true; } ++it; } } + bond->startBond(); return bond; } return SharedPtr(); @@ -154,9 +155,25 @@ SharedPtr Bond::createBond(const RuntimeEnvironment* renv, const SharedPtr void Bond::destroyBond(uint64_t peerId) { Mutex::Lock _l(_bonds_m); + auto iter = _bonds.find(peerId); + if (iter != _bonds.end()) { + iter->second->stopBond(); + } _bonds.erase(peerId); } +void Bond::stopBond() +{ + debug("stopping bond"); + _run = false; +} + +void Bond::startBond() +{ + debug("starting bond"); + _run = true; +} + SharedPtr Bond::getLinkBySocket(const std::string& policyAlias, uint64_t localSocket, bool createIfNeeded = false) { Mutex::Lock _l(_links_m); @@ -239,7 +256,7 @@ void Bond::nominatePathToBond(const SharedPtr& path, int64_t now) * Ensure the link is allowed and the path is not already present */ if (! RR->bc->linkAllowed(_policyAlias, getLinkBySocket(_policyAlias, path->localSocket(), true))) { - debug("link %s is not permitted according to user-specified rules", pathToStr(path).c_str()); + debug("link %s is not allowed according to user-specified rules", pathToStr(path).c_str()); return; } bool alreadyPresent = false; @@ -299,7 +316,7 @@ void Bond::nominatePathToBond(const SharedPtr& path, int64_t now) void Bond::addPathToBond(int nominatedIdx, int bondedIdx) { // Map bonded set to nominated set - _bondIdxMap[bondedIdx] = nominatedIdx; + _realIdxMap[bondedIdx] = nominatedIdx; // Tell the bonding layer that we can now use this path for traffic _paths[nominatedIdx].bonded = true; } @@ -328,62 +345,57 @@ SharedPtr Bond::getAppropriatePath(int64_t now, int32_t flowId) * balance-rr */ if (_policy == ZT_BOND_POLICY_BALANCE_RR) { - if (! _allowFlowHashing) { - if (_packetsPerLink == 0) { - // Randomly select a path - return _paths[_bondIdxMap[_freeRandomByte % _numBondedPaths]].p; - } - if (_rrPacketsSentOnCurrLink < _packetsPerLink) { - // Continue to use this link - ++_rrPacketsSentOnCurrLink; - return _paths[_bondIdxMap[_rrIdx]].p; - } - // Reset striping counter - _rrPacketsSentOnCurrLink = 0; - if (_numBondedPaths == 1 || _rrIdx >= (ZT_MAX_PEER_NETWORK_PATHS - 1)) { - _rrIdx = 0; - } - else { - int _tempIdx = _rrIdx; - for (int searchCount = 0; searchCount < (_numBondedPaths - 1); searchCount++) { - _tempIdx = (_tempIdx == (_numBondedPaths - 1)) ? 0 : _tempIdx + 1; - if (_bondIdxMap[_tempIdx] != ZT_MAX_PEER_NETWORK_PATHS) { - if (_paths[_bondIdxMap[_tempIdx]].p && _paths[_bondIdxMap[_tempIdx]].eligible) { - _rrIdx = _tempIdx; - break; - } + if (_packetsPerLink == 0) { + // Randomly select a path + return _paths[_realIdxMap[_freeRandomByte % _numBondedPaths]].p; + } + if (_rrPacketsSentOnCurrLink < _packetsPerLink) { + // Continue to use this link + ++_rrPacketsSentOnCurrLink; + return _paths[_realIdxMap[_rrIdx]].p; + } + // Reset striping counter + _rrPacketsSentOnCurrLink = 0; + if (_numBondedPaths == 1 || _rrIdx >= (ZT_MAX_PEER_NETWORK_PATHS - 1)) { + _rrIdx = 0; + } + else { + int _tempIdx = _rrIdx; + for (int searchCount = 0; searchCount < (_numBondedPaths - 1); searchCount++) { + _tempIdx = (_tempIdx == (_numBondedPaths - 1)) ? 0 : _tempIdx + 1; + if (_realIdxMap[_tempIdx] != ZT_MAX_PEER_NETWORK_PATHS) { + if (_paths[_realIdxMap[_tempIdx]].p && _paths[_realIdxMap[_tempIdx]].eligible) { + _rrIdx = _tempIdx; + break; } } } - if (_paths[_bondIdxMap[_rrIdx]].p) { - return _paths[_bondIdxMap[_rrIdx]].p; - } + } + if (_paths[_realIdxMap[_rrIdx]].p) { + return _paths[_realIdxMap[_rrIdx]].p; } } /** - * balance-xor + * balance-xor/aware */ if (_policy == ZT_BOND_POLICY_BALANCE_XOR || _policy == ZT_BOND_POLICY_BALANCE_AWARE) { - if (! _allowFlowHashing || flowId == -1) { + if (flowId == -1) { // No specific path required for unclassified traffic, send on anything - int m_idx = _bondIdxMap[_freeRandomByte % _numBondedPaths]; + int m_idx = _realIdxMap[_freeRandomByte % _numBondedPaths]; return _paths[m_idx].p; } - else if (_allowFlowHashing) { - Mutex::Lock _l(_flows_m); - SharedPtr flow; - if (_flows.count(flowId)) { - flow = _flows[flowId]; - flow->lastActivity = now; - } - else { - unsigned char entropy; - Utils::getSecureRandom(&entropy, 1); - flow = createFlow(ZT_MAX_PEER_NETWORK_PATHS, flowId, entropy, now); - } - if (flow) { - return _paths[flow->assignedPath].p; - } + Mutex::Lock _l(_flows_m); + std::map >::iterator it = _flows.find(flowId); + if (likely(it != _flows.end())) { + it->second->lastActivity = now; + return _paths[it->second->assignedPath].p; + } + else { + unsigned char entropy; + Utils::getSecureRandom(&entropy, 1); + SharedPtr flow = createFlow(ZT_MAX_PEER_NETWORK_PATHS, flowId, entropy, now); + _flows[flowId] = flow; + return _paths[flow->assignedPath].p; } } return SharedPtr(); @@ -423,7 +435,7 @@ void Bond::recordOutgoingPacket(const SharedPtr& path, uint64_t packetId, } } } - if (_allowFlowHashing && (flowId != ZT_QOS_NO_FLOW)) { + if (flowId != ZT_QOS_NO_FLOW) { Mutex::Lock _l(_flows_m); if (_flows.count(flowId)) { _flows[flowId]->bytesOut += payloadLength; @@ -458,7 +470,7 @@ void Bond::recordIncomingPacket(const SharedPtr& path, uint64_t packetId, //_paths[pathIdx].packetValiditySamples.push(true); } else { - debug("QoS buffer full, will not record information"); + // debug("QoS buffer full, will not record information"); } /* if (_paths[pathIdx].ackStatsIn.size() < ZT_ACK_MAX_PENDING_RECORDS) { @@ -502,13 +514,16 @@ void Bond::receivedQoS(const SharedPtr& path, int64_t now, int count, uint return; } _paths[pathIdx].lastQoSReceived = now; - debug("received QoS packet (sampling %d frames) via %s", count, pathToStr(path).c_str()); - // Look up egress times and compute latency values for each record + // debug("received QoS packet (sampling %d frames) via %s", count, pathToStr(path).c_str()); + // Look up egress times and compute latency values for each record std::map::iterator it; for (int j = 0; j < count; j++) { it = _paths[pathIdx].qosStatsOut.find(rx_id[j]); if (it != _paths[pathIdx].qosStatsOut.end()) { _paths[pathIdx].latencySamples.push(((uint16_t)(now - it->second) - rx_ts[j]) / 2); + // if (_paths[pathIdx].shouldAvoid) { + // debug("RX sample on avoided path %d", pathIdx); + // } _paths[pathIdx].qosStatsOut.erase(it); } } @@ -531,7 +546,7 @@ int32_t Bond::generateQoSPacket(int pathIdx, int64_t now, char* qosBuffer) std::map::iterator it = _paths[pathIdx].qosStatsIn.begin(); int i = 0; int numRecords = std::min(_paths[pathIdx].packetsReceivedSinceLastQoS, ZT_QOS_TABLE_SIZE); - debug("numRecords=%3d, packetsReceivedSinceLastQoS=%3d, _paths[pathIdx].qosStatsIn.size()=%3lu", numRecords, _paths[pathIdx].packetsReceivedSinceLastQoS, _paths[pathIdx].qosStatsIn.size()); + // debug("numRecords=%3d, packetsReceivedSinceLastQoS=%3d, _paths[pathIdx].qosStatsIn.size()=%3lu", numRecords, _paths[pathIdx].packetsReceivedSinceLastQoS, _paths[pathIdx].qosStatsIn.size()); while (i < numRecords && it != _paths[pathIdx].qosStatsIn.end()) { uint64_t id = it->first; memcpy(qosBuffer, &id, sizeof(uint64_t)); @@ -546,72 +561,93 @@ int32_t Bond::generateQoSPacket(int pathIdx, int64_t now, char* qosBuffer) return len; } -bool Bond::assignFlowToBondedPath(SharedPtr& flow, int64_t now) +bool Bond::assignFlowToBondedPath(SharedPtr& flow, int64_t now, bool reassign = false) { if (! _numBondedPaths) { - debug("unable to assign flow %x (bond has no links)\n", flow->id); + debug("unable to assign flow %x (bond has no links)", flow->id); return false; } - unsigned int idx = ZT_MAX_PEER_NETWORK_PATHS; + unsigned int bondedIdx = ZT_MAX_PEER_NETWORK_PATHS; if (_policy == ZT_BOND_POLICY_BALANCE_XOR) { - idx = abs((int)(flow->id % (_numBondedPaths))); - flow->assignPath(_bondIdxMap[idx], now); - ++(_paths[_bondIdxMap[idx]].assignedFlowCount); + bondedIdx = abs((int)(flow->id % _numBondedPaths)); + flow->assignPath(_realIdxMap[bondedIdx], now); + ++(_paths[_realIdxMap[bondedIdx]].assignedFlowCount); } if (_policy == ZT_BOND_POLICY_BALANCE_AWARE) { - unsigned char entropy; - Utils::getSecureRandom(&entropy, 1); - if (_totalBondUnderload) { - entropy %= _totalBondUnderload; - } - /* Since there may be scenarios where a path is removed before we can re-estimate - relative qualities (and thus allocations) we need to down-modulate the entropy - value that we use to randomly assign among the surviving paths, otherwise we risk - not being able to find a path to assign this flow to. */ - int totalIncompleteAllocation = 0; - for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { - if (_paths[i].p && _paths[i].bonded) { - totalIncompleteAllocation += _paths[i].allocation; - } - } - entropy %= totalIncompleteAllocation; - for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { - if (_paths[i].p && _paths[i].bonded) { - uint8_t probabilitySegment = (_totalBondUnderload > 0) ? _paths[i].affinity : _paths[i].allocation; - if (entropy <= probabilitySegment) { - idx = i; - break; - } - entropy -= probabilitySegment; - } - } - if (idx < ZT_MAX_PEER_NETWORK_PATHS) { - flow->assignPath(idx, now); - ++(_paths[idx].assignedFlowCount); + /** balance-aware generally works like balance-xor except that it will try to + * take into account user preferences (or default sane limits) that will discourage + * allocating traffic to links with a lesser perceived "quality" */ + int offset = 0; + float bestQuality = 0.0; + int nextBestQualIdx = ZT_MAX_PEER_NETWORK_PATHS; + + if (reassign) { + log("attempting to re-assign out-flow %04x previously on idx %d (%u / %lu flows)", flow->id, flow->assignedPath, _paths[_realIdxMap[flow->assignedPath]].assignedFlowCount, _flows.size()); } else { - debug("unable to assign out-flow %x (unknown reason)", flow->id); - return false; + debug("attempting to assign flow for the first time"); + } + + unsigned char entropy; + Utils::getSecureRandom(&entropy, 1); + float randomLinkCapacity = ((float)entropy / 255.0); // Used to random but proportional choices + + while (offset < _numBondedPaths) { + unsigned char entropy; + Utils::getSecureRandom(&entropy, 1); + + if (reassign) { + bondedIdx = (flow->assignedPath + offset) % (_numBondedPaths); + } + else { + bondedIdx = abs((int)((entropy + offset) % (_numBondedPaths))); + } + // debug("idx=%d, offset=%d, randomCap=%f, actualCap=%f", bondedIdx, offset, randomLinkCapacity, _paths[_realIdxMap[bondedIdx]].relativeLinkCapacity); + if (! _paths[_realIdxMap[bondedIdx]].p) { + continue; + } + if (! _paths[_realIdxMap[bondedIdx]].shouldAvoid && randomLinkCapacity <= _paths[_realIdxMap[bondedIdx]].relativeLinkCapacity) { + // debug(" assign out-flow %04x to link %s (%u / %lu flows)", flow->id, pathToStr(_paths[_realIdxMap[bondedIdx]].p).c_str(), _paths[_realIdxMap[bondedIdx]].assignedFlowCount, _flows.size()); + break; // Acceptable -- No violation of quality spec + } + if (_paths[_realIdxMap[bondedIdx]].relativeQuality > bestQuality) { + bestQuality = _paths[_realIdxMap[bondedIdx]].relativeQuality; + nextBestQualIdx = bondedIdx; + // debug(" recording next-best link %f idx %d", _paths[_realIdxMap[bondedIdx]].relativeQuality, bondedIdx); + } + ++offset; + } + if (offset < _numBondedPaths) { + // We were (able) to find a path that didn't violate any of the user's quality requirements + flow->assignPath(_realIdxMap[bondedIdx], now); + ++(_paths[_realIdxMap[bondedIdx]].assignedFlowCount); + // debug(" ABLE to find optimal link %f idx %d", _paths[_realIdxMap[bondedIdx]].relativeQuality, bondedIdx); + } + else { + // We were (unable) to find a path that didn't violate at least one quality requirement, will choose next best option + flow->assignPath(_realIdxMap[nextBestQualIdx], now); + ++(_paths[_realIdxMap[nextBestQualIdx]].assignedFlowCount); + // debug(" UNABLE to find, will use link %f idx %d", _paths[_realIdxMap[nextBestQualIdx]].relativeQuality, nextBestQualIdx); } } if (_policy == ZT_BOND_POLICY_ACTIVE_BACKUP) { if (_abPathIdx == ZT_MAX_PEER_NETWORK_PATHS) { - debug("unable to assign out-flow %x (no active backup link)", flow->id); + log("unable to assign out-flow %x (no active backup link)", flow->id); } flow->assignPath(_abPathIdx, now); } - debug("assign out-flow %04x to link %s (%u / %lu flows)", flow->id, pathToStr(_paths[flow->assignedPath].p).c_str(), _paths[flow->assignedPath].assignedFlowCount, _flows.size()); + log("assign out-flow %04x to link %s (%u / %lu flows)", flow->id, pathToStr(_paths[flow->assignedPath].p).c_str(), _paths[flow->assignedPath].assignedFlowCount, _flows.size()); return true; } SharedPtr Bond::createFlow(int pathIdx, int32_t flowId, unsigned char entropy, int64_t now) { if (! _numBondedPaths) { - debug("unable to assign flow %x (bond has no links)\n", flowId); + debug("unable to assign flow %04x (bond has no links)", flowId); return SharedPtr(); } if (_flows.size() >= ZT_FLOW_MAX_COUNT) { - debug("forget oldest flow (max flows reached: %d)\n", ZT_FLOW_MAX_COUNT); + debug("forget oldest flow (max flows reached: %d)", ZT_FLOW_MAX_COUNT); forgetFlowsWhenNecessary(0, true, now); } SharedPtr flow = new Flow(flowId, now); @@ -624,7 +660,7 @@ SharedPtr Bond::createFlow(int pathIdx, int32_t flowId, unsigned cha if (pathIdx != ZT_MAX_PEER_NETWORK_PATHS) { flow->assignPath(pathIdx, now); _paths[pathIdx].assignedFlowCount++; - debug("assign in-flow %x to link %s (%u / %lu)", flow->id, pathToStr(_paths[pathIdx].p).c_str(), _paths[pathIdx].assignedFlowCount, _flows.size()); + debug("assign in-flow %04x to link %s (%u / %lu)", flow->id, pathToStr(_paths[pathIdx].p).c_str(), _paths[pathIdx].assignedFlowCount, _flows.size()); } /** * Add a flow when no path was provided. This means that it is an outgoing packet @@ -638,13 +674,13 @@ SharedPtr Bond::createFlow(int pathIdx, int32_t flowId, unsigned cha void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now) { - std::map >::iterator it = _flows.begin(); - std::map >::iterator oldestFlow = _flows.end(); + std::map >::iterator it = _flows.begin(); + std::map >::iterator oldestFlow = _flows.end(); SharedPtr expiredFlow; if (age) { // Remove by specific age while (it != _flows.end()) { if (it->second->age(now) > age) { - debug("forget flow %x (age %llu) (%u / %lu)", it->first, (unsigned long long)it->second->age(now), _paths[it->second->assignedPath].assignedFlowCount, (_flows.size() - 1)); + debug("forget flow %04x (age %llu) (%u / %lu)", it->first, (unsigned long long)it->second->age(now), _paths[it->second->assignedPath].assignedFlowCount, (_flows.size() - 1)); _paths[it->second->assignedPath].assignedFlowCount--; it = _flows.erase(it); } @@ -663,7 +699,7 @@ void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now) ++it; } if (oldestFlow != _flows.end()) { - debug("forget oldest flow %x (age %llu) (total flows: %lu)", oldestFlow->first, (unsigned long long)oldestFlow->second->age(now), (unsigned long)(_flows.size() - 1)); + debug("forget oldest flow %04x (age %llu) (total flows: %lu)", oldestFlow->first, (unsigned long long)oldestFlow->second->age(now), (unsigned long)(_flows.size() - 1)); _paths[oldestFlow->second->assignedPath].assignedFlowCount--; _flows.erase(oldestFlow); } @@ -810,7 +846,7 @@ void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, con char qosData[ZT_QOS_MAX_PACKET_SIZE]; int16_t len = generateQoSPacket(pathIdx, _now, qosData); if (len) { - debug("sending QOS via link %s (len=%d)", pathToStr(_paths[pathIdx].p).c_str(), len); + // debug("sending QOS via link %s (len=%d)", pathToStr(_paths[pathIdx].p).c_str(), len); outp.append(qosData, len); if (atAddress) { outp.armor(_peer->key(), false, _peer->aesKeysIfSupported()); @@ -827,6 +863,9 @@ void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, con void Bond::processBackgroundBondTasks(void* tPtr, int64_t now) { + if (! _run) { + return; + } if (! _peer->_localMultipathSupported || (now - _lastBackgroundTaskCheck) < ZT_BOND_BACKGROUND_TASK_MIN_INTERVAL) { return; } @@ -852,7 +891,7 @@ void Bond::processBackgroundBondTasks(void* tPtr, int64_t now) RR->node->putPacket(tPtr, _paths[i].p->localSocket(), _paths[i].p->address(), outp.data(), outp.size()); _paths[i].p->_lastOut = now; _overheadBytes += outp.size(); - debug("tx: verb 0x%-2x of len %4d via %s (ECHO)", Packet::VERB_ECHO, outp.size(), pathToStr(_paths[i].p).c_str()); + // debug("tx: verb 0x%-2x of len %4d via %s (ECHO)", Packet::VERB_ECHO, outp.size(), pathToStr(_paths[i].p).c_str()); } } // QOS @@ -970,11 +1009,9 @@ void Bond::curateBond(int64_t now, bool rebuildBond) if (! currEligibility) { _paths[i].adjustRefractoryPeriod(now, _defaultPathRefractoryPeriod, ! currEligibility); if (_paths[i].bonded) { - if (_allowFlowHashing) { - debug("link %s was bonded, flow reallocation will occur soon", pathToStr(_paths[i].p).c_str()); - rebuildBond = true; - _paths[i].shouldReallocateFlows = _paths[i].bonded; - } + debug("link %s was bonded, flow reallocation will occur soon", pathToStr(_paths[i].p).c_str()); + rebuildBond = true; + _paths[i].shouldAvoid = true; _paths[i].bonded = false; } } @@ -999,6 +1036,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond) */ bool foundUsablePrimaryPath = false; for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { + // debug("[%d], bonded=%d, alive=%d", i, _paths[i].bonded , _paths[i].alive); if (_paths[i].p && _paths[i].bonded && _paths[i].alive) { foundUsablePrimaryPath = true; } @@ -1014,11 +1052,9 @@ void Bond::curateBond(int64_t now, bool rebuildBond) rebuildBond = true; } if (rebuildBond) { - debug("rebuilding bond"); - // Clear previous bonded index mapping for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { - _bondIdxMap[i] = ZT_MAX_PEER_NETWORK_PATHS; + _realIdxMap[i] = ZT_MAX_PEER_NETWORK_PATHS; _paths[i].bonded = false; } @@ -1037,11 +1073,10 @@ void Bond::curateBond(int64_t now, bool rebuildBond) std::map, std::vector >::iterator it = linkMap.begin(); while (it != linkMap.end()) { SharedPtr link = it->first; - int ipvPref = link->ipvPref(); // Bond a spare link if required (no viable primary links left) if (! foundUsablePrimaryPath) { - log("no usable primary links remain, will attempt to use spare if available"); + debug("no usable primary links remain, will attempt to use spare if available"); for (int j = 0; j < it->second.size(); j++) { int idx = it->second.at(j); if (! _paths[idx].p || ! _paths[idx].eligible || ! _paths[idx].allowed() || ! _paths[idx].isSpare()) { @@ -1053,6 +1088,8 @@ void Bond::curateBond(int64_t now, bool rebuildBond) } } + int ipvPref = link->ipvPref(); + // If user has no address type preference, then use every path we find on a link if (ipvPref == 0) { for (int j = 0; j < it->second.size(); j++) { @@ -1127,26 +1164,6 @@ void Bond::curateBond(int64_t now, bool rebuildBond) void Bond::estimatePathQuality(int64_t now) { - uint32_t totUserSpecifiedLinkSpeed = 0; - if (_numBondedPaths) { // Compute relative user-specified speeds of links - for (unsigned int i = 0; i < _numBondedPaths; ++i) { - if (_paths[i].p && _paths[i].allowed()) { - SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket()); - if (link) { - totUserSpecifiedLinkSpeed += link->speed(); - } - } - } - for (unsigned int i = 0; i < _numBondedPaths; ++i) { - if (_paths[i].p && _paths[i].allowed()) { - SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket()); - if (link) { - link->setRelativeSpeed((uint8_t)round(((float)link->speed() / (float)totUserSpecifiedLinkSpeed) * 255)); - } - } - } - } - float lat[ZT_MAX_PEER_NETWORK_PATHS] = { 0 }; float pdv[ZT_MAX_PEER_NETWORK_PATHS] = { 0 }; float plr[ZT_MAX_PEER_NETWORK_PATHS] = { 0 }; @@ -1157,35 +1174,15 @@ void Bond::estimatePathQuality(int64_t now) float maxPLR = 0; float maxPER = 0; - float quality[ZT_MAX_PEER_NETWORK_PATHS] = { 0 }; - uint8_t alloc[ZT_MAX_PEER_NETWORK_PATHS] = { 0 }; + float absoluteQuality[ZT_MAX_PEER_NETWORK_PATHS] = { 0 }; float totQuality = 0.0f; - // Compute initial summary statistics + // Process observation samples, compute summary statistics, and compute relative link qualities for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { if (! _paths[i].p || ! _paths[i].allowed()) { continue; } - // Compute/Smooth average of real-world observations - _paths[i].latencyMean = _paths[i].latencySamples.mean(); - _paths[i].latencyVariance = _paths[i].latencySamples.stddev(); - - // Write values to external path object so that it can be propagated to the user - _paths[i].p->_latencyMean = _paths[i].latencyMean; - _paths[i].p->_latencyVariance = _paths[i].latencyVariance; - _paths[i].p->_packetLossRatio = _paths[i].packetLossRatio; - _paths[i].p->_packetErrorRatio = _paths[i].packetErrorRatio; - _paths[i].p->_bonded = _paths[i].bonded; - _paths[i].p->_eligible = _paths[i].eligible; - // _valid is written elsewhere - _paths[i].p->_allocation = _paths[i].allocation; - SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket()); - if (link) { - _paths[i].p->_givenLinkSpeed = link->speed(); - } - //_paths[i].packetErrorRatio = 1.0 - (_paths[i].packetValiditySamples.count() ? _paths[i].packetValiditySamples.mean() : 1.0); - // Drain unacknowledged QoS records int qosRecordTimeout = (_qosSendInterval * 3); std::map::iterator it = _paths[i].qosStatsOut.begin(); @@ -1200,7 +1197,7 @@ void Bond::estimatePathQuality(int64_t now) } } if (numDroppedQosOutRecords) { - debug("Dropped %d QOS out-records", numDroppedQosOutRecords); + // debug("dropped %d QOS out-records", numDroppedQosOutRecords); } /* @@ -1229,116 +1226,185 @@ void Bond::estimatePathQuality(int64_t now) } } if (numDroppedQosInRecords) { - log("Dropped %d QOS in-records", numDroppedQosInRecords); + // debug("dropped %d QOS in-records", numDroppedQosInRecords); } - quality[i] = 0; + absoluteQuality[i] = 0; totQuality = 0; // Normalize raw observations according to sane limits and/or user specified values - lat[i] = 1.0 / expf(4 * Utils::normalize(_paths[i].latencyMean, 0, _maxAcceptableLatency, 0, 1)); - pdv[i] = 1.0 / expf(4 * Utils::normalize(_paths[i].latencyVariance, 0, _maxAcceptablePacketDelayVariance, 0, 1)); - plr[i] = 1.0 / expf(4 * Utils::normalize(_paths[i].packetLossRatio, 0, _maxAcceptablePacketLossRatio, 0, 1)); - per[i] = 1.0 / expf(4 * Utils::normalize(_paths[i].packetErrorRatio, 0, _maxAcceptablePacketErrorRatio, 0, 1)); + lat[i] = 1.0 / expf(4 * Utils::normalize(_paths[i].latency, 0, _qw[ZT_QOS_LAT_MAX_IDX], 0, 1)); + pdv[i] = 1.0 / expf(4 * Utils::normalize(_paths[i].latencyVariance, 0, _qw[ZT_QOS_PDV_MAX_IDX], 0, 1)); + plr[i] = 1.0 / expf(4 * Utils::normalize(_paths[i].packetLossRatio, 0, _qw[ZT_QOS_PLR_MAX_IDX], 0, 1)); + per[i] = 1.0 / expf(4 * Utils::normalize(_paths[i].packetErrorRatio, 0, _qw[ZT_QOS_PER_MAX_IDX], 0, 1)); // Record bond-wide maximums to determine relative values maxLAT = lat[i] > maxLAT ? lat[i] : maxLAT; maxPDV = pdv[i] > maxPDV ? pdv[i] : maxPDV; maxPLR = plr[i] > maxPLR ? plr[i] : maxPLR; maxPER = per[i] > maxPER ? per[i] : maxPER; } + + // Compute relative user-specified link capacities (may change during life of Bond) + int maxObservedLinkCap = 0; + // Find current maximum + for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { + if (_paths[i].p && _paths[i].allowed()) { + SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket()); + if (link) { + int linkSpeed = link->capacity(); + _paths[i].p->_givenLinkSpeed = linkSpeed; + maxObservedLinkCap = linkSpeed > maxObservedLinkCap ? linkSpeed : maxObservedLinkCap; + } + } + } + // Compute relative link capacity (Used for weighting traffic allocations) + for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { + if (_paths[i].p && _paths[i].allowed()) { + SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket()); + if (link) { + float relativeCapacity = (link->capacity() / (float)maxObservedLinkCap); + link->setRelativeCapacity(relativeCapacity); + _paths[i].relativeLinkCapacity = relativeCapacity; + } + } + } + // Convert metrics to relative quantities and apply contribution weights for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { if (_paths[i].p && _paths[i].bonded) { - quality[i] += ((maxLAT > 0.0f ? lat[i] / maxLAT : 0.0f) * _qw[ZT_QOS_LAT_IDX]); - quality[i] += ((maxPDV > 0.0f ? pdv[i] / maxPDV : 0.0f) * _qw[ZT_QOS_PDV_IDX]); - quality[i] += ((maxPLR > 0.0f ? plr[i] / maxPLR : 0.0f) * _qw[ZT_QOS_PLR_IDX]); - quality[i] += ((maxPER > 0.0f ? per[i] / maxPER : 0.0f) * _qw[ZT_QOS_PER_IDX]); - totQuality += quality[i]; + absoluteQuality[i] += ((maxLAT > 0.0f ? lat[i] / maxLAT : 0.0f) * _qw[ZT_QOS_LAT_WEIGHT_IDX]); + absoluteQuality[i] += ((maxPDV > 0.0f ? pdv[i] / maxPDV : 0.0f) * _qw[ZT_QOS_PDV_WEIGHT_IDX]); + absoluteQuality[i] += ((maxPLR > 0.0f ? plr[i] / maxPLR : 0.0f) * _qw[ZT_QOS_PLR_WEIGHT_IDX]); + absoluteQuality[i] += ((maxPER > 0.0f ? per[i] / maxPER : 0.0f) * _qw[ZT_QOS_PER_WEIGHT_IDX]); + absoluteQuality[i] *= _paths[i].relativeLinkCapacity; + totQuality += absoluteQuality[i]; } } - // Normalize to 8-bit allocation values + + // Compute quality of link relative to all others in the bond (also accounting for stated link capacity) + if (totQuality > 0.0) { + for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { + if (_paths[i].p && _paths[i].bonded) { + _paths[i].relativeQuality = absoluteQuality[i] / totQuality; + // debug("[%2d], abs=%f, tot=%f, rel=%f, relcap=%f", i, absoluteQuality[i], totQuality, _paths[i].relativeQuality, _paths[i].relativeLinkCapacity); + } + } + } + + // Compute summary statistics for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { - if (_paths[i].p && _paths[i].bonded) { - alloc[i] = (uint8_t)(std::ceil((quality[i] / totQuality) * (float)255)); - _paths[i].allocation = alloc[i]; + if (! _paths[i].p || ! _paths[i].allowed()) { + continue; + } + // Compute/Smooth average of real-world observations + if (_paths[i].latencySamples.count() == ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE) { + _paths[i].latency = _paths[i].latencySamples.mean(); + } + if (_paths[i].latencySamples.count() == ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE) { + _paths[i].latencyVariance = _paths[i].latencySamples.stddev(); + } + + // Write values to external path object so that it can be propagated to the user + _paths[i].p->_latencyMean = _paths[i].latency; + _paths[i].p->_latencyVariance = _paths[i].latencyVariance; + _paths[i].p->_packetLossRatio = _paths[i].packetLossRatio; + _paths[i].p->_packetErrorRatio = _paths[i].packetErrorRatio; + _paths[i].p->_bonded = _paths[i].bonded; + _paths[i].p->_eligible = _paths[i].eligible; + //_paths[i].packetErrorRatio = 1.0 - (_paths[i].packetValiditySamples.count() ? _paths[i].packetValiditySamples.mean() : 1.0); + // _valid is written elsewhere + _paths[i].p->_relativeQuality = _paths[i].relativeQuality; + } + + // Flag links for avoidance + for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { + if (! _paths[i].p || ! _paths[i].allowed()) { + continue; + } + bool shouldAvoid = false; + if (! _paths[i].shouldAvoid) { + if (_paths[i].latency > _qw[ZT_QOS_LAT_MAX_IDX]) { + log("avoiding link %s because (lat %6.4f > %6.4f)", pathToStr(_paths[i].p).c_str(), _paths[i].latency, _qw[ZT_QOS_LAT_MAX_IDX]); + shouldAvoid = true; + } + if (_paths[i].latencyVariance > _qw[ZT_QOS_PDV_MAX_IDX]) { + log("avoiding link %s because (pdv %6.4f > %6.4f)", pathToStr(_paths[i].p).c_str(), _paths[i].latencyVariance, _qw[ZT_QOS_PDV_MAX_IDX]); + shouldAvoid = true; + } + if (_paths[i].packetErrorRatio > _qw[ZT_QOS_PER_MAX_IDX]) { + log("avoiding link %s because (per %6.4f > %6.4f)", pathToStr(_paths[i].p).c_str(), _paths[i].packetErrorRatio, _qw[ZT_QOS_PER_MAX_IDX]); + shouldAvoid = true; + } + if (_paths[i].packetLossRatio > _qw[ZT_QOS_PLR_MAX_IDX]) { + log("avoiding link %s because (plr %6.4f > %6.4f)", pathToStr(_paths[i].p).c_str(), _paths[i].packetLossRatio, _qw[ZT_QOS_PLR_MAX_IDX]); + shouldAvoid = true; + } + _paths[i].shouldAvoid = shouldAvoid; + } + else { + if (! shouldAvoid) { + log("no longer avoiding link %s", pathToStr(_paths[i].p).c_str()); + _paths[i].shouldAvoid = false; + } } } } void Bond::processBalanceTasks(int64_t now) { - if (_allowFlowHashing) { - /** - * Clean up and reset flows if necessary - */ - if ((now - _lastFlowExpirationCheck) > ZT_PEER_PATH_EXPIRATION) { - Mutex::Lock _l(_flows_m); - forgetFlowsWhenNecessary(ZT_PEER_PATH_EXPIRATION, false, now); - std::map >::iterator it = _flows.begin(); - while (it != _flows.end()) { - it->second->resetByteCounts(); - ++it; - } - _lastFlowExpirationCheck = now; + if (! _numBondedPaths) { + return; + } + /** + * Clean up and reset flows if necessary + */ + if ((now - _lastFlowExpirationCheck) > ZT_PEER_PATH_EXPIRATION) { + Mutex::Lock _l(_flows_m); + forgetFlowsWhenNecessary(ZT_PEER_PATH_EXPIRATION, false, now); + std::map >::iterator it = _flows.begin(); + while (it != _flows.end()) { + it->second->resetByteCounts(); + ++it; } - /** - * Re-allocate flows from dead paths - */ - if (_policy == ZT_BOND_POLICY_BALANCE_XOR || _policy == ZT_BOND_POLICY_BALANCE_AWARE) { - Mutex::Lock _l(_flows_m); - for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { - if (! _paths[i].p) { - continue; - } - if (! _paths[i].eligible && _paths[i].shouldReallocateFlows) { - log("reallocate flows from dead link %s", pathToStr(_paths[i].p).c_str()); - std::map >::iterator flow_it = _flows.begin(); - while (flow_it != _flows.end()) { - if (_paths[flow_it->second->assignedPath].p == _paths[i].p) { - if (assignFlowToBondedPath(flow_it->second, now)) { - _paths[i].assignedFlowCount--; - } - } - ++flow_it; - } - _paths[i].shouldReallocateFlows = false; + _lastFlowExpirationCheck = now; + } + /** + * Move (all) flows from dead paths + */ + if (_policy == ZT_BOND_POLICY_BALANCE_XOR || _policy == ZT_BOND_POLICY_BALANCE_AWARE) { + Mutex::Lock _l(_flows_m); + std::map >::iterator flow_it = _flows.begin(); + while (flow_it != _flows.end()) { + if (! _paths[flow_it->second->assignedPath].p) { + continue; + } + int originalPathIdx = flow_it->second->assignedPath; + if (! _paths[originalPathIdx].eligible) { + log("moving all flows from dead link %s", pathToStr(_paths[originalPathIdx].p).c_str()); + if (assignFlowToBondedPath(flow_it->second, now, true)) { + _paths[originalPathIdx].assignedFlowCount--; } } + ++flow_it; } - /** - * Re-allocate flows from under-performing - * NOTE: This could be part of the above block but was kept separate for clarity. - */ - if (_policy == ZT_BOND_POLICY_BALANCE_AWARE) { - int totalAllocation = 0; - for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { - if (! _paths[i].p) { - continue; - } - if (_paths[i].p && _paths[i].bonded && _paths[i].eligible) { - totalAllocation += _paths[i].allocation; - } - } - unsigned char minimumAllocationValue = (uint8_t)(0.33 * ((float)totalAllocation / (float)_numBondedPaths)); - - Mutex::Lock _l(_flows_m); - for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { - if (! _paths[i].p) { - continue; - } - if (_paths[i].p && _paths[i].bonded && _paths[i].eligible && (_paths[i].allocation < minimumAllocationValue) && _paths[i].assignedFlowCount) { - log("reallocate flows from under-performing link %s\n", pathToStr(_paths[i].p).c_str()); - std::map >::iterator flow_it = _flows.begin(); - while (flow_it != _flows.end()) { - if (flow_it->second->assignedPath == _paths[i].p) { - if (assignFlowToBondedPath(flow_it->second, now)) { - _paths[i].assignedFlowCount--; - } - } - ++flow_it; - } - _paths[i].shouldReallocateFlows = false; + } + /** + * Move (some) flows from low quality paths + */ + if (_policy == ZT_BOND_POLICY_BALANCE_AWARE) { + Mutex::Lock _l(_flows_m); + std::map >::iterator flow_it = _flows.begin(); + while (flow_it != _flows.end()) { + if (! _paths[flow_it->second->assignedPath].p) { + continue; + } + int originalPathIdx = flow_it->second->assignedPath; + if (_paths[originalPathIdx].shouldAvoid) { + if (assignFlowToBondedPath(flow_it->second, now, true)) { + _paths[originalPathIdx].assignedFlowCount--; + return; // Only move one flow at a time } } + ++flow_it; } } } @@ -1534,7 +1600,7 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) } if (! _paths[i].failoverScore) { // If we didn't inherit a failover score from a "parent" that wants to use this path as a failover - int newHandicap = failoverScoreHandicap ? failoverScoreHandicap : _paths[i].allocation; + int newHandicap = failoverScoreHandicap ? failoverScoreHandicap : (_paths[i].relativeQuality * 255.0); _paths[i].failoverScore = newHandicap; } SharedPtr failoverLink; @@ -1603,7 +1669,7 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) _paths[i].negotiated = false; } */ - _paths[i].failoverScore = _paths[i].allocation + failoverScoreHandicap; + _paths[i].failoverScore = _paths[i].relativeQuality + failoverScoreHandicap; if (_paths[i].p.ptr() != _paths[_abPathIdx].p.ptr()) { bool bFoundPathInQueue = false; for (std::deque::iterator it(_abFailoverQueue.begin()); it != _abFailoverQueue.end(); ++it) { @@ -1703,7 +1769,7 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) int prevFScore = _paths[_abPathIdx].failoverScore; // Establish a minimum switch threshold to prevent flapping int failoverScoreDifference = _paths[_abFailoverQueue.front()].failoverScore - _paths[_abPathIdx].failoverScore; - int thresholdQuantity = (int)(ZT_BOND_ACTIVE_BACKUP_OPTIMIZE_MIN_THRESHOLD * (float)_paths[_abPathIdx].allocation); + int thresholdQuantity = (int)(ZT_BOND_ACTIVE_BACKUP_OPTIMIZE_MIN_THRESHOLD * (float)_paths[_abPathIdx].relativeQuality); if ((failoverScoreDifference > 0) && (failoverScoreDifference > thresholdQuantity)) { SharedPtr oldPath = _paths[_abPathIdx].p; dequeueNextActiveBackupPath(now); @@ -1746,10 +1812,6 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT } _isLeaf = _peer ? (role != ZT_PEER_ROLE_PLANET && role != ZT_PEER_ROLE_MOON) : false; - // Flows - - _allowFlowHashing = false; - // Path negotiation _allowPathNegotiation = false; @@ -1761,7 +1823,7 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT _userHasSpecifiedPrimaryLink = false; _userHasSpecifiedFailoverInstructions = false; - _userHasSpecifiedLinkSpeeds = 0; + _userHasSpecifiedLinkCapacities = 0; // Bond status @@ -1769,62 +1831,36 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT _numTotalLinks = 0; _numBondedPaths = 0; - // active-backup - - _abPathIdx = ZT_MAX_PEER_NETWORK_PATHS; - - // rr - - _rrPacketsSentOnCurrLink = 0; - _rrIdx = 0; - // General parameters _downDelay = 0; _upDelay = 0; _monitorInterval = 0; - // (Sane?) limits - - _maxAcceptableLatency = 100; - _maxAcceptablePacketDelayVariance = 50; - _maxAcceptablePacketLossRatio = 0.10f; - _maxAcceptablePacketErrorRatio = 0.10f; - // balance-aware _totalBondUnderload = 0; _overheadBytes = 0; /** - * Policy-specific defaults + * Policy defaults */ - switch (_policy) { - case ZT_BOND_POLICY_ACTIVE_BACKUP: - _abLinkSelectMethod = ZT_BOND_RESELECTION_POLICY_OPTIMIZE; - break; - case ZT_BOND_POLICY_BROADCAST: - _downDelay = 30000; - _upDelay = 0; - break; - case ZT_BOND_POLICY_BALANCE_RR: - _packetsPerLink = 64; - break; - case ZT_BOND_POLICY_BALANCE_XOR: - _allowFlowHashing = true; - break; - case ZT_BOND_POLICY_BALANCE_AWARE: - _allowFlowHashing = true; - break; - default: - break; - } + _abPathIdx = ZT_MAX_PEER_NETWORK_PATHS; + _abLinkSelectMethod = ZT_BOND_RESELECTION_POLICY_OPTIMIZE; + _rrPacketsSentOnCurrLink = 0; + _rrIdx = 0; + _packetsPerLink = 64; - _qw[ZT_QOS_LAT_IDX] = 0.3f; - _qw[ZT_QOS_LTM_IDX] = 0.1f; - _qw[ZT_QOS_PDV_IDX] = 0.3f; - _qw[ZT_QOS_PLR_IDX] = 0.1f; - _qw[ZT_QOS_PER_IDX] = 0.1f; + // Sane quality defaults + + _qw[ZT_QOS_LAT_MAX_IDX] = 500.0f; + _qw[ZT_QOS_PDV_MAX_IDX] = 100.0f; + _qw[ZT_QOS_PLR_MAX_IDX] = 0.001f; + _qw[ZT_QOS_PER_MAX_IDX] = 0.0001f; + _qw[ZT_QOS_LAT_WEIGHT_IDX] = 0.25f; + _qw[ZT_QOS_PDV_WEIGHT_IDX] = 0.25f; + _qw[ZT_QOS_PLR_WEIGHT_IDX] = 0.25f; + _qw[ZT_QOS_PER_WEIGHT_IDX] = 0.25f; _failoverInterval = ZT_BOND_FAILOVER_DEFAULT_INTERVAL; @@ -1836,7 +1872,8 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT _downDelay = templateBond->_downDelay; _upDelay = templateBond->_upDelay; _abLinkSelectMethod = templateBond->_abLinkSelectMethod; - memcpy(_qw, templateBond->_qw, ZT_QOS_WEIGHT_SIZE * sizeof(float)); + memcpy(_qw, templateBond->_qw, ZT_QOS_PARAMETER_SIZE * sizeof(float)); + debug("user link quality spec = {%6.3f, %6.3f, %6.3f, %6.3f, %6.3f, %6.3f, %6.3f, %6.3f}", _qw[0], _qw[1], _qw[2], _qw[3], _qw[4], _qw[5], _qw[6], _qw[7]); } if (! _isLeaf) { @@ -1854,16 +1891,18 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT _defaultPathRefractoryPeriod = 8000; } -void Bond::setUserQualityWeights(float weights[], int len) +void Bond::setUserLinkQualitySpec(float weights[], int len) { - if (len == ZT_QOS_WEIGHT_SIZE) { - float weightTotal = 0.0; - for (unsigned int i = 0; i < ZT_QOS_WEIGHT_SIZE; ++i) { - weightTotal += weights[i]; - } - if (weightTotal > 0.99 && weightTotal < 1.01) { - memcpy(_qw, weights, len * sizeof(float)); - } + if (len != ZT_QOS_PARAMETER_SIZE) { + debug("link quality spec has an invalid number of parameters (%d out of %d), ignoring", len, ZT_QOS_PARAMETER_SIZE); + return; + } + float weightTotal = 0.0; + for (unsigned int i = 4; i < ZT_QOS_PARAMETER_SIZE; ++i) { + weightTotal += weights[i]; + } + if (weightTotal > 0.99 && weightTotal < 1.01) { + memcpy(_qw, weights, len * sizeof(float)); } } @@ -1898,7 +1937,7 @@ void Bond::dumpPathStatus(int64_t now, int pathIdx) std::string aliveOrDead = _paths[pathIdx].alive ? std::string("alive") : std::string("dead"); std::string eligibleOrNot = _paths[pathIdx].eligible ? std::string("eligible") : std::string("ineligible"); std::string bondedOrNot = _paths[pathIdx].bonded ? std::string("bonded") : std::string("unbonded"); - log("path[%2u] --- %5s (in %7lld, out: %7lld), %10s, %8s, flows=%-6u lat=%-8.3f pdv=%-7.3f err=%-6.4f loss=%-6.4f alloc=%-3u --- (%s) spare=%d", + log("path[%2u] --- %5s (in %7lld, out: %7lld), %10s, %8s, flows=%-6u lat=%-8.3f pdv=%-7.3f err=%-6.4f loss=%-6.4f qual=%-6.4f --- (%s) spare=%d", pathIdx, aliveOrDead.c_str(), static_cast(_paths[pathIdx].p->age(now)), @@ -1906,11 +1945,11 @@ void Bond::dumpPathStatus(int64_t now, int pathIdx) eligibleOrNot.c_str(), bondedOrNot.c_str(), _paths[pathIdx].assignedFlowCount, - _paths[pathIdx].latencyMean, + _paths[pathIdx].latency, _paths[pathIdx].latencyVariance, _paths[pathIdx].packetErrorRatio, _paths[pathIdx].packetLossRatio, - _paths[pathIdx].allocation, + _paths[pathIdx].relativeQuality, pathToStr(_paths[pathIdx].p).c_str(), _paths[pathIdx].isSpare()); #endif diff --git a/node/Bond.hpp b/node/Bond.hpp index abb0a8507..3419c8cfe 100644 --- a/node/Bond.hpp +++ b/node/Bond.hpp @@ -29,7 +29,7 @@ /** * Indices for the path quality weight vector */ -enum ZT_BondQualityWeightIndex { ZT_QOS_LAT_IDX, ZT_QOS_LTM_IDX, ZT_QOS_PDV_IDX, ZT_QOS_PLR_IDX, ZT_QOS_PER_IDX, ZT_QOS_WEIGHT_SIZE }; +enum ZT_BondQualityWeightIndex { ZT_QOS_LAT_MAX_IDX, ZT_QOS_PDV_MAX_IDX, ZT_QOS_PLR_MAX_IDX, ZT_QOS_PER_MAX_IDX, ZT_QOS_LAT_WEIGHT_IDX, ZT_QOS_PDV_WEIGHT_IDX, ZT_QOS_PLR_WEIGHT_IDX, ZT_QOS_PER_WEIGHT_IDX, ZT_QOS_PARAMETER_SIZE }; /** * Multipath bonding policy @@ -117,17 +117,16 @@ class Link { * * @param ifnameStr * @param ipvPref - * @param speed + * @param capacity * @param enabled * @param mode * @param failoverToLinkStr - * @param userSpecifiedAlloc */ - Link(std::string ifnameStr, uint8_t ipvPref, uint32_t speed, bool enabled, uint8_t mode, std::string failoverToLinkStr) + Link(std::string ifnameStr, uint8_t ipvPref, uint32_t capacity, bool enabled, uint8_t mode, std::string failoverToLinkStr) : _ifnameStr(ifnameStr) , _ipvPref(ipvPref) - , _speed(speed) - , _relativeSpeed(0) + , _capacity(capacity) + , _relativeCapacity(0.0) , _enabled(enabled) , _mode(mode) , _failoverToLinkStr(failoverToLinkStr) @@ -194,29 +193,29 @@ class Link { } /** - * @return The speed of the link relative to others in the bond. + * @return The capacity of the link relative to others in the bond. */ - inline uint8_t relativeSpeed() + inline float relativeCapacity() { - return _relativeSpeed; + return _relativeCapacity; } /** - * Sets the speed of the link relative to others in the bond. + * Sets the capacity of the link relative to others in the bond. * - * @param relativeSpeed The speed relative to the rest of the link. + * @param relativeCapacity The capacity relative to the rest of the link. */ - inline void setRelativeSpeed(uint8_t relativeSpeed) + inline void setRelativeCapacity(float relativeCapacity) { - _relativeSpeed = relativeSpeed; + _relativeCapacity = relativeCapacity; } /** - * @return The absolute speed of the link (as specified by the user.) + * @return The absolute capacity of the link (as specified by the user.) */ - inline uint32_t speed() + inline uint32_t capacity() { - return _speed; + return _capacity; } /** @@ -262,14 +261,14 @@ class Link { uint8_t _ipvPref; /** - * User-specified speed of this link + * User-specified capacity of this link */ - uint32_t _speed; + uint32_t _capacity; /** * Speed relative to other specified links (computed by Bond) */ - uint8_t _relativeSpeed; + float _relativeCapacity; /** * Whether this link is enabled, or (disabled (possibly bad config)) @@ -302,6 +301,17 @@ class Peer; class Bond { public: + + /** + * Stop bond's internal functions (can be resumed) + */ + void stopBond(); + + /** + * Start or resume a bond's internal functions + */ + void startBond(); + /** * @return Whether this link is permitted to become a member of a bond. */ @@ -576,6 +586,14 @@ class Bond { return _policyAlias; } + /** + * Return whether this bond is able to properly process traffic + */ + bool isReady() + { + return _numBondedPaths; + } + /** * Inform the bond about the path that its peer (owning object) just learned about. * If the path is allowed to be used, it will be inducted into the bond on a trial @@ -706,8 +724,9 @@ class Bond { * * @param flow Flow to be assigned * @param now Current time + * @param reassign Whether this flow is being re-assigned to another path */ - bool assignFlowToBondedPath(SharedPtr& flow, int64_t now); + bool assignFlowToBondedPath(SharedPtr& flow, int64_t now, bool reassign); /** * Determine whether a path change should occur given the remote peer's reported utility and our @@ -796,52 +815,12 @@ class Bond { void setBondParameters(int policy, SharedPtr templateBond, bool useTemplate); /** - * Check and assign user-specified quality weights to this bond. + * Check and assign user-specified link quality parameters to this bond. * - * @param weights Set of user-specified weights - * @param len Length of weight vector + * @param weights Set of user-specified parameters + * @param len Length of parameter vector */ - void setUserQualityWeights(float weights[], int len); - - /** - * @param latencyInMilliseconds Maximum acceptable latency. - */ - void setMaxAcceptableLatency(int16_t latencyInMilliseconds) - { - _maxAcceptableLatency = latencyInMilliseconds; - } - - /** - * @param latencyInMilliseconds Maximum acceptable (mean) latency. - */ - void setMaxAcceptableMeanLatency(int16_t latencyInMilliseconds) - { - _maxAcceptableMeanLatency = latencyInMilliseconds; - } - - /** - * @param latencyVarianceInMilliseconds Maximum acceptable packet delay variance (jitter). - */ - void setMaxAcceptablePacketDelayVariance(int16_t latencyVarianceInMilliseconds) - { - _maxAcceptablePacketDelayVariance = latencyVarianceInMilliseconds; - } - - /** - * @param lossRatio Maximum acceptable packet loss ratio (PLR). - */ - void setMaxAcceptablePacketLossRatio(float lossRatio) - { - _maxAcceptablePacketLossRatio = lossRatio; - } - - /** - * @param errorRatio Maximum acceptable packet error ratio (PER). - */ - void setMaxAcceptablePacketErrorRatio(float errorRatio) - { - _maxAcceptablePacketErrorRatio = errorRatio; - } + void setUserLinkQualitySpec(float weights[], int len); /** * @return Whether the user has defined links for use on this bond @@ -868,11 +847,11 @@ class Bond { } /** - * @return Whether the user has specified link speeds + * @return Whether the user has specified link capacities */ - inline bool userHasSpecifiedLinkSpeeds() + inline bool userHasSpecifiedLinkCapacities() { - return _userHasSpecifiedLinkSpeeds; + return _userHasSpecifiedLinkCapacities; } /** @@ -911,10 +890,9 @@ class Bond { */ inline bool rateGateQoS(int64_t now, SharedPtr& path) { - // TODO: Verify before production char pathStr[64] = { 0 }; path->address().toString(pathStr); - int diff = now - _lastQoSRateCheck; + uint64_t diff = now - _lastQoSRateCheck; if ((diff) <= (_qosSendInterval / ZT_MAX_PEER_NETWORK_PATHS)) { ++_qosCutoffCount; } @@ -922,7 +900,6 @@ class Bond { _qosCutoffCount = 0; } _lastQoSRateCheck = now; - // fprintf(stderr, "rateGateQoS (count=%d, send_interval=%d, diff=%d, path=%s)\n", _qosCutoffCount, _qosSendInterval, diff, pathStr); return (_qosCutoffCount < (ZT_MAX_PEER_NETWORK_PATHS * 2)); } @@ -934,7 +911,6 @@ class Bond { */ inline bool rateGatePathNegotiation(int64_t now, SharedPtr& path) { - // TODO: Verify before production char pathStr[64] = { 0 }; path->address().toString(pathStr); int diff = now - _lastPathNegotiationReceived; @@ -945,7 +921,6 @@ class Bond { _pathNegotiationCutoffCount = 0; } _lastPathNegotiationReceived = now; - // fprintf(stderr, "rateGateNeg (count=%d, send_interval=%d, diff=%d, path=%s)\n", _pathNegotiationCutoffCount, (ZT_PATH_NEGOTIATION_CUTOFF_TIME / ZT_MAX_PEER_NETWORK_PATHS), diff, pathStr); return (_pathNegotiationCutoffCount < (ZT_MAX_PEER_NETWORK_PATHS * 2)); } @@ -1061,20 +1036,11 @@ class Bond { } /** - * - * @param allowFlowHashing + * @return Whether flow-hashing is currently supported for this bond. */ - inline void setFlowHashing(bool allowFlowHashing) + bool flowHashingSupported() { - _allowFlowHashing = allowFlowHashing; - } - - /** - * @return Whether flow-hashing is currently enabled for this bond. - */ - bool flowHashingEnabled() - { - return _allowFlowHashing; + return _policy == ZT_BOND_POLICY_BALANCE_XOR || _policy == ZT_BOND_POLICY_BALANCE_AWARE; } /** @@ -1221,16 +1187,14 @@ class Bond { , onlyPathOnLink(false) , bonded(false) , negotiated(false) - , shouldReallocateFlows(false) + , shouldAvoid(false) , assignedFlowCount(0) - , latencyMean(0) + , latency(0) , latencyVariance(0) , packetLossRatio(0) , packetErrorRatio(0) - , allocation(0) - , byteLoad(0) - , relativeByteLoad(0) - , affinity(0) + , relativeQuality(0) + , relativeLinkCapacity(0) , failoverScore(0) , packetsReceivedSinceLastQoS(0) , packetsIn(0) @@ -1298,7 +1262,7 @@ class Bond { * @param now Current time * @return Whether a QoS (VERB_QOS_MEASUREMENT) packet needs to be emitted at this time */ - inline bool needsToSendQoS(int64_t now, int qosSendInterval) + inline bool needsToSendQoS(int64_t now, uint64_t qosSendInterval) { // fprintf(stderr, "QOS table (%d / %d)\n", packetsReceivedSinceLastQoS, ZT_QOS_TABLE_SIZE); return ((packetsReceivedSinceLastQoS >= ZT_QOS_TABLE_SIZE) || ((now - lastQoSMeasurement) > qosSendInterval)) && packetsReceivedSinceLastQoS; @@ -1308,7 +1272,7 @@ class Bond { * @param now Current time * @return Whether an ACK (VERB_ACK) packet needs to be emitted at this time */ - inline bool needsToSendAck(int64_t now, int ackSendInterval) + inline bool needsToSendAck(int64_t now, uint64_t ackSendInterval) { return ((now - lastAckSent) >= ackSendInterval || (packetsReceivedSinceLastAck == ZT_QOS_TABLE_SIZE)) && packetsReceivedSinceLastAck; } @@ -1344,26 +1308,25 @@ class Bond { uint64_t lastRefractoryUpdate; // The last time that the refractory period was updated. uint64_t lastAliveToggle; // The last time that the path was marked as "alive". bool alive; - bool eligible; // State of eligibility at last check. Used for determining state changes. - uint64_t lastEligibility; // The last time that this path was eligible - uint64_t whenNominated; // Timestamp indicating when this path's trial period began. - uint32_t refractoryPeriod; // Amount of time that this path will be prevented from becoming a member of a bond. - uint8_t ipvPref; // IP version preference inherited from the physical link. - uint8_t mode; // Mode inherited from the physical link. - bool onlyPathOnLink; // IP version preference inherited from the physical link. - bool enabled; // Enabled state inherited from the physical link. - bool bonded; // Whether this path is currently part of a bond. - bool negotiated; // Whether this path was intentionally negotiated by either peer. - bool shouldReallocateFlows; // Whether flows should be moved from this path. Current traffic flows will be re-allocated immediately. - uint16_t assignedFlowCount; // The number of flows currently assigned to this path. - float latencyMean; // The mean latency (computed from a sliding window.) - float latencyVariance; // Packet delay variance (computed from a sliding window.) - float packetLossRatio; // The ratio of lost packets to received packets. - float packetErrorRatio; // The ratio of packets that failed their MAC/CRC checks to those that did not. - uint8_t allocation; // The relative quality of this path to all others in the bond, [0-255]. - uint64_t byteLoad; // How much load this path is under. - uint8_t relativeByteLoad; // How much load this path is under (relative to other paths in the bond.) - uint8_t affinity; // Relative value expressing how "deserving" this path is of new traffic. + bool eligible; // State of eligibility at last check. Used for determining state changes. + uint64_t lastEligibility; // The last time that this path was eligible + uint64_t whenNominated; // Timestamp indicating when this path's trial period began. + uint32_t refractoryPeriod; // Amount of time that this path will be prevented from becoming a member of a bond. + uint8_t ipvPref; // IP version preference inherited from the physical link. + uint8_t mode; // Mode inherited from the physical link. + bool onlyPathOnLink; // IP version preference inherited from the physical link. + bool enabled; // Enabled state inherited from the physical link. + bool bonded; // Whether this path is currently part of a bond. + bool negotiated; // Whether this path was intentionally negotiated by either peer. + bool shouldAvoid; // Whether flows should be moved from this path. Current traffic flows will be re-allocated immediately. + uint16_t assignedFlowCount; // The number of flows currently assigned to this path. + float latency; // The mean latency (computed from a sliding window.) + float latencyVariance; // Packet delay variance (computed from a sliding window.) + float packetLossRatio; // The ratio of lost packets to received packets. + float packetErrorRatio; // The ratio of packets that failed their MAC/CRC checks to those that did not. + float relativeQuality; // The relative quality of the link. + float relativeLinkCapacity; // The relative capacity of the link. + uint32_t failoverScore; // Score that indicates to what degree this path is preferred over others that are available to the bonding policy. (specifically for active-backup) int32_t packetsReceivedSinceLastQoS; // Number of packets received since the last VERB_QOS_MEASUREMENT was sent to the remote peer. @@ -1461,10 +1424,12 @@ class Bond { * may only be updated during a call to curateBond(). The reason for this is so that * we can simplify the high frequency packet egress logic. */ - int _bondIdxMap[ZT_MAX_PEER_NETWORK_PATHS]; - int _numBondedPaths; // Number of paths currently included in the _bondIdxMap set. - std::map > _flows; // Flows hashed according to port and protocol - float _qw[ZT_QOS_WEIGHT_SIZE]; // How much each factor contributes to the "quality" score of a path. + int _realIdxMap[ZT_MAX_PEER_NETWORK_PATHS] = { ZT_MAX_PEER_NETWORK_PATHS }; + int _numBondedPaths; // Number of paths currently included in the _realIdxMap set. + std::map > _flows; // Flows hashed according to port and protocol + float _qw[ZT_QOS_PARAMETER_SIZE]; // Link quality specification (can be customized by user) + + bool _run; uint8_t _policy; uint32_t _upDelay; @@ -1500,20 +1465,11 @@ class Bond { /** * Timers and intervals */ - uint32_t _failoverInterval; - uint32_t _qosSendInterval; - uint32_t _ackSendInterval; - uint32_t throughputMeasurementInterval; - uint32_t _qualityEstimationInterval; - - /** - * Acceptable quality thresholds - */ - float _maxAcceptablePacketLossRatio; - float _maxAcceptablePacketErrorRatio; - uint16_t _maxAcceptableLatency; - uint16_t _maxAcceptableMeanLatency; - uint16_t _maxAcceptablePacketDelayVariance; + uint64_t _failoverInterval; + uint64_t _qosSendInterval; + uint64_t _ackSendInterval; + uint64_t throughputMeasurementInterval; + uint64_t _qualityEstimationInterval; /** * Link state reporting @@ -1563,7 +1519,7 @@ class Bond { bool _userHasSpecifiedLinks; // Whether the user has specified links for this bond. bool _userHasSpecifiedPrimaryLink; // Whether the user has specified a primary link for this bond. bool _userHasSpecifiedFailoverInstructions; // Whether the user has specified failover instructions for this bond. - bool _userHasSpecifiedLinkSpeeds; // Whether the user has specified links speeds for this bond. + bool _userHasSpecifiedLinkCapacities; // Whether the user has specified links capacities for this bond. /** * How frequently (in ms) a VERB_ECHO is sent to a peer to verify that a * path is still active. A value of zero (0) will disable active path diff --git a/node/Constants.hpp b/node/Constants.hpp index 6389bdcd3..f9fa72333 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -390,7 +390,7 @@ /** * Number of samples to consider when processing real-time path statistics */ -#define ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE 32 +#define ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE 64 /** * Max allowable time spent in any queue (in ms) diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index 72cd4bde7..9080128b6 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -707,7 +707,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar { int32_t _flowId = ZT_QOS_NO_FLOW; SharedPtr bond = peer->bond(); - if (bond && bond->flowHashingEnabled()) { + if (bond && bond->flowHashingSupported()) { if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) { const unsigned int etherType = at(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE); const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; diff --git a/node/Node.cpp b/node/Node.cpp index 087d8d048..019a8afca 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -510,7 +510,7 @@ ZT_PeerList *Node::peers() const p->paths[p->pathCount].latencyVariance = (*path)->latencyVariance(); p->paths[p->pathCount].packetLossRatio = (*path)->packetLossRatio(); p->paths[p->pathCount].packetErrorRatio = (*path)->packetErrorRatio(); - p->paths[p->pathCount].allocation = (*path)->allocation(); + p->paths[p->pathCount].relativeQuality = (*path)->relativeQuality(); p->paths[p->pathCount].linkSpeed = (*path)->givenLinkSpeed(); p->paths[p->pathCount].bonded = (*path)->bonded(); p->paths[p->pathCount].eligible = (*path)->eligible(); diff --git a/node/Path.hpp b/node/Path.hpp index 8782f35cf..11a3e5113 100644 --- a/node/Path.hpp +++ b/node/Path.hpp @@ -93,7 +93,7 @@ public: _eligible(false), _bonded(false), _givenLinkSpeed(0), - _allocation(0), + _relativeQuality(0), _latency(0xffff), _addr(), _ipScope(InetAddress::IP_SCOPE_NONE) @@ -113,7 +113,7 @@ public: _eligible(false), _bonded(false), _givenLinkSpeed(0), - _allocation(0), + _relativeQuality(0), _latency(0xffff), _addr(addr), _ipScope(addr.ipScope()) @@ -335,14 +335,14 @@ public: inline unsigned int bonded() const { return _bonded; } /** - * @return Given link speed as reported by the bonding layer + * @return Given link capacity as reported by the bonding layer */ inline unsigned int givenLinkSpeed() const { return _givenLinkSpeed; } /** - * @return Traffic allocation as reported by the bonding layer + * @return Path's quality as reported by the bonding layer */ - inline unsigned char allocation() const { return _allocation; } + inline float relativeQuality() const { return _relativeQuality; } /** * @return Physical interface name that this path lives on @@ -371,7 +371,7 @@ private: volatile bool _eligible; volatile bool _bonded; volatile uint32_t _givenLinkSpeed; - volatile uint8_t _allocation; + volatile float _relativeQuality; volatile unsigned int _latency; InetAddress _addr; diff --git a/node/Peer.cpp b/node/Peer.cpp index 963774d5e..99fa8d277 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -270,30 +270,30 @@ SharedPtr Peer::getAppropriatePath(int64_t now, bool includeExpired, int32 { Mutex::Lock _l(_paths_m); Mutex::Lock _lb(_bond_m); - if (!_bond) { - unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS; - /** - * Send traffic across the highest quality path only. This algorithm will still - * use the old path quality metric from protocol version 9. - */ - long bestPathQuality = 2147483647; - for(unsigned int i=0;iquality(now) / _paths[i].priority; - if (q <= bestPathQuality) { - bestPathQuality = q; - bestPath = i; - } - } - } else break; - } - if (bestPath != ZT_MAX_PEER_NETWORK_PATHS) { - return _paths[bestPath].p; - } - return SharedPtr(); + if(_bond && _bond->isReady()) { + return _bond->getAppropriatePath(now, flowId); } - return _bond->getAppropriatePath(now, flowId); + unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS; + /** + * Send traffic across the highest quality path only. This algorithm will still + * use the old path quality metric from protocol version 9. + */ + long bestPathQuality = 2147483647; + for(unsigned int i=0;iquality(now) / _paths[i].priority; + if (q <= bestPathQuality) { + bestPathQuality = q; + bestPath = i; + } + } + } else break; + } + if (bestPath != ZT_MAX_PEER_NETWORK_PATHS) { + return _paths[bestPath].p; + } + return SharedPtr(); } void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr &other) const diff --git a/one.cpp b/one.cpp index 9a3363933..62710547c 100644 --- a/one.cpp +++ b/one.cpp @@ -637,20 +637,20 @@ static int cli(int argc,char **argv) ); } printf("\nidx lat pdv " - "plr per speed alloc " + "plr per capacity qual " "rx_age tx_age eligible bonded\n"); for(int i=0; i<100; i++) { printf("-"); } printf("\n"); for (int i=0; i j["bonded"] = peer->paths[i].bonded; j["eligible"] = peer->paths[i].eligible; j["givenLinkSpeed"] = peer->paths[i].linkSpeed; - j["allocation"] = std::round(((float)(peer->paths[i].allocation) / 255.0) * 1000.0) / 1000.0; + j["relativeQuality"] = peer->paths[i].relativeQuality; } pa.push_back(j); } @@ -1484,7 +1484,6 @@ public: _peerToJson(res,&(pl->peers[i]),bond); scode = 200; } else { - fprintf(stderr, "unable to find bond to peer %llx\n", (unsigned long long)wantp); scode = 400; } } @@ -2023,23 +2022,20 @@ public: } // New bond, used as a copy template for new instances SharedPtr newTemplateBond = new Bond(NULL, basePolicyStr, customPolicyStr, SharedPtr()); - // Acceptable ranges newTemplateBond->setPolicy(basePolicyCode); - newTemplateBond->setMaxAcceptableLatency(OSUtils::jsonInt(customPolicy["maxAcceptableLatency"],-1)); - newTemplateBond->setMaxAcceptableMeanLatency(OSUtils::jsonInt(customPolicy["maxAcceptableMeanLatency"],-1)); - newTemplateBond->setMaxAcceptablePacketDelayVariance(OSUtils::jsonInt(customPolicy["maxAcceptablePacketDelayVariance"],-1)); - newTemplateBond->setMaxAcceptablePacketLossRatio((float)OSUtils::jsonDouble(customPolicy["maxAcceptablePacketLossRatio"],-1)); - newTemplateBond->setMaxAcceptablePacketErrorRatio((float)OSUtils::jsonDouble(customPolicy["maxAcceptablePacketErrorRatio"],-1)); - // Quality weights - json &qualityWeights = customPolicy["qualityWeights"]; - if (qualityWeights.size() == ZT_QOS_WEIGHT_SIZE) { - float weights[ZT_QOS_WEIGHT_SIZE]; - weights[ZT_QOS_LAT_IDX] = (float)OSUtils::jsonDouble(qualityWeights["lat"],0.0); - weights[ZT_QOS_LTM_IDX] = (float)OSUtils::jsonDouble(qualityWeights["ltm"],0.0); - weights[ZT_QOS_PDV_IDX] = (float)OSUtils::jsonDouble(qualityWeights["pdv"],0.0); - weights[ZT_QOS_PLR_IDX] = (float)OSUtils::jsonDouble(qualityWeights["plr"],0.0); - weights[ZT_QOS_PER_IDX] = (float)OSUtils::jsonDouble(qualityWeights["per"],0.0); - newTemplateBond->setUserQualityWeights(weights,ZT_QOS_WEIGHT_SIZE); + // Custom link quality spec + json &linkQualitySpec = customPolicy["linkQuality"]; + if (linkQualitySpec.size() == ZT_QOS_PARAMETER_SIZE) { + float weights[ZT_QOS_PARAMETER_SIZE] = {}; + weights[ZT_QOS_LAT_MAX_IDX] = (float)OSUtils::jsonDouble(linkQualitySpec["lat_max"],0.0); + weights[ZT_QOS_PDV_MAX_IDX] = (float)OSUtils::jsonDouble(linkQualitySpec["pdv_max"],0.0); + weights[ZT_QOS_PLR_MAX_IDX] = (float)OSUtils::jsonDouble(linkQualitySpec["plr_max"],0.0); + weights[ZT_QOS_PER_MAX_IDX] = (float)OSUtils::jsonDouble(linkQualitySpec["per_max"],0.0); + weights[ZT_QOS_LAT_WEIGHT_IDX] = (float)OSUtils::jsonDouble(linkQualitySpec["lat_weight"],0.0); + weights[ZT_QOS_PDV_WEIGHT_IDX] = (float)OSUtils::jsonDouble(linkQualitySpec["pdv_weight"],0.0); + weights[ZT_QOS_PLR_WEIGHT_IDX] = (float)OSUtils::jsonDouble(linkQualitySpec["plr_weight"],0.0); + weights[ZT_QOS_PER_WEIGHT_IDX] = (float)OSUtils::jsonDouble(linkQualitySpec["per_weight"],0.0); + newTemplateBond->setUserLinkQualitySpec(weights,ZT_QOS_PARAMETER_SIZE); } // Bond-specific properties newTemplateBond->setUpDelay(OSUtils::jsonInt(customPolicy["upDelay"],-1)); @@ -2053,7 +2049,7 @@ public: std::string linkNameStr(linkItr.key()); json &link = linkItr.value(); bool enabled = OSUtils::jsonInt(link["enabled"],true); - uint32_t speed = OSUtils::jsonInt(link["speed"],0); + uint32_t capacity = OSUtils::jsonInt(link["capacity"],0); uint8_t ipvPref = OSUtils::jsonInt(link["ipvPref"],0); std::string failoverToStr(OSUtils::jsonString(link["failoverTo"],"")); // Mode @@ -2071,7 +2067,7 @@ public: failoverToStr = ""; enabled = false; } - _node->bondController()->addCustomLink(customPolicyStr, new Link(linkNameStr,ipvPref,speed,enabled,linkMode,failoverToStr)); + _node->bondController()->addCustomLink(customPolicyStr, new Link(linkNameStr,ipvPref,capacity,enabled,linkMode,failoverToStr)); } std::string linkSelectMethodStr(OSUtils::jsonString(customPolicy["activeReselect"],"optimize")); if (linkSelectMethodStr == "always") {