Improve bond tracing, fix bond initialization bugs, remove vestigial debug code

This commit is contained in:
Joseph Henry 2020-08-06 18:10:40 -07:00
parent 9f4985b11a
commit edd960566a
10 changed files with 353 additions and 277 deletions

File diff suppressed because it is too large Load diff

View file

@ -244,7 +244,7 @@ public:
* @param atAddress * @param atAddress
* @param now Current time * @param now Current time
*/ */
void sendACK(void *tPtr,const SharedPtr<Path> &path,int64_t localSocket, void sendACK(void *tPtr, const SharedPtr<Path> &path,int64_t localSocket,
const InetAddress &atAddress,int64_t now); const InetAddress &atAddress,int64_t now);
/** /**
@ -276,9 +276,10 @@ public:
/** /**
* Perform periodic tasks unique to active-backup * Perform periodic tasks unique to active-backup
* *
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
* @param now Current time * @param now Current time
*/ */
void processActiveBackupTasks(int64_t now); void processActiveBackupTasks(void *tPtr, int64_t now);
/** /**
* Switches the active link in an active-backup scenario to the next best during * Switches the active link in an active-backup scenario to the next best during
@ -584,7 +585,6 @@ private:
SharedPtr<Path> _abPath; // current active path SharedPtr<Path> _abPath; // current active path
std::list<SharedPtr<Path> > _abFailoverQueue; std::list<SharedPtr<Path> > _abFailoverQueue;
uint8_t _abLinkSelectMethod; // link re-selection policy for the primary link in active-backup uint8_t _abLinkSelectMethod; // link re-selection policy for the primary link in active-backup
uint64_t _lastActiveBackupPathChange;
// balance-rr // balance-rr
uint8_t _rrIdx; // index to path currently in use during Round Robin operation uint8_t _rrIdx; // index to path currently in use during Round Robin operation
@ -602,7 +602,6 @@ private:
// dynamic link monitoring // dynamic link monitoring
uint8_t _linkMonitorStrategy; uint8_t _linkMonitorStrategy;
uint64_t _lastFrame;
uint32_t _dynamicPathMonitorInterval; uint32_t _dynamicPathMonitorInterval;
// path negotiation // path negotiation
@ -611,29 +610,19 @@ private:
uint8_t _numSentPathNegotiationRequests; uint8_t _numSentPathNegotiationRequests;
unsigned int _pathNegotiationCutoffCount; unsigned int _pathNegotiationCutoffCount;
bool _allowPathNegotiation; bool _allowPathNegotiation;
uint64_t _lastPathNegotiationReceived;
uint64_t _lastSentPathNegotiationRequest;
// timers /**
* Timers and intervals
*/
uint32_t _failoverInterval; uint32_t _failoverInterval;
uint32_t _qosSendInterval; uint32_t _qosSendInterval;
uint32_t _ackSendInterval; uint32_t _ackSendInterval;
uint16_t _ackCutoffCount;
uint64_t _lastAckRateCheck;
uint16_t _qosCutoffCount;
uint64_t _lastQoSRateCheck;
uint32_t throughputMeasurementInterval; uint32_t throughputMeasurementInterval;
uint32_t _qualityEstimationInterval; uint32_t _qualityEstimationInterval;
// timestamps /**
uint64_t _lastCheckUserPreferences; * Acceptable quality thresholds
uint64_t _lastQualityEstimation; */
uint64_t _lastFlowStatReset;
uint64_t _lastFlowExpirationCheck;
uint64_t _lastFlowRebalance;
uint64_t _lastPathNegotiationCheck;
uint64_t _lastBackgroundTaskCheck;
float _maxAcceptablePacketLossRatio; float _maxAcceptablePacketLossRatio;
float _maxAcceptablePacketErrorRatio; float _maxAcceptablePacketErrorRatio;
uint16_t _maxAcceptableLatency; uint16_t _maxAcceptableLatency;
@ -641,6 +630,9 @@ private:
uint16_t _maxAcceptablePacketDelayVariance; uint16_t _maxAcceptablePacketDelayVariance;
uint8_t _minAcceptableAllocation; uint8_t _minAcceptableAllocation;
/**
* Link state reporting
*/
bool _isHealthy; bool _isHealthy;
uint8_t _numAliveLinks; uint8_t _numAliveLinks;
uint8_t _numTotalLinks; uint8_t _numTotalLinks;
@ -666,6 +658,30 @@ private:
*/ */
SharedPtr<Peer> _peer; SharedPtr<Peer> _peer;
/**
* Rate-limit cutoffs
*/
uint16_t _qosCutoffCount;
uint16_t _ackCutoffCount;
/**
* Recent event timestamps
*/
uint64_t _lastAckRateCheck;
uint64_t _lastQoSRateCheck;
uint64_t _lastQualityEstimation;
uint64_t _lastCheckUserPreferences;
uint64_t _lastBackgroundTaskCheck;
uint64_t _lastBondStatusLog;
uint64_t _lastPathNegotiationReceived;
uint64_t _lastPathNegotiationCheck;
uint64_t _lastSentPathNegotiationRequest;
uint64_t _lastFlowStatReset;
uint64_t _lastFlowExpirationCheck;
uint64_t _lastFlowRebalance;
uint64_t _lastFrame;
uint64_t _lastActiveBackupPathChange;
Mutex _paths_m; Mutex _paths_m;
Mutex _flows_m; Mutex _flows_m;

