fix thread-safety issues by adding appropriate locks

This commit is contained in:
Brenton Bostick 2023-04-14 13:42:25 -04:00
parent 156e8db33a
commit ddfdf32b44
4 changed files with 20 additions and 11 deletions

View file

@ -1946,17 +1946,20 @@ void Bond::dumpInfo(int64_t now, bool force)
_lastSummaryDump = now; _lastSummaryDump = now;
float overhead = (_overheadBytes / (timeSinceLastDump / 1000.0f) / 1000.0f); float overhead = (_overheadBytes / (timeSinceLastDump / 1000.0f) / 1000.0f);
_overheadBytes = 0; _overheadBytes = 0;
log("bond: bp=%d, fi=%" PRIu64 ", mi=%d, ud=%d, dd=%d, flows=%zu, leaf=%d, overhead=%f KB/s, links=(%d/%d)", {
_policy, Mutex::Lock l(_flows_m);
_failoverInterval, log("bond: bp=%d, fi=%" PRIu64 ", mi=%d, ud=%d, dd=%d, flows=%zu, leaf=%d, overhead=%f KB/s, links=(%d/%d)",
_monitorInterval, _policy,
_upDelay, _failoverInterval,
_downDelay, _monitorInterval,
_flows.size(), _upDelay,
_isLeaf, _downDelay,
overhead, _flows.size(),
_numAliveLinks, _isLeaf,
_numTotalLinks); overhead,
_numAliveLinks,
_numTotalLinks);
}
for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
if (_paths[i].p) { if (_paths[i].p) {
dumpPathStatus(now, i); dumpPathStatus(now, i);

View file

@ -344,6 +344,7 @@ class Bond {
*/ */
static bool inUse() static bool inUse()
{ {
Mutex::Lock l(_bonds_m);
return ! _bondPolicyTemplates.empty() || _defaultPolicy; return ! _bondPolicyTemplates.empty() || _defaultPolicy;
} }

View file

@ -123,6 +123,7 @@ void Peer::received(
} }
// If same address on same interface then don't learn unless existing path isn't alive (prevents learning loop) // If same address on same interface then don't learn unless existing path isn't alive (prevents learning loop)
if (_paths[i].p->address().ipsEqual(path->address()) && _paths[i].p->localSocket() == path->localSocket()) { if (_paths[i].p->address().ipsEqual(path->address()) && _paths[i].p->localSocket() == path->localSocket()) {
Mutex::Lock _l2(_bond_m);
if (_paths[i].p->alive(now) && !_bond) { if (_paths[i].p->alive(now) && !_bond) {
havePath = true; havePath = true;
break; break;
@ -669,6 +670,7 @@ void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t pack
#ifndef ZT_NO_PEER_METRICS #ifndef ZT_NO_PEER_METRICS
_outgoing_packet++; _outgoing_packet++;
#endif #endif
Mutex::Lock l(_bond_m);
if (_localMultipathSupported && _bond) { if (_localMultipathSupported && _bond) {
_bond->recordOutgoingPacket(path, packetId, payloadLength, verb, flowId, now); _bond->recordOutgoingPacket(path, packetId, payloadLength, verb, flowId, now);
} }
@ -679,6 +681,7 @@ void Peer::recordIncomingInvalidPacket(const SharedPtr<Path>& path)
#ifndef ZT_NO_PEER_METRICS #ifndef ZT_NO_PEER_METRICS
_packet_errors++; _packet_errors++;
#endif #endif
Mutex::Lock l(_bond_m);
if (_localMultipathSupported && _bond) { if (_localMultipathSupported && _bond) {
_bond->recordIncomingInvalidPacket(path); _bond->recordIncomingInvalidPacket(path);
} }
@ -687,6 +690,7 @@ void Peer::recordIncomingInvalidPacket(const SharedPtr<Path>& path)
void Peer::recordIncomingPacket(const SharedPtr<Path> &path, const uint64_t packetId, void Peer::recordIncomingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
uint16_t payloadLength, const Packet::Verb verb, const int32_t flowId, int64_t now) uint16_t payloadLength, const Packet::Verb verb, const int32_t flowId, int64_t now)
{ {
Mutex::Lock l(_bond_m);
if (_localMultipathSupported && _bond) { if (_localMultipathSupported && _bond) {
_bond->recordIncomingPacket(path, packetId, payloadLength, verb, flowId, now); _bond->recordIncomingPacket(path, packetId, payloadLength, verb, flowId, now);
} }

View file

@ -521,6 +521,7 @@ public:
* @return The bonding policy used to reach this peer * @return The bonding policy used to reach this peer
*/ */
SharedPtr<Bond> bond() { SharedPtr<Bond> bond() {
Mutex::Lock l(_bond_m);
return _bond; return _bond;
} }