Fix timers, fix flow count discrepancy after flow removal, fix balance-aware flow re-assignment when one or more links go down

This commit is contained in:
Joseph Henry 2020-06-01 22:58:58 -07:00
parent 1dca7b92cf
commit fa5c8ef434
3 changed files with 117 additions and 109 deletions

View file

@ -25,7 +25,7 @@ Bond::Bond(const RuntimeEnvironment *renv, int policy, const SharedPtr<Peer>& pe
RR(renv), RR(renv),
_peer(peer) _peer(peer)
{ {
setReasonableDefaults(policy); setReasonableDefaults(policy, SharedPtr<Bond>(), false);
_policyAlias = BondController::getPolicyStrByCode(policy); _policyAlias = BondController::getPolicyStrByCode(policy);
} }
@ -34,31 +34,14 @@ Bond::Bond(const RuntimeEnvironment *renv, std::string& basePolicy, std::string&
_policyAlias(policyAlias), _policyAlias(policyAlias),
_peer(peer) _peer(peer)
{ {
setReasonableDefaults(BondController::getPolicyCodeByStr(basePolicy)); setReasonableDefaults(BondController::getPolicyCodeByStr(basePolicy), SharedPtr<Bond>(), false);
} }
Bond::Bond(const RuntimeEnvironment *renv, const Bond &originalBond, const SharedPtr<Peer>& peer) : Bond::Bond(const RuntimeEnvironment *renv, SharedPtr<Bond> originalBond, const SharedPtr<Peer>& peer) :
RR(renv), RR(renv),
_peer(peer) _peer(peer)
{ {
// First, set everything to sane defaults setReasonableDefaults(originalBond->_bondingPolicy, originalBond, true);
setReasonableDefaults(originalBond._bondingPolicy);
_policyAlias = originalBond._policyAlias;
// Second, apply user specified values (only if they make sense)
_downDelay = originalBond._downDelay;
_upDelay = originalBond._upDelay;
if (originalBond._bondMonitorInterval > 0 && originalBond._bondMonitorInterval < 65535) {
_bondMonitorInterval = originalBond._bondMonitorInterval;
}
else {
fprintf(stderr, "warning: bondMonitorInterval (%d) is out of range, using default (%d)\n", originalBond._bondMonitorInterval, _bondMonitorInterval);
}
if (originalBond._slaveMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_PASSIVE
&& originalBond._failoverInterval != 0) {
fprintf(stderr, "warning: passive path monitoring was specified, this will prevent failovers from happening in a timely manner.\n");
}
_abSlaveSelectMethod = originalBond._abSlaveSelectMethod;
memcpy(_qualityWeights, originalBond._qualityWeights, ZT_QOS_WEIGHT_SIZE * sizeof(float));
} }
void Bond::nominatePath(const SharedPtr<Path>& path, int64_t now) void Bond::nominatePath(const SharedPtr<Path>& path, int64_t now)
@ -97,7 +80,7 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
/** /**
* active-backup * active-backup
*/ */
if (_bondingPolicy== ZT_BONDING_POLICY_ACTIVE_BACKUP) { if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
if (_abPath) { if (_abPath) {
return _abPath; return _abPath;
} }
@ -105,7 +88,7 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
/** /**
* broadcast * broadcast
*/ */
if (_bondingPolicy== ZT_BONDING_POLICY_BROADCAST) { if (_bondingPolicy == ZT_BONDING_POLICY_BROADCAST) {
return SharedPtr<Path>(); // Handled in Switch::_trySend() return SharedPtr<Path>(); // Handled in Switch::_trySend()
} }
if (!_numBondedPaths) { if (!_numBondedPaths) {
@ -114,7 +97,7 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
/** /**
* balance-rr * balance-rr
*/ */
if (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_RR) { if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) {
if (!_allowFlowHashing) { if (!_allowFlowHashing) {
//fprintf(stderr, "_rrPacketsSentOnCurrSlave=%d, _numBondedPaths=%d, _rrIdx=%d\n", _rrPacketsSentOnCurrSlave, _numBondedPaths, _rrIdx); //fprintf(stderr, "_rrPacketsSentOnCurrSlave=%d, _numBondedPaths=%d, _rrIdx=%d\n", _rrPacketsSentOnCurrSlave, _numBondedPaths, _rrIdx);
if (_packetsPerSlave == 0) { if (_packetsPerSlave == 0) {
@ -151,7 +134,7 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
/** /**
* balance-xor * balance-xor
*/ */
if (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy== ZT_BONDING_POLICY_BALANCE_AWARE) { if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
if (!_allowFlowHashing || flowId == -1) { if (!_allowFlowHashing || flowId == -1) {
// No specific path required for unclassified traffic, send on anything // No specific path required for unclassified traffic, send on anything
return _paths[_bondedIdx[_freeRandomByte % _numBondedPaths]]; // TODO: Optimize return _paths[_bondedIdx[_freeRandomByte % _numBondedPaths]]; // TODO: Optimize
@ -252,9 +235,9 @@ void Bond::recordIncomingPacket(const SharedPtr<Path>& path, uint64_t packetId,
* which path to use. * which path to use.
*/ */
if ((flowId != ZT_QOS_NO_FLOW) if ((flowId != ZT_QOS_NO_FLOW)
&& (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_RR && (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR
|| _bondingPolicy== ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR
|| _bondingPolicy== ZT_BONDING_POLICY_BALANCE_AWARE)) { || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE)) {
Mutex::Lock _l(_flows_m); Mutex::Lock _l(_flows_m);
SharedPtr<Flow> flow; SharedPtr<Flow> flow;
if (!_flows.count(flowId)) { if (!_flows.count(flowId)) {
@ -335,6 +318,7 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow> &flow, int64_t now)
unsigned int idx = ZT_MAX_PEER_NETWORK_PATHS; unsigned int idx = ZT_MAX_PEER_NETWORK_PATHS;
if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR) { if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR) {
idx = abs((int)(flow->id() % (_numBondedPaths))); idx = abs((int)(flow->id() % (_numBondedPaths)));
//fprintf(stderr, "flow->id()=%d, %x, _numBondedPaths=%d, idx=%d\n", flow->id(), flow->id(), _numBondedPaths, idx);
flow->assignPath(_paths[_bondedIdx[idx]],now); flow->assignPath(_paths[_bondedIdx[idx]],now);
} }
if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) { if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
@ -347,15 +331,28 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow> &flow, int64_t now)
fprintf(stderr, "no bonded paths for flow assignment\n"); fprintf(stderr, "no bonded paths for flow assignment\n");
return false; return false;
} }
/* Since there may be scenarios where a path is removed before we can re-estimate
relative qualities (and thus allocations) we need to down-modulate the entropy
value that we use to randomly assign among the surviving paths, otherwise we risk
not being able to find a path to assign this flow to. */
int totalIncompleteAllocation = 0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i] && _paths[i]->bonded()) {
totalIncompleteAllocation += _paths[i]->_allocation;
}
}
fprintf(stderr, "entropy = %d, totalIncompleteAllocation=%d\n", entropy, totalIncompleteAllocation);
entropy %= totalIncompleteAllocation;
fprintf(stderr, "new entropy = %d\n", entropy);
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i] && _paths[i]->bonded()) { if (_paths[i] && _paths[i]->bonded()) {
SharedPtr<Slave> slave = RR->bc->getSlaveBySocket(_policyAlias, _paths[i]->localSocket()); SharedPtr<Slave> slave = RR->bc->getSlaveBySocket(_policyAlias, _paths[i]->localSocket());
_paths[i]->address().toString(curPathStr); _paths[i]->address().toString(curPathStr);
uint8_t probabilitySegment = (_totalBondUnderload > 0) ? _paths[i]->_affinity : _paths[i]->_allocation; uint8_t probabilitySegment = (_totalBondUnderload > 0) ? _paths[i]->_affinity : _paths[i]->_allocation;
//fprintf(stderr, "i=%2d, entropy=%3d, alloc=%3d, byteload=%4d, segment=%3d, _totalBondUnderload=%3d, ifname=%s, path=%20s\n", i, entropy, _paths[i]->allocation, _paths[i]->relativeByteLoad, probabilitySegment, _totalBondUnderload, slave->ifname().c_str(), curPathStr); fprintf(stderr, "i=%2d, entropy=%3d, alloc=%3d, byteload=%4d, segment=%3d, _totalBondUnderload=%3d, ifname=%s, path=%20s\n", i, entropy, _paths[i]->_allocation, _paths[i]->_relativeByteLoad, probabilitySegment, _totalBondUnderload, slave->ifname().c_str(), curPathStr);
if (entropy <= probabilitySegment) { if (entropy <= probabilitySegment) {
idx = i; idx = i;
//fprintf(stderr, "\t is best path\n"); fprintf(stderr, "\t is best path\n");
break; break;
} }
entropy -= probabilitySegment; entropy -= probabilitySegment;
@ -423,6 +420,7 @@ void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now)
while (it != _flows.end()) { while (it != _flows.end()) {
if (it->second->age(now) > age) { if (it->second->age(now) > age) {
fprintf(stderr, "forgetting flow %x between this node and %llx, %lu active flow(s)\n", it->first, _peer->_id.address().toInt(), (_flows.size()-1)); fprintf(stderr, "forgetting flow %x between this node and %llx, %lu active flow(s)\n", it->first, _peer->_id.address().toInt(), (_flows.size()-1));
it->second->assignedPath()->_assignedFlowCount--;
it = _flows.erase(it); it = _flows.erase(it);
} else { } else {
++it; ++it;
@ -440,10 +438,10 @@ void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now)
} }
if (oldestFlow != _flows.end()) { if (oldestFlow != _flows.end()) {
fprintf(stderr, "forgetting oldest flow %x (of age %llu) between this node and %llx, %lu active flow(s)\n", oldestFlow->first, oldestFlow->second->age(now), _peer->_id.address().toInt(), (_flows.size()-1)); fprintf(stderr, "forgetting oldest flow %x (of age %llu) between this node and %llx, %lu active flow(s)\n", oldestFlow->first, oldestFlow->second->age(now), _peer->_id.address().toInt(), (_flows.size()-1));
oldestFlow->second->assignedPath()->_assignedFlowCount--;
_flows.erase(oldestFlow); _flows.erase(oldestFlow);
} }
} }
fprintf(stderr, "000\n");
} }
void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr<Path> &path, int16_t remoteUtility) void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr<Path> &path, int16_t remoteUtility)
@ -610,17 +608,17 @@ void Bond::processBackgroundTasks(void *tPtr, const int64_t now)
//fprintf(stderr, "_lastFrame=%llu, suggestedMonitorInterval=%d, _dynamicPathMonitorInterval=%d\n", //fprintf(stderr, "_lastFrame=%llu, suggestedMonitorInterval=%d, _dynamicPathMonitorInterval=%d\n",
// (now-_lastFrame), suggestedMonitorInterval, _dynamicPathMonitorInterval); // (now-_lastFrame), suggestedMonitorInterval, _dynamicPathMonitorInterval);
} }
// TODO: Clarify and generalize this logic
if (_slaveMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC) { if (_slaveMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC) {
_shouldCollectPathStatistics = true; _shouldCollectPathStatistics = true;
} }
// Memoize oft-used properties in the packet ingress/egress logic path // Memoize oft-used properties in the packet ingress/egress logic path
if (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_AWARE) { if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
// Required for real-time balancing // Required for real-time balancing
_shouldCollectPathStatistics = true; _shouldCollectPathStatistics = true;
} }
if (_bondingPolicy== ZT_BONDING_POLICY_ACTIVE_BACKUP) { if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
if (_abSlaveSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_BETTER) { if (_abSlaveSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_BETTER) {
// Required for judging suitability of primary slave after recovery // Required for judging suitability of primary slave after recovery
_shouldCollectPathStatistics = true; _shouldCollectPathStatistics = true;
@ -680,7 +678,7 @@ void Bond::processBackgroundTasks(void *tPtr, const int64_t now)
void Bond::applyUserPrefs() void Bond::applyUserPrefs()
{ {
fprintf(stderr, "applyUserPrefs, _minReqPathMonitorInterval=%d\n", RR->bc->minReqPathMonitorInterval()); //fprintf(stderr, "applyUserPrefs, _minReqPathMonitorInterval=%d\n", RR->bc->minReqPathMonitorInterval());
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (!_paths[i]) { if (!_paths[i]) {
continue; continue;
@ -717,7 +715,7 @@ void Bond::applyUserPrefs()
void Bond::curateBond(const int64_t now, bool rebuildBond) void Bond::curateBond(const int64_t now, bool rebuildBond)
{ {
//fprintf(stderr, "%lu curateBond (rebuildBond=%d)\n", ((now - RR->bc->getBondStartTime())), rebuildBond); //fprintf(stderr, "%lu curateBond (rebuildBond=%d), _numBondedPaths=%d\n", ((now - RR->bc->getBondStartTime())), rebuildBond, _numBondedPaths);
char pathStr[128]; char pathStr[128];
/** /**
* Update path states * Update path states
@ -727,6 +725,9 @@ void Bond::curateBond(const int64_t now, bool rebuildBond)
continue; continue;
} }
bool currEligibility = _paths[i]->eligible(now,_ackSendInterval); bool currEligibility = _paths[i]->eligible(now,_ackSendInterval);
//_paths[i]->address().toString(pathStr);
//fprintf(stderr, "\n\n%ld path eligibility (for %s, %s):\n", (RR->node->now() - RR->bc->getBondStartTime()), getSlave(_paths[i])->ifname().c_str(), pathStr);
//_paths[i]->printEligible(now,_ackSendInterval);
if (currEligibility != _paths[i]->_lastEligibilityState) { if (currEligibility != _paths[i]->_lastEligibilityState) {
_paths[i]->address().toString(pathStr); _paths[i]->address().toString(pathStr);
//fprintf(stderr, "\n\n%ld path eligibility (for %s, %s) has changed (from %d to %d)\n", (RR->node->now() - RR->bc->getBondStartTime()), getSlave(_paths[i])->ifname().c_str(), pathStr, _paths[i]->lastCheckedEligibility, _paths[i]->eligible(now,_ackSendInterval)); //fprintf(stderr, "\n\n%ld path eligibility (for %s, %s) has changed (from %d to %d)\n", (RR->node->now() - RR->bc->getBondStartTime()), getSlave(_paths[i])->ifname().c_str(), pathStr, _paths[i]->lastCheckedEligibility, _paths[i]->eligible(now,_ackSendInterval));
@ -754,9 +755,9 @@ void Bond::curateBond(const int64_t now, bool rebuildBond)
* Curate the set of paths that are part of the bond proper. Selects a single path * Curate the set of paths that are part of the bond proper. Selects a single path
* per logical slave according to eligibility and user-specified constraints. * per logical slave according to eligibility and user-specified constraints.
*/ */
if ((_bondingPolicy== ZT_BONDING_POLICY_BALANCE_RR) if ((_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR)
|| (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_XOR) || (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR)
|| (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_AWARE)) { || (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE)) {
if (!_numBondedPaths) { if (!_numBondedPaths) {
rebuildBond = true; rebuildBond = true;
} }
@ -822,7 +823,7 @@ void Bond::curateBond(const int64_t now, bool rebuildBond)
} }
_numBondedPaths = updatedBondedPathCount; _numBondedPaths = updatedBondedPathCount;
if (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_RR) { if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) {
// Cause a RR reset since the currently used index might no longer be valid // Cause a RR reset since the currently used index might no longer be valid
_rrPacketsSentOnCurrSlave = _packetsPerSlave; _rrPacketsSentOnCurrSlave = _packetsPerSlave;
} }
@ -975,11 +976,9 @@ void Bond::estimatePathQuality(const int64_t now)
_paths[i]->_allocation = alloc[i]; _paths[i]->_allocation = alloc[i];
} }
} }
/*
if ((now - _lastLogTS) > 500) { if ((now - _lastLogTS) > 500) {
if (!relevant()) {return;} if (!relevant()) {return;}
//fprintf(stderr, "\n"); //fprintf(stderr, "\n");
_lastPrintTS = now;
_lastLogTS = now; _lastLogTS = now;
int numPlottablePaths=0; int numPlottablePaths=0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
@ -1013,13 +1012,12 @@ void Bond::estimatePathQuality(const int64_t now)
if (_paths[i]) { if (_paths[i]) {
_paths[i]->address().toString(pathStr); _paths[i]->address().toString(pathStr);
fprintf(stdout, "%s, %s, %8.3f, %8.3f, %8.3f, %5.3f, %5.3f, %5.3f, %8f, %5.3f, %5.3f, %d, %5.3f, %d, %d, %d, %d, %d, %d, ", fprintf(stdout, "%s, %s, %8.3f, %8.3f, %8.3f, %5.3f, %5.3f, %5.3f, %8f, %5.3f, %5.3f, %d, %5.3f, %d, %d, %d, %d, %d, %d, ",
getSlave(_paths[i])->ifname().c_str(), pathStr, _paths[i]->latencyMean, lat[i],pdv[i], _paths[i]->packetLossRatio, plr[i],per[i],thr[i],thm[i],thv[i],(now - _paths[i]->lastIn()),quality[i],alloc[i], getSlave(_paths[i])->ifname().c_str(), pathStr, _paths[i]->_latencyMean, lat[i],pdv[i], _paths[i]->_packetLossRatio, plr[i],per[i],thr[i],thm[i],thv[i],(now - _paths[i]->lastIn()),quality[i],alloc[i],
_paths[i]->relativeByteLoad, _paths[i]->assignedFlowCount, _paths[i]->alive(now, true), _paths[i]->eligible(now,_ackSendInterval), _paths[i]->qosStatsOut.size()); _paths[i]->_relativeByteLoad, _paths[i]->_assignedFlowCount, _paths[i]->alive(now, true), _paths[i]->eligible(now,_ackSendInterval), _paths[i]->qosStatsOut.size());
} }
} }
fprintf(stdout, "\n"); fprintf(stdout, "\n");
} }
*/
} }
void Bond::processBalanceTasks(const int64_t now) void Bond::processBalanceTasks(const int64_t now)
@ -1047,7 +1045,7 @@ void Bond::processBalanceTasks(const int64_t now)
/** /**
* Re-allocate flows from dead paths * Re-allocate flows from dead paths
*/ */
if (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy== ZT_BONDING_POLICY_BALANCE_AWARE) { if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
Mutex::Lock _l(_flows_m); Mutex::Lock _l(_flows_m);
for (int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { for (int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (!_paths[i]) { if (!_paths[i]) {
@ -1073,7 +1071,7 @@ void Bond::processBalanceTasks(const int64_t now)
/** /**
* Tasks specific to (Balance Round Robin) * Tasks specific to (Balance Round Robin)
*/ */
if (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_RR) { if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) {
if (_allowFlowHashing) { if (_allowFlowHashing) {
// TODO: Should ideally failover from (idx) to a random slave, this is so that (idx+1) isn't overloaded // TODO: Should ideally failover from (idx) to a random slave, this is so that (idx+1) isn't overloaded
} }
@ -1084,13 +1082,13 @@ void Bond::processBalanceTasks(const int64_t now)
/** /**
* Tasks specific to (Balance XOR) * Tasks specific to (Balance XOR)
*/ */
if (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_XOR) { if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR) {
// Nothing specific for XOR // Nothing specific for XOR
} }
/** /**
* Tasks specific to (Balance Aware) * Tasks specific to (Balance Aware)
*/ */
if ((_bondingPolicy== ZT_BONDING_POLICY_BALANCE_AWARE)) { if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
if (_allowFlowHashing) { if (_allowFlowHashing) {
Mutex::Lock _l(_flows_m); Mutex::Lock _l(_flows_m);
/** /**
@ -1118,8 +1116,9 @@ void Bond::processBalanceTasks(const int64_t now)
* Determine "affinity" for bonded path * Determine "affinity" for bonded path
*/ */
//fprintf(stderr, "\n\n"); //fprintf(stderr, "\n\n");
_totalBondUnderload = 0;
_totalBondUnderload = 0;
/*
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { // second pass: compute relative byte loads and total imbalance for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { // second pass: compute relative byte loads and total imbalance
if (_paths[i] && _paths[i]->bonded()) { if (_paths[i] && _paths[i]->bonded()) {
if (totalBytes) { if (totalBytes) {
@ -1139,7 +1138,7 @@ void Bond::processBalanceTasks(const int64_t now)
} }
} }
} }
*/
//fprintf(stderr, "_totalBondUnderload=%d (end)\n\n", _totalBondUnderload); //fprintf(stderr, "_totalBondUnderload=%d (end)\n\n", _totalBondUnderload);
/** /**
@ -1502,7 +1501,7 @@ void Bond::processActiveBackupTasks(const int64_t now)
} }
} }
void Bond::setReasonableDefaults(int policy) void Bond::setReasonableDefaults(int policy, SharedPtr<Bond> templateBond, bool useTemplate)
{ {
// If invalid bonding policy, try default // If invalid bonding policy, try default
int _defaultBondingPolicy = BondController::defaultBondingPolicy(); int _defaultBondingPolicy = BondController::defaultBondingPolicy();
@ -1548,7 +1547,10 @@ void Bond::setReasonableDefaults(int policy)
_lastFrame=0; _lastFrame=0;
// TODO: Remove
_header=false;
_lastLogTS = RR->node->now();
_lastPrintTS = RR->node->now();
/** /**
* Paths are actively monitored to provide a real-time quality/preference-ordered rapid failover queue. * Paths are actively monitored to provide a real-time quality/preference-ordered rapid failover queue.
@ -1635,18 +1637,53 @@ void Bond::setReasonableDefaults(int policy)
break; break;
} }
if (useTemplate) {
_policyAlias = templateBond->_policyAlias;
_failoverInterval = templateBond->_failoverInterval;
_downDelay = templateBond->_downDelay;
_upDelay = templateBond->_upDelay;
fprintf(stderr, "TIMERS: strat=%d, fi= %d, bmi= %d, qos= %d, ack= %d, estimateInt= %d, refractory= %d, ud= %d, dd= %d\n",
_slaveMonitorStrategy,
_failoverInterval,
_bondMonitorInterval,
_qosSendInterval,
_ackSendInterval,
_qualityEstimationInterval,
_defaultPathRefractoryPeriod,
_upDelay,
_downDelay);
if (templateBond->_slaveMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_PASSIVE
&& templateBond->_failoverInterval != 0) {
fprintf(stderr, "warning: passive path monitoring was specified, this will prevent failovers from happening in a timely manner.\n");
}
_abSlaveSelectMethod = templateBond->_abSlaveSelectMethod;
memcpy(_qualityWeights, templateBond->_qualityWeights, ZT_QOS_WEIGHT_SIZE * sizeof(float));
}
//
// Second, apply user specified values (only if they make sense)
/** /**
* Timer geometries and counters * Timer geometries and counters
*/ */
// TODO: Think more about the maximum
/*
if (originalBond._failoverInterval > 250 && originalBond._failoverInterval < 65535) {
_failoverInterval = originalBond._failoverInterval;
}
else {
fprintf(stderr, "warning: _failoverInterval (%d) is out of range, using default (%d)\n", originalBond._failoverInterval, _failoverInterval);
}
*/
_bondMonitorInterval = _failoverInterval / 3; _bondMonitorInterval = _failoverInterval / 3;
BondController::setMinReqPathMonitorInterval(_bondMonitorInterval);
_ackSendInterval = _failoverInterval; _ackSendInterval = _failoverInterval;
_qualityEstimationInterval = _failoverInterval * 2; _qualityEstimationInterval = _failoverInterval * 2;
_dynamicPathMonitorInterval = 0; _dynamicPathMonitorInterval = 0;
_downDelay=0;
_upDelay=0;
_ackCutoffCount = 0; _ackCutoffCount = 0;
_lastAckRateCheck = 0; _lastAckRateCheck = 0;
_qosSendInterval = _bondMonitorInterval * 4; _qosSendInterval = _bondMonitorInterval * 4;
@ -1654,33 +1691,7 @@ void Bond::setReasonableDefaults(int policy)
_lastQoSRateCheck = 0; _lastQoSRateCheck = 0;
_lastQualityEstimation=0; _lastQualityEstimation=0;
throughputMeasurementInterval = _ackSendInterval * 2; throughputMeasurementInterval = _ackSendInterval * 2;
BondController::setMinReqPathMonitorInterval(_bondMonitorInterval);
_defaultPathRefractoryPeriod = 8000; _defaultPathRefractoryPeriod = 8000;
// TODO: Remove
_header=false;
_lastLogTS = 0;
_lastPrintTS = 0;
fprintf(stderr, "TIMERS: strat=%d, fi= %d, bmi= %d, qos= %d, ack= %d, estimateInt= %d, refractory= %d, ud= %d, dd= %d\n",
_slaveMonitorStrategy,
_failoverInterval,
_bondMonitorInterval,
_qosSendInterval,
_ackSendInterval,
_qualityEstimationInterval,
_defaultPathRefractoryPeriod,
_upDelay,
_downDelay);
} }
void Bond::setUserQualityWeights(float weights[], int len) void Bond::setUserQualityWeights(float weights[], int len)
@ -1721,22 +1732,20 @@ void Bond::dumpInfo(const int64_t now)
fprintf(stderr, "---[ bp=%d, id=%llx, dd=%d, up=%d, pmi=%d, specifiedSlaves=%d, _specifiedPrimarySlave=%d, _specifiedFailInst=%d ]\n", fprintf(stderr, "---[ bp=%d, id=%llx, dd=%d, up=%d, pmi=%d, specifiedSlaves=%d, _specifiedPrimarySlave=%d, _specifiedFailInst=%d ]\n",
_policy, _peer->identity().address().toInt(), _downDelay, _upDelay, _monitorInterval, _userHasSpecifiedSlaves, _userHasSpecifiedPrimarySlave, _userHasSpecifiedFailoverInstructions); _policy, _peer->identity().address().toInt(), _downDelay, _upDelay, _monitorInterval, _userHasSpecifiedSlaves, _userHasSpecifiedPrimarySlave, _userHasSpecifiedFailoverInstructions);
if (_bondingPolicy== ZT_BONDING_POLICY_ACTIVE_BACKUP) { if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
fprintf(stderr, "Paths (bp=%d, stats=%d, primaryReselect=%d) :\n", fprintf(stderr, "Paths (bp=%d, stats=%d, primaryReselect=%d) :\n",
_policy, _shouldCollectPathStatistics, _abSlaveSelectMethod); _policy, _shouldCollectPathStatistics, _abSlaveSelectMethod);
} }
if (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_RR if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR
|| _bondingPolicy== ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR
|| _bondingPolicy== ZT_BONDING_POLICY_BALANCE_AWARE) { || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
fprintf(stderr, "Paths (bp=%d, stats=%d, fh=%d) :\n", fprintf(stderr, "Paths (bp=%d, stats=%d, fh=%d) :\n",
_policy, _shouldCollectPathStatistics, _allowFlowHashing); _policy, _shouldCollectPathStatistics, _allowFlowHashing);
}*/ }*/
if ((now - _lastPrintTS) < 1000) {
if ((now - _lastLogTS) < 1000) {
return; return;
} }
_lastPrintTS = now; _lastPrintTS = now;
_lastLogTS = now;
fprintf(stderr, "\n\n"); fprintf(stderr, "\n\n");
@ -1792,21 +1801,21 @@ void Bond::dumpInfo(const int64_t now)
} else { } else {
fprintf(stderr, " "); fprintf(stderr, " ");
} }
if (_bondingPolicy== ZT_BONDING_POLICY_ACTIVE_BACKUP && _abPath && (_abPath == _paths[i].ptr())) { if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP && _abPath && (_abPath == _paths[i].ptr())) {
fprintf(stderr, " ACTIVE "); fprintf(stderr, " ACTIVE ");
} else if (_bondingPolicy== ZT_BONDING_POLICY_ACTIVE_BACKUP) { } else if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
fprintf(stderr, " "); fprintf(stderr, " ");
} }
if (_bondingPolicy== ZT_BONDING_POLICY_ACTIVE_BACKUP && _abFailoverQueue.size() && (_abFailoverQueue.front().ptr() == _paths[i].ptr())) { if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP && _abFailoverQueue.size() && (_abFailoverQueue.front().ptr() == _paths[i].ptr())) {
fprintf(stderr, " NEXT "); fprintf(stderr, " NEXT ");
} else if (_bondingPolicy== ZT_BONDING_POLICY_ACTIVE_BACKUP) { } else if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
fprintf(stderr, " "); fprintf(stderr, " ");
} }
fprintf(stderr, "%5s %s\n", slave->ifname().c_str(), pathStr); fprintf(stderr, "%5s %s\n", slave->ifname().c_str(), pathStr);
} }
} }
if (_bondingPolicy== ZT_BONDING_POLICY_ACTIVE_BACKUP) { if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
if (!_abFailoverQueue.empty()) { if (!_abFailoverQueue.empty()) {
fprintf(stderr, "\nFailover Queue:\n"); fprintf(stderr, "\nFailover Queue:\n");
for (std::list<SharedPtr<Path> >::iterator it(_abFailoverQueue.begin()); it!=_abFailoverQueue.end();++it) { for (std::list<SharedPtr<Path> >::iterator it(_abFailoverQueue.begin()); it!=_abFailoverQueue.end();++it) {
@ -1827,28 +1836,26 @@ void Bond::dumpInfo(const int64_t now)
} }
} }
if (_bondingPolicy== ZT_BONDING_POLICY_BALANCE_RR if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR
|| _bondingPolicy== ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR
|| _bondingPolicy== ZT_BONDING_POLICY_BALANCE_AWARE) { || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
/*
if (_numBondedPaths) { if (_numBondedPaths) {
fprintf(stderr, "\nBonded Paths:\n"); fprintf(stderr, "\nBonded Paths:\n");
for (int i=0; i<_numBondedPaths; ++i) { for (int i=0; i<_numBondedPaths; ++i) {
_paths[_bondedIdx[i]].p->address().toString(currPathStr); _paths[_bondedIdx[i]]->address().toString(currPathStr);
SharedPtr<Slave> slave =RR->bc->getSlaveBySocket(_policyAlias, _paths[_bondedIdx[i]].p->localSocket()); SharedPtr<Slave> slave =RR->bc->getSlaveBySocket(_policyAlias, _paths[_bondedIdx[i]]->localSocket());
fprintf(stderr, " [%d]\t%8s\tflows=%3d\tspeed=%7d\trelSpeed=%3d\tipvPref=%3d\tfscore=%9d\t\t%s\n", i, fprintf(stderr, " [%d]\t%8s\tflows=%3d\tspeed=%7d\trelSpeed=%3d\tipvPref=%3d\tfscore=%9d\t\t%s\n", i,
//fprintf(stderr, " [%d]\t%8s\tspeed=%7d\trelSpeed=%3d\tflowCount=%2d\tipvPref=%3d\tfscore=%9d\t\t%s\n", i, //fprintf(stderr, " [%d]\t%8s\tspeed=%7d\trelSpeed=%3d\tflowCount=%2d\tipvPref=%3d\tfscore=%9d\t\t%s\n", i,
slave->ifname().c_str(), slave->ifname().c_str(),
numberOfAssignedFlows(_paths[_bondedIdx[i]].p), _paths[_bondedIdx[i]]->_assignedFlowCount,
slave->speed(), slave->speed(),
slave->relativeSpeed(), slave->relativeSpeed(),
//_paths[_bondedIdx[i]].p->assignedFlows.size(), //_paths[_bondedIdx[i]].p->assignedFlows.size(),
slave->ipvPref(), slave->ipvPref(),
_paths[_bondedIdx[i]].p->failoverScore(), _paths[_bondedIdx[i]]->_failoverScore,
currPathStr); currPathStr);
} }
} }
*/
/* /*
if (_allowFlowHashing) { if (_allowFlowHashing) {
//Mutex::Lock _l(_flows_m); //Mutex::Lock _l(_flows_m);

View file

@ -79,7 +79,7 @@ public:
* @param original * @param original
* @param peer * @param peer
*/ */
Bond(const RuntimeEnvironment *renv, const Bond &original, const SharedPtr<Peer>& peer); Bond(const RuntimeEnvironment *renv, SharedPtr<Bond> originalBond, const SharedPtr<Peer>& peer);
/** /**
* @return The human-readable name of the bonding policy * @return The human-readable name of the bonding policy
@ -293,8 +293,9 @@ public:
* user-specified parameters. * user-specified parameters.
* *
* @param policy Bonding policy * @param policy Bonding policy
* @param templateBond
*/ */
void setReasonableDefaults(int policy); void setReasonableDefaults(int policy, SharedPtr<Bond> templateBond, bool useTemplate);
/** /**
* Check and assign user-specified quality weights to this bond. * Check and assign user-specified quality weights to this bond.

View file

@ -92,7 +92,7 @@ SharedPtr<Bond> BondController::createTransportTriggeredBond(const RuntimeEnviro
} }
if (!_defaultBondingPolicy && _defaultBondingPolicyStr.length()) { if (!_defaultBondingPolicy && _defaultBondingPolicyStr.length()) {
fprintf(stderr, " no assignment, using default custom (%s)\n", _defaultBondingPolicyStr.c_str()); fprintf(stderr, " no assignment, using default custom (%s)\n", _defaultBondingPolicyStr.c_str());
bond = new Bond(renv, *(_bondPolicyTemplates[_defaultBondingPolicyStr].ptr()), peer); bond = new Bond(renv, _bondPolicyTemplates[_defaultBondingPolicyStr].ptr(), peer);
} }
} }
else { else {
@ -102,7 +102,7 @@ SharedPtr<Bond> BondController::createTransportTriggeredBond(const RuntimeEnviro
bond = new Bond(renv, _defaultBondingPolicy, peer); bond = new Bond(renv, _defaultBondingPolicy, peer);
} }
else { else {
bond = new Bond(renv, *(_bondPolicyTemplates[_policyTemplateAssignments[identity]].ptr()), peer); bond = new Bond(renv, _bondPolicyTemplates[_policyTemplateAssignments[identity]].ptr(), peer);
} }
} }
} }