View file

@ -11,6 +11,8 @@
*/ */
/****/ /****/
#include "../osdep/OSUtils.hpp"
#include "Constants.hpp" #include "Constants.hpp"
#include "BondController.hpp" #include "BondController.hpp"
#include "Peer.hpp" #include "Peer.hpp"
@ -51,9 +53,6 @@ void BondController::addCustomLink(std::string& policyAlias, SharedPtr<Link> lin
if (search == _interfaceToLinkMap[policyAlias].end()) { if (search == _interfaceToLinkMap[policyAlias].end()) {
link->setAsUserSpecified(true); link->setAsUserSpecified(true);
_interfaceToLinkMap[policyAlias].insert(std::pair<std::string, SharedPtr<Link>>(link->ifname(), link)); _interfaceToLinkMap[policyAlias].insert(std::pair<std::string, SharedPtr<Link>>(link->ifname(), link));
} else {
//fprintf(stderr, "link already exists=%s\n", link->ifname().c_str());
// Link is already defined, overlay user settings
} }
} }
@ -82,33 +81,37 @@ SharedPtr<Bond> BondController::createTransportTriggeredBond(const RuntimeEnviro
Mutex::Lock _l(_bonds_m); Mutex::Lock _l(_bonds_m);
int64_t identity = peer->identity().address().toInt(); int64_t identity = peer->identity().address().toInt();
Bond *bond = nullptr; Bond *bond = nullptr;
char traceMsg[128];
if (!_bonds.count(identity)) { if (!_bonds.count(identity)) {
std::string policyAlias; std::string policyAlias;
//fprintf(stderr, "new bond, registering for %llx\n", identity);
if (!_policyTemplateAssignments.count(identity)) { if (!_policyTemplateAssignments.count(identity)) {
if (_defaultBondingPolicy) { if (_defaultBondingPolicy) {
//fprintf(stderr, " no assignment, using default (%d)\n", _defaultBondingPolicy); sprintf(traceMsg, "%s (bond) Creating new default %s bond to peer %llx",
OSUtils::humanReadableTimestamp().c_str(), getPolicyStrByCode(_defaultBondingPolicy).c_str(), identity); RR->t->bondStateMessage(NULL, traceMsg);
bond = new Bond(renv, _defaultBondingPolicy, peer); bond = new Bond(renv, _defaultBondingPolicy, peer);
} }
if (!_defaultBondingPolicy && _defaultBondingPolicyStr.length()) { if (!_defaultBondingPolicy && _defaultBondingPolicyStr.length()) {
//fprintf(stderr, " no assignment, using default custom (%s)\n", _defaultBondingPolicyStr.c_str()); sprintf(traceMsg, "%s (bond) Creating new default custom %s bond to peer %llx",
OSUtils::humanReadableTimestamp().c_str(), _defaultBondingPolicyStr.c_str(), identity);
RR->t->bondStateMessage(NULL, traceMsg);
bond = new Bond(renv, _bondPolicyTemplates[_defaultBondingPolicyStr].ptr(), peer); bond = new Bond(renv, _bondPolicyTemplates[_defaultBondingPolicyStr].ptr(), peer);
} }
} }
else { else {
//fprintf(stderr, " assignment found for %llx, using it as a template (%s)\n", identity,_policyTemplateAssignments[identity].c_str());
if (!_bondPolicyTemplates[_policyTemplateAssignments[identity]]) { if (!_bondPolicyTemplates[_policyTemplateAssignments[identity]]) {
//fprintf(stderr, "unable to locate template (%s), ignoring assignment for (%llx), using defaults\n", _policyTemplateAssignments[identity].c_str(), identity); sprintf(traceMsg, "%s (bond) Creating new bond. Assignment for peer %llx was specified as %s but the bond definition was not found. Using default %s",
OSUtils::humanReadableTimestamp().c_str(), identity, _policyTemplateAssignments[identity].c_str(), getPolicyStrByCode(_defaultBondingPolicy).c_str());
RR->t->bondStateMessage(NULL, traceMsg);
bond = new Bond(renv, _defaultBondingPolicy, peer); bond = new Bond(renv, _defaultBondingPolicy, peer);
} }
else { else {
sprintf(traceMsg, "%s (bond) Creating new default bond %s to peer %llx",
OSUtils::humanReadableTimestamp().c_str(), _defaultBondingPolicyStr.c_str(), identity);
RR->t->bondStateMessage(NULL, traceMsg);
bond = new Bond(renv, _bondPolicyTemplates[_policyTemplateAssignments[identity]].ptr(), peer); bond = new Bond(renv, _bondPolicyTemplates[_policyTemplateAssignments[identity]].ptr(), peer);
} }
} }
} }
else {
//fprintf(stderr, "bond already exists for %llx.\n", identity);
}
if (bond) { if (bond) {
_bonds[identity] = bond; _bonds[identity] = bond;
/** /**

View file

@ -376,6 +376,11 @@
*/ */
#define ZT_AQM_DEFAULT_BUCKET 0 #define ZT_AQM_DEFAULT_BUCKET 0
/**
* How often we emit a one-liner bond summary for each peer
*/
#define ZT_MULTIPATH_BOND_STATUS_INTERVAL 30000
/** /**
* How long before we consider a path to be dead in the general sense. This is * How long before we consider a path to be dead in the general sense. This is
* used while searching for default or alternative paths to try in the absence * used while searching for default or alternative paths to try in the absence

View file

@ -91,7 +91,7 @@ void Peer::received(
break; break;
} }
recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, flowId, now); recordIncomingPacket(path, packetId, payloadLength, verb, flowId, now);
if (trustEstablished) { if (trustEstablished) {
_lastTrustEstablishedPacketReceived = now; _lastTrustEstablishedPacketReceived = now;
@ -434,7 +434,7 @@ void Peer::tryMemorizedPath(void *tPtr,int64_t now)
} }
} }
void Peer::performMultipathStateCheck(int64_t now) void Peer::performMultipathStateCheck(void *tPtr, int64_t now)
{ {
/** /**
* Check for conditions required for multipath bonding and create a bond * Check for conditions required for multipath bonding and create a bond
@ -481,7 +481,7 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
unsigned int sent = 0; unsigned int sent = 0;
Mutex::Lock _l(_paths_m); Mutex::Lock _l(_paths_m);
performMultipathStateCheck(now); performMultipathStateCheck(tPtr, now);
const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD); const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD);
_lastSentFullHello = now; _lastSentFullHello = now;
@ -597,7 +597,7 @@ void Peer::recordIncomingInvalidPacket(const SharedPtr<Path>& path)
_bondToPeer->recordIncomingInvalidPacket(path); _bondToPeer->recordIncomingInvalidPacket(path);
} }
void Peer::recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId, void Peer::recordIncomingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
uint16_t payloadLength, const Packet::Verb verb, const int32_t flowId, int64_t now) uint16_t payloadLength, const Packet::Verb verb, const int32_t flowId, int64_t now)
{ {
if (!_shouldCollectPathStatistics || !_bondToPeer) { if (!_shouldCollectPathStatistics || !_bondToPeer) {

View file

@ -151,7 +151,7 @@ public:
* @param flowId Flow ID * @param flowId Flow ID
* @param now Current time * @param now Current time
*/ */
void recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId, void recordIncomingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
uint16_t payloadLength, const Packet::Verb verb, const int32_t flowId, int64_t now); uint16_t payloadLength, const Packet::Verb verb, const int32_t flowId, int64_t now);
/** /**
@ -229,7 +229,7 @@ public:
* possible with this peer. This check should be performed early in the life-cycle of the peer * possible with this peer. This check should be performed early in the life-cycle of the peer
* as well as during the process of learning new paths. * as well as during the process of learning new paths.
*/ */
void performMultipathStateCheck(int64_t now); void performMultipathStateCheck(void *tPtr, int64_t now);
/** /**
* Send pings or keepalives depending on configured timeouts * Send pings or keepalives depending on configured timeouts

View file

@ -94,26 +94,9 @@ void Trace::peerConfirmingUnknownPath(void *const tPtr,const uint64_t networkId,
} }
} }
void Trace::peerLinkNowRedundant(void *const tPtr,Peer &peer) void Trace::bondStateMessage(void *const tPtr,char *msg)
{ {
//ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is fully redundant",peer.address().toInt()); ZT_LOCAL_TRACE(tPtr,RR,"%s",msg);
}
void Trace::peerLinkNoLongerRedundant(void *const tPtr,Peer &peer)
{
//ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is no longer redundant",peer.address().toInt());
}
void Trace::peerLinkAggregateStatistics(void *const tPtr,Peer &peer)
{
/*
ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is composed of (%d) physical paths %s, has packet delay variance (%.0f ms), mean latency (%.0f ms)",
peer.address().toInt(),
peer.aggregateLinkPhysicalPathCount(),
peer.interfaceListStr(),
peer.computeAggregateLinkPacketDelayVariance(),
peer.computeAggregateLinkMeanLatency());
*/
} }
void Trace::peerLearnedNewPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath,const uint64_t packetId) void Trace::peerLearnedNewPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath,const uint64_t packetId)

View file

@ -109,10 +109,7 @@ public:
void peerConfirmingUnknownPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &path,const uint64_t packetId,const Packet::Verb verb); void peerConfirmingUnknownPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &path,const uint64_t packetId,const Packet::Verb verb);
void peerLinkNowRedundant(void *const tPtr,Peer &peer); void bondStateMessage(void *const tPtr,char *msg);
void peerLinkNoLongerRedundant(void *const tPtr,Peer &peer);
void peerLinkAggregateStatistics(void *const tPtr,Peer &peer);
void peerLearnedNewPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath,const uint64_t packetId); void peerLearnedNewPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath,const uint64_t packetId);
void peerRedirected(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath); void peerRedirected(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath);

View file

@ -492,6 +492,7 @@ static int cli(int argc,char **argv)
if (json) { if (json) {
printf("%s" ZT_EOL_S,OSUtils::jsonDump(j).c_str()); printf("%s" ZT_EOL_S,OSUtils::jsonDump(j).c_str());
} else { } else {
bool bFoundBond = false;
printf(" <peer> <bondtype> <status> <links>" ZT_EOL_S); printf(" <peer> <bondtype> <status> <links>" ZT_EOL_S);
if (j.is_array()) { if (j.is_array()) {
for(unsigned long k=0;k<j.size();++k) { for(unsigned long k=0;k<j.size();++k) {
@ -504,6 +505,7 @@ static int cli(int argc,char **argv)
int8_t numTotalLinks = p["numTotalLinks"]; int8_t numTotalLinks = p["numTotalLinks"];
if (isBonded) { if (isBonded) {
bFoundBond = true;
std::string healthStr; std::string healthStr;
if (isHealthy) { if (isHealthy) {
healthStr = "HEALTHY"; healthStr = "HEALTHY";
@ -524,6 +526,9 @@ static int cli(int argc,char **argv)
} }
} }
} }
if (!bFoundBond) {
printf(" NONE\t\t\t\tNONE\t NONE NONE" ZT_EOL_S);
}
} }
return 0; return 0;
} else { } else {

View file

@ -195,6 +195,20 @@ public:
*/ */
static std::vector<InetAddress> resolve(const char *name); static std::vector<InetAddress> resolve(const char *name);
/**
* @return Current time in a human-readable format
*/
static inline std::string humanReadableTimestamp()
{
time_t rawtime;
struct tm * timeinfo;
char buffer [80];
time (&rawtime);
timeinfo = localtime (&rawtime);
strftime (buffer,80,"%F %T",timeinfo);
return std::string(buffer);
}
/** /**
* @return Current time in milliseconds since epoch * @return Current time in milliseconds since epoch
*/ */