More cleanup and build fixes; remove the old version of the multipath code to prepare for the new version.

Adam Ierymenko 2020-01-10 20:40:14 -08:00
parent 790fe50fac
commit 7291ac2093
17 changed files with 454 additions and 2509 deletions

View file

@ -321,35 +321,6 @@ enum ZT_ResultCode
*/
#define ZT_ResultCode_isFatal(x) ((((int)(x)) >= 100)&&(((int)(x)) < 1000))
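For orientation, a brief hedged sketch of how a caller might use this macro (the operation shown is hypothetical):

ZT_ResultCode rc = doSomeNodeOperation(); // hypothetical call returning a result code
if (ZT_ResultCode_isFatal(rc)) {
	// codes in [100,1000) mean the node object is no longer usable
}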
/**
* The multipath algorithm in use by this node.
*/
enum ZT_MultipathMode
{
/**
* No active multipath.
*
* Traffic is sent only over the strongest path, but this mode will
* automatically fail over if that link goes down.
*/
ZT_MULTIPATH_NONE = 0,
/**
* Traffic is randomly distributed among all active paths.
*
* Will cease sending traffic over links that appear to be stale.
*/
ZT_MULTIPATH_RANDOM = 1,
/**
* Traffic is allocated across all active paths in proportion to their strength and
* reliability.
*
* Will cease sending traffic over links that appear to be stale.
*/
ZT_MULTIPATH_PROPORTIONALLY_BALANCED = 2,
};
/**
* Status codes sent to status update callback when things happen
*/
@ -1012,56 +983,6 @@ typedef struct
*/
uint64_t trustedPathId;
/**
* One-way latency
*/
float latency;
/**
* How much latency varies over time
*/
float packetDelayVariance;
/**
* How much observed throughput varies over time
*/
float throughputDisturbCoeff;
/**
* Packet Error Ratio (PER)
*/
float packetErrorRatio;
/**
* Packet Loss Ratio (PLR)
*/
float packetLossRatio;
/**
* Stability of the path
*/
float stability;
/**
* Current throughput (moving average)
*/
uint64_t throughput;
/**
* Maximum observed throughput for this path
*/
uint64_t maxThroughput;
/**
* Percentage of traffic allocated to this path
*/
float allocation;
/**
* Name of physical interface (for monitoring)
*/
char *ifname;
/**
* Is path alive?
*/
@ -1127,11 +1048,6 @@ typedef struct
*/
unsigned int pathCount;
/**
* Whether this peer was ever reachable via an aggregate link
*/
int hadAggregateLink;
/**
* Known network paths to peer
*/

View file

@ -71,6 +71,7 @@ set(core_src
SelfAwareness.cpp
SHA512.cpp
Switch.cpp
Topology.cpp
Utils.cpp
)

View file

@ -117,202 +117,29 @@
#define ZT_RELAY_MAX_HOPS 4
/**
* Expire time for multicast 'likes' and indirect multicast memberships in ms
*/
#define ZT_MULTICAST_LIKE_EXPIRE 600000
/**
* Period for multicast LIKE re-announcements to connected nodes
*/
#define ZT_MULTICAST_ANNOUNCE_PERIOD 60000
/**
* Packets are only used for QoS/ACK statistical sampling if their packet ID is divisible by
* this integer. This is to provide a mechanism for both peers to agree on which packets need
* special treatment without having to exchange information. Changing this value would be
* a breaking change and would necessitate a protocol version upgrade. Since each incoming and
* outgoing packet ID is checked against this value, its evaluation is of the form:
* (id & (divisor - 1)) == 0, thus the divisor must be a power of 2.
*
* This value is set to 16 so that, given uniformly distributed packet IDs, we will sample
* 1/16th (or ~6.25%) of packets.
*/
#define ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR 0x10
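As a minimal sketch of that check (using the constant above and assuming 64-bit packet IDs):

// True if both peers will independently select this packet for QoS/ACK
// sampling, with no negotiation required. With divisor 0x10 and uniformly
// distributed IDs this passes roughly 1/16th (~6.25%) of packets.
static inline bool shouldSampleForQoS(const uint64_t packetId)
{
	return ((packetId & (uint64_t)(ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0);
}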
/**
* Time horizon for VERB_QOS_MEASUREMENT and VERB_ACK packet processing cutoff
*/
#define ZT_PATH_QOS_ACK_CUTOFF_TIME 30000
/**
* Maximum number of VERB_QOS_MEASUREMENT and VERB_ACK packets allowed to be
* processed within cutoff time. Separate totals are kept for each type but
* the limit is the same for both.
*
* This limits how often this peer will compute statistical estimates
* of various QoS measures from VERB_QOS_MEASUREMENT or VERB_ACK packets to
* CUTOFF_LIMIT times per CUTOFF_TIME milliseconds per peer to prevent
* this from being useful for DoS amplification attacks.
*/
#define ZT_PATH_QOS_ACK_CUTOFF_LIMIT 128
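The gate such a limit implies can be a simple windowed counter; a hedged sketch (illustrative, not the implementation being removed here):

// Hypothetical per-peer gate: allow at most ZT_PATH_QOS_ACK_CUTOFF_LIMIT
// packets of a given type per ZT_PATH_QOS_ACK_CUTOFF_TIME milliseconds.
struct QosAckRateGate
{
	int64_t windowStart = 0;
	unsigned int count = 0;
	bool allow(const int64_t now)
	{
		if ((now - windowStart) >= ZT_PATH_QOS_ACK_CUTOFF_TIME) {
			windowStart = now; // start a new window
			count = 0;
		}
		return (count++ < ZT_PATH_QOS_ACK_CUTOFF_LIMIT);
	}
};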
/**
* Path choice history window size. This is used to keep track of which paths were
* previously selected so that we can maintain a target allocation over time.
*/
#define ZT_MULTIPATH_PROPORTION_WIN_SZ 128
/**
* Interval used for rate-limiting the computation of path quality estimates.
*/
#define ZT_PATH_QUALITY_COMPUTE_INTERVAL 1000
/**
* Number of samples to consider when computing real-time path statistics
*/
#define ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ 128
/**
* Number of samples to consider when performing long-term path quality analysis.
* By default this value is set to ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ but can
* be set to any value greater than that to observe longer-term path quality behavior.
*/
#define ZT_PATH_QUALITY_METRIC_WIN_SZ ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ
/**
* Maximum acceptable Packet Delay Variance (PDV) over a path
*/
#define ZT_PATH_MAX_PDV 1000
/**
* Maximum acceptable time interval between expectation and receipt of at least one ACK over a path
*/
#define ZT_PATH_MAX_AGE 30000
/**
* Maximum acceptable mean latency over a path
*/
#define ZT_PATH_MAX_MEAN_LATENCY 1000
/**
* How much each factor contributes to the "stability" score of a path
*/
#define ZT_PATH_CONTRIB_PDV (1.0 / 3.0)
#define ZT_PATH_CONTRIB_LATENCY (1.0 / 3.0)
#define ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE (1.0 / 3.0)
/**
* How much each factor contributes to the "quality" score of a path
*/
#define ZT_PATH_CONTRIB_STABILITY (0.75 / 3.0)
#define ZT_PATH_CONTRIB_THROUGHPUT (1.50 / 3.0)
#define ZT_PATH_CONTRIB_SCOPE (0.75 / 3.0)
/**
* How often a QoS packet is sent
*/
#define ZT_PATH_QOS_INTERVAL 3000
/**
* Min and max acceptable sizes for a VERB_QOS_MEASUREMENT packet
*/
#define ZT_PATH_MIN_QOS_PACKET_SZ (8 + 1)
#define ZT_PATH_MAX_QOS_PACKET_SZ 1400
/**
* How many ID:sojourn time pairs in a single QoS packet
*/
#define ZT_PATH_QOS_TABLE_SIZE ((ZT_PATH_MAX_QOS_PACKET_SZ * 8) / (64 + 16))
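Worked out: each entry is a 64-bit packet ID plus a 16-bit sojourn time, i.e. 80 bits (10 bytes), so a 1400-byte packet holds (1400 * 8) / 80 = 140 entries. A compile-time check along these lines would make the assumption explicit:

static_assert(ZT_PATH_QOS_TABLE_SIZE == (ZT_PATH_MAX_QOS_PACKET_SZ / 10),
	"a QoS record is 8 bytes of packet ID plus 2 bytes of sojourn time");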
/**
* Maximum number of outgoing packets we monitor for QoS information
*/
#define ZT_PATH_MAX_OUTSTANDING_QOS_RECORDS 128
/**
* Timeout for QoS records
*/
#define ZT_PATH_QOS_TIMEOUT (ZT_PATH_QOS_INTERVAL * 2)
/**
* How often the service tests the path throughput
*/
#define ZT_PATH_THROUGHPUT_MEASUREMENT_INTERVAL (ZT_PATH_ACK_INTERVAL * 8)
/**
* Minimum amount of time between each ACK packet
*/
#define ZT_PATH_ACK_INTERVAL 1000
/**
* How often an aggregate link statistics report is emitted into the tracing system
*/
#define ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL 60000
/**
* How much an aggregate link's component paths can vary from their target allocation
* before the link is considered to be in a state of imbalance.
*/
#define ZT_PATH_IMBALANCE_THRESHOLD 0.20
/**
* Max allowable time spent in any queue
*/
#define ZT_QOS_TARGET 5 // ms
/**
* Time period within which the time spent in the queue by a packet should fall below
* the target at least once
*/
#define ZT_QOS_INTERVAL 100 // ms
/**
* The number of bytes that each queue is allowed to send during each DRR cycle.
* This approximates a byte-granular fair queuing scheme.
*/
#define ZT_QOS_QUANTUM ZT_DEFAULT_MTU
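For context, a rough sketch of the deficit round robin cycle these constants parameterize (the container shapes and send hook are assumptions, not the queueing code this commit removes):

#include <cstdint>
#include <deque>
#include <vector>

struct DRRQueue
{
	std::deque< std::vector<uint8_t> > packets;
	unsigned int deficit = 0; // bytes this queue may still send this cycle
};

// One DRR pass: each queue earns one quantum, then drains packets while
// its deficit covers them; a queue that empties forfeits leftover deficit.
inline void drrCycle(std::vector<DRRQueue> &queues)
{
	for(auto &q : queues) {
		q.deficit += ZT_QOS_QUANTUM;
		while ((!q.packets.empty())&&(q.packets.front().size() <= q.deficit)) {
			q.deficit -= (unsigned int)q.packets.front().size();
			// sendPacket(q.packets.front()); // hypothetical send hook
			q.packets.pop_front();
		}
		if (q.packets.empty())
			q.deficit = 0;
	}
}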
/**
* The maximum total number of packets that can be queued among all
* active/inactive, old/new queues
*/
#define ZT_QOS_MAX_ENQUEUED_PACKETS 1024
/**
* Number of QoS queues (buckets)
*/
#define ZT_QOS_NUM_BUCKETS 9
/**
* All unspecified traffic is put in this bucket. Anything in a bucket with a smaller
* value is de-prioritized. Anything in a bucket with a higher value is prioritized over
* other traffic.
*/
#define ZT_QOS_DEFAULT_BUCKET 0
/**
* Delay between full-fledged pings of directly connected peers
* Period between keepalives sent to paths if no other traffic has been sent
*
* See https://conferences.sigcomm.org/imc/2010/papers/p260.pdf for
* some real-world data on NAT UDP timeouts. From the paper: "the
* lowest measured timeout when a binding has seen bidirectional
* traffic is 54 sec." A 30-second period is thus more frequent than strictly necessary.
*/
#define ZT_PEER_PING_PERIOD 30000
#define ZT_PATH_KEEPALIVE_PERIOD 30000
/**
* Delay between refreshes of locators via DNS or other methods
* Timeout for path aliveness (measured from last receive)
*/
#define ZT_DYNAMIC_ROOT_UPDATE_PERIOD 120000
#define ZT_PATH_ACTIVITY_TIMEOUT ((ZT_PATH_KEEPALIVE_PERIOD * 2) + 5000)
/**
* Delay between full HELLO messages between peers
*/
#define ZT_PEER_PING_PERIOD 60000
/**
* Timeout for overall peer activity (measured from last receive)
*/
#ifndef ZT_SDK
#define ZT_PEER_ACTIVITY_TIMEOUT 500000
#else
#define ZT_PEER_ACTIVITY_TIMEOUT 30000
#endif
#define ZT_PEER_ACTIVITY_TIMEOUT ((ZT_PEER_PING_PERIOD * 2) + 5000)
/**
* Delay between requests for updated network autoconf information
@ -322,15 +149,6 @@
*/
#define ZT_NETWORK_AUTOCONF_DELAY 60000
/**
* Minimum interval between attempts by relays to unite peers
*
* When a relay gets a packet destined for another peer, it sends both peers
* a RENDEZVOUS message no more than this often. This instructs the peers
* to attempt NAT-t and gives each the other's corresponding IP:port pair.
*/
#define ZT_MIN_UNITE_INTERVAL 30000
/**
* Sanity limit on maximum bridge routes
*
@ -357,34 +175,10 @@
*/
#define ZT_DIRECT_PATH_PUSH_INTERVAL_HAVEPATH 120000
/**
* Time horizon for push direct paths cutoff
*/
#define ZT_PUSH_DIRECT_PATHS_CUTOFF_TIME 30000
/**
* Maximum number of direct path pushes within cutoff time
*
* This limits responses to PUSH_DIRECT_PATHS to CUTOFF_LIMIT responses
* per CUTOFF_TIME milliseconds per peer to prevent this from being
* useful for DoS amplification attacks.
*/
#define ZT_PUSH_DIRECT_PATHS_CUTOFF_LIMIT 8
/**
* Maximum number of paths per IP scope (e.g. global, link-local) and family (e.g. v4/v6)
*/
#define ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY 8
/**
* Time horizon for VERB_NETWORK_CREDENTIALS cutoff
*/
#define ZT_PEER_CREDENTIALS_CUTOFF_TIME 60000
/**
* Maximum number of VERB_NETWORK_CREDENTIALS within cutoff time
*/
#define ZT_PEER_CREDEITIALS_CUTOFF_LIMIT 15
#define ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY 4
/**
* WHOIS rate limit (we allow these to be pretty fast)
@ -423,11 +217,6 @@
*/
#define ZT_SIGNATURE_BUFFER_SIZE 96
/**
* Desired / recommended min stack size for threads (used on some platforms to reset thread stack size)
*/
#define ZT_THREAD_MIN_STACK_SIZE 1048576
// Internal cryptographic algorithm IDs (these match relevant identity types)
#define ZT_CRYPTO_ALG_C25519 0
#define ZT_CRYPTO_ALG_P384 1

View file

@ -181,14 +181,6 @@ ZT_ALWAYS_INLINE bool _doHELLO(IncomingPacket &pkt,const RuntimeEnvironment *con
return true;
}
ZT_ALWAYS_INLINE bool _doACK(IncomingPacket &pkt,const RuntimeEnvironment *const RR,void *const tPtr,const SharedPtr<Peer> &peer,const SharedPtr<Path> &path)
{
}
ZT_ALWAYS_INLINE bool _doQOS_MEASUREMENT(IncomingPacket &pkt,const RuntimeEnvironment *const RR,void *const tPtr,const SharedPtr<Peer> &peer,const SharedPtr<Path> &path)
{
}
ZT_ALWAYS_INLINE bool _doERROR(IncomingPacket &pkt,const RuntimeEnvironment *const RR,void *const tPtr,const SharedPtr<Peer> &peer,const SharedPtr<Path> &path)
{
const Packet::Verb inReVerb = (Packet::Verb)pkt[ZT_PROTO_VERB_ERROR_IDX_IN_RE_VERB];
@ -272,7 +264,6 @@ ZT_ALWAYS_INLINE bool _doOK(IncomingPacket &pkt,const RuntimeEnvironment *const
return true;
if (pkt.hops() == 0) {
path->updateLatency((unsigned int)latency,RR->node->now());
if ((ZT_PROTO_VERB_HELLO__OK__IDX_REVISION + 2) < pkt.size()) {
InetAddress externalSurfaceAddress;
externalSurfaceAddress.deserialize(pkt,ZT_PROTO_VERB_HELLO__OK__IDX_REVISION + 2);
@ -281,6 +272,7 @@ ZT_ALWAYS_INLINE bool _doOK(IncomingPacket &pkt,const RuntimeEnvironment *const
}
}
peer->updateLatency((unsigned int)latency);
peer->setRemoteVersion(vProto,vMajor,vMinor,vRevision);
} break;
@ -461,7 +453,7 @@ ZT_ALWAYS_INLINE bool _doEXT_FRAME(IncomingPacket &pkt,const RuntimeEnvironment
}
}
if ((flags & 0x10) != 0) { // ACK requested
if ((flags & 0x10U) != 0) { // ACK requested
Packet outp(peer->address(),RR->identity.address(),Packet::VERB_OK);
outp.append((uint8_t)Packet::VERB_EXT_FRAME);
outp.append((uint64_t)pkt.packetId());
@ -497,9 +489,6 @@ ZT_ALWAYS_INLINE bool _doECHO(IncomingPacket &pkt,const RuntimeEnvironment *cons
ZT_ALWAYS_INLINE bool _doNETWORK_CREDENTIALS(IncomingPacket &pkt,const RuntimeEnvironment *const RR,void *const tPtr,const SharedPtr<Peer> &peer,const SharedPtr<Path> &path)
{
if (!peer->rateGateCredentialsReceived(RR->node->now()))
return true;
CertificateOfMembership com;
Capability cap;
Tag tag;
@ -674,7 +663,7 @@ ZT_ALWAYS_INLINE bool _doPUSH_DIRECT_PATHS(IncomingPacket &pkt,const RuntimeEnvi
{
const int64_t now = RR->node->now();
if (peer->rateGatePushDirectPaths(now)) {
if (peer->rateGateInboundPushDirectPaths(now)) {
uint8_t countPerScope[ZT_INETADDRESS_MAX_SCOPE+1][2]; // [][0] is v4, [][1] is v6
memset(countPerScope,0,sizeof(countPerScope));
@ -689,7 +678,6 @@ ZT_ALWAYS_INLINE bool _doPUSH_DIRECT_PATHS(IncomingPacket &pkt,const RuntimeEnvi
unsigned int addrLen = pkt[ptr++];
switch(addrType) {
case 4: {
const InetAddress a(pkt.field(ptr,4),4,pkt.at<uint16_t>(ptr + 4));
if ((!peer->hasActivePathTo(now,a)) && // not already known
@ -699,7 +687,6 @@ ZT_ALWAYS_INLINE bool _doPUSH_DIRECT_PATHS(IncomingPacket &pkt,const RuntimeEnvi
peer->sendHELLO(tPtr,-1,a,now);
}
} break;
case 6: {
const InetAddress a(pkt.field(ptr,16),16,pkt.at<uint16_t>(ptr + 16));
if ((!peer->hasActivePathTo(now,a)) && // not already known
@ -709,8 +696,8 @@ ZT_ALWAYS_INLINE bool _doPUSH_DIRECT_PATHS(IncomingPacket &pkt,const RuntimeEnvi
peer->sendHELLO(tPtr,-1,a,now);
}
} break;
}
ptr += addrLen;
}
}
@ -766,7 +753,6 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr)
if (!trusted) {
if (!dearmor(peer->key())) {
RR->t->incomingPacketMessageAuthenticationFailure(tPtr,_path,packetId(),sourceAddress,hops(),"invalid MAC");
_path->recordInvalidPacket();
return true;
}
}
@ -784,8 +770,6 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr)
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),v,0,Packet::VERB_NOP,0);
break;
case Packet::VERB_HELLO: r = _doHELLO(*this,RR,tPtr,true,_path); break;
case Packet::VERB_ACK: r = _doACK(*this,RR,tPtr,peer,_path); break;
case Packet::VERB_QOS_MEASUREMENT: r = _doQOS_MEASUREMENT(*this,RR,tPtr,peer,_path); break;
case Packet::VERB_ERROR: r = _doERROR(*this,RR,tPtr,peer,_path); break;
case Packet::VERB_OK: r = _doOK(*this,RR,tPtr,peer,_path); break;
case Packet::VERB_WHOIS: r = _doWHOIS(*this,RR,tPtr,peer,_path); break;
@ -814,190 +798,4 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr)
}
}
#if 0
bool IncomingPacket::_doACK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer)
{
if (!peer->rateGateACK(RR->node->now()))
return true;
/* Dissect incoming ACK packet. From this we can estimate current throughput of the path, establish known
* maximums and detect packet loss. */
if (peer->localMultipathSupport()) {
int32_t ackedBytes;
if (payloadLength() != sizeof(ackedBytes)) {
return true; // ignore
}
memcpy(&ackedBytes, payload(), sizeof(ackedBytes));
_path->receivedAck(RR->node->now(), Utils::ntoh(ackedBytes));
peer->inferRemoteMultipathEnabled();
}
return true;
}
bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer)
{
if (!peer->rateGateQoS(RR->node->now()))
return true;
/* Dissect incoming QoS packet. From this we can compute latency values and their variance.
* The latency variance is used as a measure of "jitter". */
if (peer->localMultipathSupport()) {
if (payloadLength() > ZT_PATH_MAX_QOS_PACKET_SZ || payloadLength() < ZT_PATH_MIN_QOS_PACKET_SZ) {
return true; // ignore
}
const int64_t now = RR->node->now();
uint64_t rx_id[ZT_PATH_QOS_TABLE_SIZE];
uint16_t rx_ts[ZT_PATH_QOS_TABLE_SIZE];
char *begin = (char *)payload();
char *ptr = begin;
int count = 0;
int len = payloadLength();
// Read packet IDs and latency compensation intervals for each packet tracked by this QoS packet
while (ptr < (begin + len) && (count < ZT_PATH_QOS_TABLE_SIZE)) {
memcpy((void*)&rx_id[count], ptr, sizeof(uint64_t));
ptr+=sizeof(uint64_t);
memcpy((void*)&rx_ts[count], ptr, sizeof(uint16_t));
ptr+=sizeof(uint16_t);
count++;
}
_path->receivedQoS(now, count, rx_id, rx_ts);
peer->inferRemoteMultipathEnabled();
}
return true;
}
bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer)
{
unsigned int offset = ZT_PACKET_IDX_PAYLOAD;
const uint64_t nwid = at<uint64_t>(offset); offset += 8;
const unsigned int flags = (*this)[offset]; ++offset;
const SharedPtr<Network> network(RR->node->network(nwid));
if (network) {
if ((flags & 0x01) != 0) {
// This is deprecated but may still be sent by old peers
CertificateOfMembership com;
offset += com.deserialize(*this,offset);
if (com)
network->addCredential(tPtr,com);
}
if (!network->gate(tPtr,peer)) {
_sendErrorNeedCredentials(RR,tPtr,peer,nwid);
return false;
}
unsigned int gatherLimit = 0;
if ((flags & 0x02) != 0) {
gatherLimit = at<uint32_t>(offset); offset += 4;
}
MAC from;
if ((flags & 0x04) != 0) {
from.setTo(field(offset,6),6); offset += 6;
} else {
from.fromAddress(peer->address(),nwid);
}
const unsigned int recipientsOffset = offset;
std::list<Address> recipients;
if ((flags & 0x08) != 0) {
const unsigned int rc = at<uint16_t>(offset); offset += 2;
for(unsigned int i=0;i<rc;++i) {
const Address a(field(offset,5),5);
if ((a != peer->address())&&(a != RR->identity.address())) {
recipients.push_back(a);
}
offset += 5;
}
}
const unsigned int afterRecipientsOffset = offset;
const MulticastGroup to(MAC(field(offset,6),6),at<uint32_t>(offset + 6)); offset += 10;
const unsigned int etherType = at<uint16_t>(offset); offset += 2;
const unsigned int frameLen = size() - offset;
if (network->config().multicastLimit == 0) {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_MULTICAST_FRAME,from,to.mac(),"multicast disabled");
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,nwid);
return true;
}
if (!to.mac().isMulticast()) {
RR->t->incomingPacketInvalid(tPtr,_path,packetId(),source(),hops(),Packet::VERB_MULTICAST_FRAME,"destination not multicast");
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,nwid);
return true;
}
if ((!from)||(from.isMulticast())||(from == network->mac())) {
RR->t->incomingPacketInvalid(tPtr,_path,packetId(),source(),hops(),Packet::VERB_MULTICAST_FRAME,"invalid source MAC");
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,nwid);
return true;
}
if ((frameLen > 0)&&(frameLen <= ZT_MAX_MTU)) {
const uint8_t *const frameData = ((const uint8_t *)unsafeData()) + offset;
if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),from,to.mac(),frameData,frameLen,etherType,0) > 0) {
RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to.mac(),etherType,0,(const void *)frameData,frameLen);
}
}
if (!recipients.empty()) {
// TODO
/*
const std::vector<Address> anchors = network->config().anchors();
const bool amAnchor = (std::find(anchors.begin(),anchors.end(),RR->identity.address()) != anchors.end());
for(std::list<Address>::iterator ra(recipients.begin());ra!=recipients.end();) {
SharedPtr<Peer> recipient(RR->topology->get(*ra));
if ((recipient)&&((recipient->remoteVersionProtocol() < 10)||(amAnchor))) {
Packet outp(*ra,RR->identity.address(),Packet::VERB_MULTICAST_FRAME);
outp.append(field(ZT_PACKET_IDX_PAYLOAD,recipientsOffset - ZT_PACKET_IDX_PAYLOAD),recipientsOffset - ZT_PACKET_IDX_PAYLOAD);
outp.append(field(afterRecipientsOffset,size() - afterRecipientsOffset),size() - afterRecipientsOffset);
RR->sw->send(tPtr,outp,true);
recipients.erase(ra++);
} else ++ra;
}
if (!recipients.empty()) {
Packet outp(recipients.front(),RR->identity.address(),Packet::VERB_MULTICAST_FRAME);
recipients.pop_front();
outp.append(field(ZT_PACKET_IDX_PAYLOAD,recipientsOffset - ZT_PACKET_IDX_PAYLOAD),recipientsOffset - ZT_PACKET_IDX_PAYLOAD);
if (!recipients.empty()) {
outp.append((uint16_t)recipients.size());
for(std::list<Address>::iterator ra(recipients.begin());ra!=recipients.end();++ra)
ra->appendTo(outp);
}
outp.append(field(afterRecipientsOffset,size() - afterRecipientsOffset),size() - afterRecipientsOffset);
RR->sw->send(tPtr,outp,true);
}
*/
}
if (gatherLimit) { // DEPRECATED but still supported
/*
Packet outp(source(),RR->identity.address(),Packet::VERB_OK);
outp.append((unsigned char)Packet::VERB_MULTICAST_FRAME);
outp.append(packetId());
outp.append(nwid);
to.mac().appendTo(outp);
outp.append((uint32_t)to.adi());
outp.append((unsigned char)0x02); // flag 0x02 = contains gather results
if (RR->mc->gather(peer->address(),nwid,to,outp,gatherLimit)) {
outp.armor(peer->key(),true);
_path->send(RR,tPtr,outp.data(),outp.size(),RR->node->now());
}
*/
}
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,nwid);
return true;
} else {
_sendErrorNeedCredentials(RR,tPtr,peer,nwid);
return false;
}
}
#endif
} // namespace ZeroTier

View file

@ -25,12 +25,12 @@
namespace ZeroTier {
// libpthread based mutex lock
class Mutex
{
public:
ZT_ALWAYS_INLINE Mutex() { pthread_mutex_init(&_mh,(const pthread_mutexattr_t *)0); }
ZT_ALWAYS_INLINE Mutex() { pthread_mutex_init(&_mh,0); }
ZT_ALWAYS_INLINE ~Mutex() { pthread_mutex_destroy(&_mh); }
ZT_ALWAYS_INLINE void lock() const { pthread_mutex_lock(&((const_cast <Mutex *> (this))->_mh)); }
ZT_ALWAYS_INLINE void unlock() const { pthread_mutex_unlock(&((const_cast <Mutex *> (this))->_mh)); }
@ -51,6 +51,43 @@ private:
pthread_mutex_t _mh;
};
class RWMutex
{
public:
ZT_ALWAYS_INLINE RWMutex() { pthread_rwlock_init(&_mh,0); }
ZT_ALWAYS_INLINE ~RWMutex() { pthread_rwlock_destroy(&_mh); }
ZT_ALWAYS_INLINE void lock() const { pthread_rwlock_wrlock(&((const_cast <RWMutex *> (this))->_mh)); }
ZT_ALWAYS_INLINE void rlock() const { pthread_rwlock_rdlock(&((const_cast <RWMutex *> (this))->_mh)); }
ZT_ALWAYS_INLINE void unlock() const { pthread_rwlock_unlock(&((const_cast <RWMutex *> (this))->_mh)); }
class RLock
{
public:
ZT_ALWAYS_INLINE RLock(RWMutex &m) : _m(&m) { m.rlock(); }
ZT_ALWAYS_INLINE RLock(const RWMutex &m) : _m(const_cast<RWMutex *>(&m)) { _m->rlock(); }
ZT_ALWAYS_INLINE ~RLock() { _m->unlock(); }
private:
RWMutex *const _m;
};
class Lock
{
public:
ZT_ALWAYS_INLINE Lock(RWMutex &m) : _m(&m) { m.lock(); }
ZT_ALWAYS_INLINE Lock(const RWMutex &m) : _m(const_cast<RWMutex *>(&m)) { _m->lock(); }
ZT_ALWAYS_INLINE ~Lock() { _m->unlock(); }
private:
RWMutex *const _m;
};
private:
ZT_ALWAYS_INLINE RWMutex(const RWMutex &) {}
ZT_ALWAYS_INLINE const RWMutex &operator=(const RWMutex &) { return *this; }
pthread_rwlock_t _mh;
};
} // namespace ZeroTier
#endif
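Typical use of the new guards, as a brief illustrative sketch (the surrounding class is hypothetical):

#include <cstdint>
#include <map>

class PathTable
{
public:
	int lookup(const uint64_t key) const
	{
		RWMutex::RLock l(_lock); // shared lock: many readers may hold this concurrently
		std::map<uint64_t,int>::const_iterator i(_map.find(key));
		return ((i == _map.end()) ? -1 : i->second);
	}
	void insert(const uint64_t key,const int value)
	{
		RWMutex::Lock l(_lock); // exclusive lock: one writer at a time
		_map[key] = value;
	}
private:
	RWMutex _lock;
	std::map<uint64_t,int> _map;
};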
@ -61,7 +98,6 @@ private:
namespace ZeroTier {
// Windows critical section based lock
class Mutex
{
public:

View file

@ -72,11 +72,6 @@ namespace ZeroTier {
*/
#define ZT_NETWORKCONFIG_FLAG_RULES_RESULT_OF_UNSUPPORTED_MATCH 0x0000000000000008ULL
/**
* Flag: disable frame compression
*/
#define ZT_NETWORKCONFIG_FLAG_DISABLE_COMPRESSION 0x0000000000000010ULL
/**
* Device can bridge to other Ethernet networks and gets unknown recipient multicasts
*/
@ -88,7 +83,7 @@ namespace ZeroTier {
#define ZT_NETWORKCONFIG_SPECIALIST_TYPE_MULTICAST_REPLICATOR 0x0000040000000000ULL
/**
* Device that can probe and receive remote trace info about this network
* Device that is allowed to remotely debug connectivity on this network
*/
#define ZT_NETWORKCONFIG_SPECIALIST_TYPE_DIAGNOSTICIAN 0x0000080000000000ULL
@ -223,22 +218,6 @@ struct NetworkConfig
*/
inline bool ndpEmulation() const { return ((this->flags & ZT_NETWORKCONFIG_FLAG_ENABLE_IPV6_NDP_EMULATION) != 0); }
/**
* @return True if frames should not be compressed
*/
inline bool disableCompression() const
{
#ifndef ZT_DISABLE_COMPRESSION
return ((this->flags & ZT_NETWORKCONFIG_FLAG_DISABLE_COMPRESSION) != 0);
#else
/* Compression is disabled for libzt builds since it causes non-obvious chaotic
interference with lwIP's TCP congestion algorithm. Compression is also disabled
for some NAS builds due to the usage of low-performance processors in certain
older and budget models. */
return true;
#endif
}
/**
* @return Network type is public (no access control)
*/

View file

@ -298,7 +298,6 @@ ZT_ResultCode Node::leave(uint64_t nwid,void **uptr,void *tptr)
{
Mutex::Lock _l(_networks_m);
SharedPtr<Network> *nw = _networks.get(nwid);
RR->sw->removeNetworkQoSControlBlock(nwid);
if (!nw)
return ZT_RESULT_OK;
if (uptr)
@ -400,7 +399,6 @@ ZT_PeerList *Node::peers() const
p->address = (*pi)->address().toInt();
identities[pl->peerCount] = (*pi)->identity(); // need to make a copy in case peer gets deleted
p->identity = &identities[pl->peerCount];
p->hadAggregateLink = 0;
if ((*pi)->remoteVersionKnown()) {
p->versionMajor = (int)(*pi)->remoteVersionMajor();
p->versionMinor = (int)(*pi)->remoteVersionMinor();
@ -417,7 +415,6 @@ ZT_PeerList *Node::peers() const
std::vector< SharedPtr<Path> > paths((*pi)->paths(now));
SharedPtr<Path> bestp((*pi)->getAppropriatePath(now,false));
p->hadAggregateLink |= (*pi)->hasAggregateLink();
p->pathCount = 0;
for(std::vector< SharedPtr<Path> >::iterator path(paths.begin());path!=paths.end();++path) {
memcpy(&(p->paths[p->pathCount].address),&((*path)->address()),sizeof(struct sockaddr_storage));
@ -426,16 +423,6 @@ ZT_PeerList *Node::peers() const
p->paths[p->pathCount].trustedPathId = RR->topology->getOutboundPathTrust((*path)->address());
p->paths[p->pathCount].alive = (*path)->alive(now) ? 1 : 0;
p->paths[p->pathCount].preferred = ((*path) == bestp) ? 1 : 0;
p->paths[p->pathCount].latency = (float)(*path)->latency();
p->paths[p->pathCount].packetDelayVariance = (*path)->packetDelayVariance();
p->paths[p->pathCount].throughputDisturbCoeff = (*path)->throughputDisturbanceCoefficient();
p->paths[p->pathCount].packetErrorRatio = (*path)->packetErrorRatio();
p->paths[p->pathCount].packetLossRatio = (*path)->packetLossRatio();
p->paths[p->pathCount].stability = (*path)->lastComputedStability();
p->paths[p->pathCount].throughput = (*path)->meanThroughput();
p->paths[p->pathCount].maxThroughput = (*path)->maxLifetimeThroughput();
p->paths[p->pathCount].allocation = (float)(*path)->allocation() / (float)255;
p->paths[p->pathCount].ifname = (*path)->getName();
++p->pathCount;
}

View file

@ -26,4 +26,35 @@ bool Path::send(const RuntimeEnvironment *RR,void *tPtr,const void *data,unsigne
return false;
}
bool Path::isAddressValidForPath(const InetAddress &a)
{
if ((a.ss_family == AF_INET)||(a.ss_family == AF_INET6)) {
switch(a.ipScope()) {
/* Note: we don't do link-local at the moment. Unfortunately these
* cause several issues. The first is that they usually require a
* device qualifier, which we don't handle yet and can't portably
* push in PUSH_DIRECT_PATHS. The second is that some OSes assign
* these very ephemerally or otherwise strangely. So we'll use
* private, pseudo-private, shared (e.g. carrier grade NAT), or
* global IP addresses. */
case InetAddress::IP_SCOPE_PRIVATE:
case InetAddress::IP_SCOPE_PSEUDOPRIVATE:
case InetAddress::IP_SCOPE_SHARED:
case InetAddress::IP_SCOPE_GLOBAL:
if (a.ss_family == AF_INET6) {
// TEMPORARY HACK: for now, we are going to blacklist he.net IPv6
// tunnels due to very spotty performance and low MTU issues over
// these IPv6 tunnel links.
const uint8_t *ipd = reinterpret_cast<const uint8_t *>(reinterpret_cast<const struct sockaddr_in6 *>(&a)->sin6_addr.s6_addr);
if ((ipd[0] == 0x20)&&(ipd[1] == 0x01)&&(ipd[2] == 0x04)&&(ipd[3] == 0x70))
return false;
}
return true;
default:
return false;
}
}
return false;
}
} // namespace ZeroTier

View file

@ -26,15 +26,8 @@
#include "SharedPtr.hpp"
#include "AtomicCounter.hpp"
#include "Utils.hpp"
#include "RingBuffer.hpp"
#include "Packet.hpp"
#include "Mutex.hpp"
/**
* Maximum return value of preferenceRank()
*/
#define ZT_PATH_MAX_PREFERENCE_RANK ((ZT_INETADDRESS_MAX_SCOPE << 1) | 1)
namespace ZeroTier {
class RuntimeEnvironment;
@ -54,7 +47,6 @@ public:
{
public:
ZT_ALWAYS_INLINE HashKey() {}
ZT_ALWAYS_INLINE HashKey(const int64_t l,const InetAddress &r)
{
if (r.ss_family == AF_INET) {
@ -79,81 +71,15 @@ public:
uint64_t _k[3];
};
inline Path() :
_lastOut(0),
ZT_ALWAYS_INLINE Path(const int64_t l,const InetAddress &r) :
_localSocket(l),
_lastIn(0),
_lastPathQualityComputeTime(0),
_localSocket(-1),
_latency(0xffff),
_addr(),
_ipScope(InetAddress::IP_SCOPE_NONE),
_lastAck(0),
_lastThroughputEstimation(0),
_lastQoSMeasurement(0),
_lastQoSRecordPurge(0),
_unackedBytes(0),
_expectingAckAsOf(0),
_packetsReceivedSinceLastAck(0),
_packetsReceivedSinceLastQoS(0),
_maxLifetimeThroughput(0),
_lastComputedMeanThroughput(0),
_bytesAckedSinceLastThroughputEstimation(0),
_lastComputedMeanLatency(0.0),
_lastComputedPacketDelayVariance(0.0),
_lastComputedPacketErrorRatio(0.0),
_lastComputedPacketLossRatio(0),
_lastComputedStability(0.0),
_lastComputedRelativeQuality(0),
_lastComputedThroughputDistCoeff(0.0),
_lastAllocation(0)
{
memset(_ifname, 0, 16);
memset(_addrString, 0, sizeof(_addrString));
}
inline Path(const int64_t localSocket,const InetAddress &addr) :
_lastOut(0),
_lastIn(0),
_lastPathQualityComputeTime(0),
_localSocket(localSocket),
_latency(0xffff),
_addr(addr),
_ipScope(addr.ipScope()),
_lastAck(0),
_lastThroughputEstimation(0),
_lastQoSMeasurement(0),
_lastQoSRecordPurge(0),
_unackedBytes(0),
_expectingAckAsOf(0),
_packetsReceivedSinceLastAck(0),
_packetsReceivedSinceLastQoS(0),
_maxLifetimeThroughput(0),
_lastComputedMeanThroughput(0),
_bytesAckedSinceLastThroughputEstimation(0),
_lastComputedMeanLatency(0.0),
_lastComputedPacketDelayVariance(0.0),
_lastComputedPacketErrorRatio(0.0),
_lastComputedPacketLossRatio(0),
_lastComputedStability(0.0),
_lastComputedRelativeQuality(0),
_lastComputedThroughputDistCoeff(0.0),
_lastAllocation(0)
_addr(r),
__refCount()
{
memset(_ifname, 0, 16);
memset(_addrString, 0, sizeof(_addrString));
if (_localSocket != -1) {
// TODO: add localInterface alongside localSocket
//_phy->getIfName((PhySocket *) ((uintptr_t) _localSocket), _ifname, 16);
}
}
/**
* Called when a packet is received from this remote path, regardless of content
*
* @param t Time of receive
*/
inline void received(const uint64_t t) { _lastIn = t; }
/**
* Send a packet via this path (last out time is also updated)
*
@ -167,53 +93,38 @@ public:
bool send(const RuntimeEnvironment *RR,void *tPtr,const void *data,unsigned int len,int64_t now);
/**
* Manually update last sent time
* Called when a packet is received from this remote path, regardless of content
*
* @param t Time of send
* @param t Time of receive
*/
inline void sent(const int64_t t) { _lastOut = t; }
ZT_ALWAYS_INLINE void received(const uint64_t t) { _lastIn = t; }
/**
* Update path latency with a new measurement
* Check path aliveness
*
* @param l Measured latency
* @param now Current time
*/
inline void updateLatency(const unsigned int l, int64_t now)
{
unsigned int pl = _latency;
if (pl < 0xffff) {
_latency = (pl + l) / 2;
}
else {
_latency = l;
}
_latencySamples.push(l);
}
/**
* @return Local socket as specified by external code
*/
inline int64_t localSocket() const { return _localSocket; }
ZT_ALWAYS_INLINE bool alive(const int64_t now) const { return ((now - _lastIn) < ZT_PATH_ACTIVITY_TIMEOUT); }
/**
* @return Physical address
*/
inline const InetAddress &address() const { return _addr; }
ZT_ALWAYS_INLINE const InetAddress &address() const { return _addr; }
/**
* @return IP scope -- faster shortcut for address().ipScope()
* @return Local socket as specified by external code
*/
inline InetAddress::IpScope ipScope() const { return _ipScope; }
ZT_ALWAYS_INLINE int64_t localSocket() const { return _localSocket; }
/**
* @return Preference rank, higher == better
* @return Last time we received anything
*/
inline unsigned int preferenceRank() const
{
// This causes us to rank paths in order of IP scope rank (see InetAddress.hpp) but
// within each IP scope class to prefer IPv6 over IPv4.
return ( ((unsigned int)_ipScope << 1) | (unsigned int)(_addr.ss_family == AF_INET6) );
}
ZT_ALWAYS_INLINE int64_t lastIn() const { return _lastIn; }
/**
* @return Last time we sent something
*/
ZT_ALWAYS_INLINE int64_t lastOut() const { return _lastOut; }
/**
* Check whether this address is valid for a ZeroTier path
@ -224,443 +135,14 @@ public:
* @param a Address to check
* @return True if address is good for ZeroTier path use
*/
static inline bool isAddressValidForPath(const InetAddress &a)
{
if ((a.ss_family == AF_INET)||(a.ss_family == AF_INET6)) {
switch(a.ipScope()) {
/* Note: we don't do link-local at the moment. Unfortunately these
* cause several issues. The first is that they usually require a
* device qualifier, which we don't handle yet and can't portably
* push in PUSH_DIRECT_PATHS. The second is that some OSes assign
* these very ephemerally or otherwise strangely. So we'll use
* private, pseudo-private, shared (e.g. carrier grade NAT), or
* global IP addresses. */
case InetAddress::IP_SCOPE_PRIVATE:
case InetAddress::IP_SCOPE_PSEUDOPRIVATE:
case InetAddress::IP_SCOPE_SHARED:
case InetAddress::IP_SCOPE_GLOBAL:
if (a.ss_family == AF_INET6) {
// TEMPORARY HACK: for now, we are going to blacklist he.net IPv6
// tunnels due to very spotty performance and low MTU issues over
// these IPv6 tunnel links.
const uint8_t *ipd = reinterpret_cast<const uint8_t *>(reinterpret_cast<const struct sockaddr_in6 *>(&a)->sin6_addr.s6_addr);
if ((ipd[0] == 0x20)&&(ipd[1] == 0x01)&&(ipd[2] == 0x04)&&(ipd[3] == 0x70))
return false;
}
return true;
default:
return false;
}
}
return false;
}
/**
* @return Latency or 0xffff if unknown
*/
inline unsigned int latency() const { return _latency; }
/**
* @return Path quality -- lower is better
*/
inline long quality(const int64_t now) const
{
const long l = (long)_latency;
const long age = (long)std::min((long)(now - _lastIn),(long)(ZT_PEER_PING_PERIOD * 10)); // set an upper sanity limit to avoid overflow
return ( ( (age < (ZT_PEER_PING_PERIOD + 5000)) ? l : (l + 65535 + age) ) * (long)((ZT_INETADDRESS_MAX_SCOPE - _ipScope) + 1));
}
/**
* Record statistics on outgoing packets. Used later to estimate QoS metrics.
*
* @param now Current time
* @param packetId ID of packet
* @param payloadLength Length of payload
* @param verb Packet verb
*/
inline void recordOutgoingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb)
{
Mutex::Lock _l(_statistics_m);
if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) {
if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) {
_unackedBytes += payloadLength;
// Take note that we're expecting a VERB_ACK on this path as of a specific time
_expectingAckAsOf = ackAge(now) > ZT_PATH_ACK_INTERVAL ? _expectingAckAsOf : now;
if (_outQoSRecords.size() < ZT_PATH_MAX_OUTSTANDING_QOS_RECORDS) {
_outQoSRecords[packetId] = now;
}
}
}
}
/**
* Record statistics on incoming packets. Used later to estimate QoS metrics.
*
* @param now Current time
* @param packetId ID of packet
* @param payloadLength Length of payload
* @param verb Packet verb
*/
inline void recordIncomingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb)
{
Mutex::Lock _l(_statistics_m);
if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) {
if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) {
_inACKRecords[packetId] = payloadLength;
_packetsReceivedSinceLastAck++;
_inQoSRecords[packetId] = now;
_packetsReceivedSinceLastQoS++;
}
_packetValiditySamples.push(true);
}
}
/**
* Record that we've received a VERB_ACK on this path, and also compute throughput if required.
*
* @param now Current time
* @param ackedBytes Number of bytes acknowledged by other peer
*/
inline void receivedAck(int64_t now, int32_t ackedBytes)
{
_expectingAckAsOf = 0;
_unackedBytes = (ackedBytes > _unackedBytes) ? 0 : _unackedBytes - ackedBytes;
int64_t timeSinceThroughputEstimate = (now - _lastThroughputEstimation);
if (timeSinceThroughputEstimate >= ZT_PATH_THROUGHPUT_MEASUREMENT_INTERVAL) {
uint64_t throughput = (uint64_t)((float)(_bytesAckedSinceLastThroughputEstimation * 8) / ((float)timeSinceThroughputEstimate / (float)1000));
_throughputSamples.push(throughput);
_maxLifetimeThroughput = throughput > _maxLifetimeThroughput ? throughput : _maxLifetimeThroughput;
_lastThroughputEstimation = now;
_bytesAckedSinceLastThroughputEstimation = 0;
} else {
_bytesAckedSinceLastThroughputEstimation += ackedBytes;
}
}
/**
* @return Number of bytes this peer is responsible for ACKing since last ACK
*/
inline int32_t bytesToAck()
{
Mutex::Lock _l(_statistics_m);
int32_t bytesToAck = 0;
std::map<uint64_t,uint16_t>::iterator it = _inACKRecords.begin();
while (it != _inACKRecords.end()) {
bytesToAck += it->second;
it++;
}
return bytesToAck;
}
/**
* @return Number of bytes thus far sent that have not been acknowledged by the remote peer
*/
inline int64_t unackedSentBytes()
{
return _unackedBytes;
}
/**
* Account for the fact that an ACK was just sent. Reset counters, timers, and clear statistics buffers
*
* @param now Current time
*/
inline void sentAck(int64_t now)
{
Mutex::Lock _l(_statistics_m);
_inACKRecords.clear();
_packetsReceivedSinceLastAck = 0;
_lastAck = now;
}
/**
* Receive QoS data, match with recorded egress times from this peer, compute latency
* estimates.
*
* @param now Current time
* @param count Number of records
* @param rx_id table of packet IDs
* @param rx_ts table of holding times
*/
inline void receivedQoS(int64_t now, int count, uint64_t *rx_id, uint16_t *rx_ts)
{
Mutex::Lock _l(_statistics_m);
// Look up egress times and compute latency values for each record
std::map<uint64_t,uint64_t>::iterator it;
for (int j=0; j<count; j++) {
it = _outQoSRecords.find(rx_id[j]);
if (it != _outQoSRecords.end()) {
uint16_t rtt = (uint16_t)(now - it->second);
uint16_t rtt_compensated = rtt - rx_ts[j];
uint16_t latency = rtt_compensated / 2;
updateLatency(latency, now);
_outQoSRecords.erase(it);
}
}
}
/**
* Generate the contents of a VERB_QOS_MEASUREMENT packet.
*
* @param now Current time
* @param qosBuffer destination buffer
* @return Size of payload
*/
inline int32_t generateQoSPacket(int64_t now, char *qosBuffer)
{
Mutex::Lock _l(_statistics_m);
int32_t len = 0;
std::map<uint64_t,uint64_t>::iterator it = _inQoSRecords.begin();
int i=0;
while (i<_packetsReceivedSinceLastQoS && it != _inQoSRecords.end()) {
uint64_t id = it->first;
memcpy(qosBuffer, &id, sizeof(uint64_t));
qosBuffer+=sizeof(uint64_t);
uint16_t holdingTime = (uint16_t)(now - it->second);
memcpy(qosBuffer, &holdingTime, sizeof(uint16_t));
qosBuffer+=sizeof(uint16_t);
len+=sizeof(uint64_t)+sizeof(uint16_t);
_inQoSRecords.erase(it++);
i++;
}
return len;
}
/**
* Account for the fact that a VERB_QOS_MEASUREMENT was just sent. Reset timers.
*
* @param now Current time
*/
inline void sentQoS(int64_t now) {
_packetsReceivedSinceLastQoS = 0;
_lastQoSMeasurement = now;
}
/**
* @param now Current time
* @return Whether an ACK (VERB_ACK) packet needs to be emitted at this time
*/
inline bool needsToSendAck(int64_t now) {
return ((now - _lastAck) >= ZT_PATH_ACK_INTERVAL ||
(_packetsReceivedSinceLastAck == ZT_PATH_QOS_TABLE_SIZE)) && _packetsReceivedSinceLastAck;
}
/**
* @param now Current time
* @return Whether a QoS (VERB_QOS_MEASUREMENT) packet needs to be emitted at this time
*/
inline bool needsToSendQoS(int64_t now) {
return ((_packetsReceivedSinceLastQoS >= ZT_PATH_QOS_TABLE_SIZE) ||
((now - _lastQoSMeasurement) > ZT_PATH_QOS_INTERVAL)) && _packetsReceivedSinceLastQoS;
}
/**
* How much time has elapsed since we've been expecting a VERB_ACK on this path. This value
* is used to determine a more relevant path "age". This lets us penalize paths which are no
* longer ACKing, but not those that simply aren't being used to carry traffic at the
* current time.
*/
inline int64_t ackAge(int64_t now) { return _expectingAckAsOf ? now - _expectingAckAsOf : 0; }
/**
* The maximum observed throughput (in bits/s) for this path
*/
inline uint64_t maxLifetimeThroughput() { return _maxLifetimeThroughput; }
/**
* @return The mean throughput (in bits/s) of this link
*/
inline uint64_t meanThroughput() { return _lastComputedMeanThroughput; }
/**
* Assign a new relative quality value for this path in the aggregate link
*
* @param rq Quality of this path in comparison to other paths available to this peer
*/
inline void updateRelativeQuality(float rq) { _lastComputedRelativeQuality = rq; }
/**
* @return Quality of this path compared to others in the aggregate link
*/
inline float relativeQuality() { return _lastComputedRelativeQuality; }
/**
* Assign a new allocation value for this path in the aggregate link
*
* @param allocation Percentage of traffic to be sent over this path to a peer
*/
inline void updateComponentAllocationOfAggregateLink(unsigned char allocation) { _lastAllocation = allocation; }
/**
* @return Percentage of traffic allocated to this path in the aggregate link
*/
inline unsigned char allocation() { return _lastAllocation; }
/**
* @return Most recently computed stability estimate; these are expensive to compute, so the latest result is cached.
*/
inline float lastComputedStability() { return _lastComputedStability; }
/**
* @return A pointer to a cached copy of the human-readable name of the interface this Path's localSocket is bound to
*/
inline char *getName() { return _ifname; }
/**
* @return Packet delay variance
*/
inline float packetDelayVariance() { return _lastComputedPacketDelayVariance; }
/**
* @return Previously-computed mean latency
*/
inline float meanLatency() { return _lastComputedMeanLatency; }
/**
* @return Packet loss ratio (PLR)
*/
inline float packetLossRatio() { return _lastComputedPacketLossRatio; }
/**
* @return Packet error ratio (PER)
*/
inline float packetErrorRatio() { return _lastComputedPacketErrorRatio; }
/**
* Record an invalid incoming packet. This packet failed MAC/compression/cipher checks and will now
* contribute to a Packet Error Ratio (PER).
*/
inline void recordInvalidPacket() { _packetValiditySamples.push(false); }
/**
* @return A pointer to a cached copy of the address string for this Path (For debugging only)
*/
inline char *getAddressString() { return _addrString; }
/**
* @return The current throughput disturbance coefficient
*/
inline float throughputDisturbanceCoefficient() { return _lastComputedThroughputDistCoeff; }
/**
* Compute and cache stability and performance metrics. The resultant stability coefficient is a measure of how "well behaved"
* this path is. This figure is substantially different from (but required for) the estimation of the path's overall "quality".
*
* @param now Current time
*/
inline void processBackgroundPathMeasurements(const int64_t now)
{
if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
Mutex::Lock _l(_statistics_m);
_lastPathQualityComputeTime = now;
address().toString(_addrString);
_lastComputedMeanLatency = _latencySamples.mean();
_lastComputedPacketDelayVariance = _latencySamples.stddev(); // Similar to "jitter" (SEE: RFC 3393, RFC 4689)
_lastComputedMeanThroughput = (uint64_t)_throughputSamples.mean();
// If no packet validity samples, assume PER==0
_lastComputedPacketErrorRatio = 1 - (_packetValiditySamples.count() ? _packetValiditySamples.mean() : 1);
// Compute path stability
// Normalize measurements with wildly different ranges into a reasonable range
float normalized_pdv = Utils::normalize(_lastComputedPacketDelayVariance, 0, ZT_PATH_MAX_PDV, 0, 10);
float normalized_la = Utils::normalize(_lastComputedMeanLatency, 0, ZT_PATH_MAX_MEAN_LATENCY, 0, 10);
float throughput_cv = _throughputSamples.mean() > 0 ? _throughputSamples.stddev() / _throughputSamples.mean() : 1;
// Form an exponential cutoff and apply contribution weights
float pdv_contrib = expf((-1.0f)*normalized_pdv) * (float)ZT_PATH_CONTRIB_PDV;
float latency_contrib = expf((-1.0f)*normalized_la) * (float)ZT_PATH_CONTRIB_LATENCY;
// Throughput Disturbance Coefficient
float throughput_disturbance_contrib = expf((-1.0f)*throughput_cv) * (float)ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE;
_throughputDisturbanceSamples.push(throughput_cv);
_lastComputedThroughputDistCoeff = _throughputDisturbanceSamples.mean();
// Obey user-defined ignored contributions
pdv_contrib = ZT_PATH_CONTRIB_PDV > 0.0 ? pdv_contrib : 1;
latency_contrib = ZT_PATH_CONTRIB_LATENCY > 0.0 ? latency_contrib : 1;
throughput_disturbance_contrib = ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE > 0.0 ? throughput_disturbance_contrib : 1;
// Stability
_lastComputedStability = pdv_contrib + latency_contrib + throughput_disturbance_contrib;
_lastComputedStability *= 1 - _lastComputedPacketErrorRatio;
// Prevent QoS records from sticking around for too long
std::map<uint64_t,uint64_t>::iterator it = _outQoSRecords.begin();
while (it != _outQoSRecords.end()) {
// Time since egress of tracked packet
if ((now - it->second) >= ZT_PATH_QOS_TIMEOUT) {
_outQoSRecords.erase(it++);
} else { it++; }
}
}
}
/**
* @return True if this path is alive (receiving data)
*/
inline bool alive(const int64_t now) const { return ((now - _lastIn) < ((ZT_PEER_PING_PERIOD * 2) + 5000)); }
/**
* @return Last time we sent something
*/
inline int64_t lastOut() const { return _lastOut; }
/**
* @return Last time we received anything
*/
inline int64_t lastIn() const { return _lastIn; }
static bool isAddressValidForPath(const InetAddress &a);
private:
Mutex _statistics_m;
volatile int64_t _lastOut;
volatile int64_t _lastIn;
volatile int64_t _lastPathQualityComputeTime;
int64_t _localSocket;
volatile unsigned int _latency;
int64_t _lastIn;
int64_t _lastOut;
InetAddress _addr;
InetAddress::IpScope _ipScope; // memoize this since it's a computed value checked often
AtomicCounter __refCount;
std::map<uint64_t,uint64_t> _outQoSRecords; // id:egress_time
std::map<uint64_t,uint64_t> _inQoSRecords; // id:now
std::map<uint64_t,uint16_t> _inACKRecords; // id:len
int64_t _lastAck;
int64_t _lastThroughputEstimation;
int64_t _lastQoSMeasurement;
int64_t _lastQoSRecordPurge;
int64_t _unackedBytes;
int64_t _expectingAckAsOf;
int16_t _packetsReceivedSinceLastAck;
int16_t _packetsReceivedSinceLastQoS;
uint64_t _maxLifetimeThroughput;
uint64_t _lastComputedMeanThroughput;
uint64_t _bytesAckedSinceLastThroughputEstimation;
float _lastComputedMeanLatency;
float _lastComputedPacketDelayVariance;
float _lastComputedPacketErrorRatio;
float _lastComputedPacketLossRatio;
// cached estimates
float _lastComputedStability;
float _lastComputedRelativeQuality;
float _lastComputedThroughputDistCoeff;
unsigned char _lastAllocation;
// cached human-readable strings for tracing purposes
char _ifname[16];
char _addrString[256];
RingBuffer<uint64_t,ZT_PATH_QUALITY_METRIC_WIN_SZ> _throughputSamples;
RingBuffer<uint32_t,ZT_PATH_QUALITY_METRIC_WIN_SZ> _latencySamples;
RingBuffer<bool,ZT_PATH_QUALITY_METRIC_WIN_SZ> _packetValiditySamples;
RingBuffer<float,ZT_PATH_QUALITY_METRIC_WIN_SZ> _throughputDisturbanceSamples;
};
} // namespace ZeroTier

View file

@ -20,42 +20,23 @@
#include "Packet.hpp"
#include "Trace.hpp"
#include "InetAddress.hpp"
#include "RingBuffer.hpp"
#include "Utils.hpp"
#include "ScopedPtr.hpp"
namespace ZeroTier {
Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Identity &peerIdentity) :
RR(renv),
_lastReceive(0),
_lastDirectPathPushSent(0),
_lastDirectPathPushReceive(0),
_lastCredentialRequestSent(0),
_lastWhoisRequestReceived(0),
_lastEchoRequestReceived(0),
_lastCredentialsReceived(0),
_lastACKWindowReset(0),
_lastQoSWindowReset(0),
_lastMultipathCompatibilityCheck(0),
_lastPushDirectPathsReceived(0),
_lastTriedStaticPath(0),
_uniqueAlivePathCount(0),
_localMultipathSupported(false),
_remoteMultipathSupported(false),
_canUseMultipath(false),
_freeRandomByte((uint8_t)Utils::random()),
_latency(0xffff),
_pathCount(0),
_id(peerIdentity),
_vProto(0),
_vMajor(0),
_vMinor(0),
_vRevision(0),
_id(peerIdentity),
_directPathPushCutoffCount(0),
_credentialsCutoffCount(0),
_linkIsBalanced(false),
_linkIsRedundant(false),
_remotePeerMultipathEnabled(false),
_lastAggregateStatsReport(0),
_lastAggregateAllocation(0)
_vRevision(0)
{
if (!myIdentity.agree(peerIdentity,_key))
throw ZT_EXCEPTION_INVALID_ARGUMENT;
@ -76,23 +57,7 @@ void Peer::received(
_lastReceive = now;
{
Mutex::Lock _l(_paths_m);
recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now);
if (_canUseMultipath) {
if (path->needsToSendQoS(now)) {
sendQOS_MEASUREMENT(tPtr, path, path->localSocket(), path->address(), now);
}
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i]) {
_paths[i]->processBackgroundPathMeasurements(now);
}
}
}
}
/*
if (hops == 0) {
// If this is a direct packet (no hops), update existing paths or learn new ones
bool havePath = false;
@ -164,9 +129,11 @@ void Peer::received(
RR->t->peerConfirmingUnknownPath(tPtr,networkId,*this,path,packetId,verb);
}
}
*/
// Periodically push direct paths to the peer, doing so more often if we do not
// currently have a direct path.
/*
const int64_t sinceLastPush = now - _lastDirectPathPushSent;
if (sinceLastPush >= ((hops == 0) ? ZT_DIRECT_PATH_PUSH_INTERVAL_HAVEPATH : ZT_DIRECT_PATH_PUSH_INTERVAL)) {
_lastDirectPathPushSent = now;
@ -219,452 +186,13 @@ void Peer::received(
}
}
}
*/
}
void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
uint16_t payloadLength, const Packet::Verb verb, int64_t now)
bool Peer::hasActivePathTo(int64_t now,const InetAddress &addr) const
{
_freeRandomByte += (unsigned char)(packetId >> 8); // grab entropy to use in path selection logic for multipath
if (_canUseMultipath) {
path->recordOutgoingPacket(now, packetId, payloadLength, verb);
}
}
void Peer::recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId,
uint16_t payloadLength, const Packet::Verb verb, int64_t now)
{
if (_canUseMultipath) {
if (path->needsToSendAck(now)) {
sendACK(tPtr, path, path->localSocket(), path->address(), now);
}
path->recordIncomingPacket(now, packetId, payloadLength, verb);
}
}
void Peer::computeAggregateProportionalAllocation(int64_t now)
{
float maxStability = 0;
float totalRelativeQuality = 0;
float maxThroughput = 1;
float maxScope = 0;
float relStability[ZT_MAX_PEER_NETWORK_PATHS];
float relThroughput[ZT_MAX_PEER_NETWORK_PATHS];
memset(&relStability, 0, sizeof(relStability));
memset(&relThroughput, 0, sizeof(relThroughput));
// Survey all paths
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i]) {
relStability[i] = _paths[i]->lastComputedStability();
relThroughput[i] = (float)_paths[i]->maxLifetimeThroughput();
maxStability = relStability[i] > maxStability ? relStability[i] : maxStability;
maxThroughput = relThroughput[i] > maxThroughput ? relThroughput[i] : maxThroughput;
maxScope = _paths[i]->ipScope() > maxScope ? _paths[i]->ipScope() : maxScope;
}
}
// Convert to relative values
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i]) {
relStability[i] /= maxStability ? maxStability : 1;
relThroughput[i] /= maxThroughput ? maxThroughput : 1;
float normalized_ma = Utils::normalize((float)_paths[i]->ackAge(now), 0, ZT_PATH_MAX_AGE, 0, 10);
float age_contrib = exp((-1)*normalized_ma);
float relScope = ((float)(_paths[i]->ipScope()+1) / (maxScope + 1));
float relQuality =
(relStability[i] * (float)ZT_PATH_CONTRIB_STABILITY)
+ (fmaxf(1.0f, relThroughput[i]) * (float)ZT_PATH_CONTRIB_THROUGHPUT)
+ relScope * (float)ZT_PATH_CONTRIB_SCOPE;
relQuality *= age_contrib;
// Arbitrary cutoffs
relQuality = relQuality > (1.00f / 100.0f) ? relQuality : 0.0f;
relQuality = relQuality < (99.0f / 100.0f) ? relQuality : 1.0f;
totalRelativeQuality += relQuality;
_paths[i]->updateRelativeQuality(relQuality);
}
}
// Convert set of relative performances into an allocation set
for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i]) {
_paths[i]->updateComponentAllocationOfAggregateLink((unsigned char)((_paths[i]->relativeQuality() / totalRelativeQuality) * 255));
}
}
}
int Peer::computeAggregateLinkPacketDelayVariance()
{
float pdv = 0.0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i]) {
pdv += _paths[i]->relativeQuality() * _paths[i]->packetDelayVariance();
}
}
return (int)pdv;
}
int Peer::computeAggregateLinkMeanLatency()
{
int ml = 0;
int pathCount = 0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i]) {
pathCount++;
ml += (int)(_paths[i]->relativeQuality() * _paths[i]->meanLatency());
}
}
return ml / pathCount;
}
int Peer::aggregateLinkPhysicalPathCount()
{
std::map<std::string, bool> ifnamemap;
int pathCount = 0;
int64_t now = RR->node->now();
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i] && _paths[i]->alive(now)) {
if (!ifnamemap[_paths[i]->getName()]) {
ifnamemap[_paths[i]->getName()] = true;
pathCount++;
}
}
}
return pathCount;
}
int Peer::aggregateLinkLogicalPathCount()
{
int pathCount = 0;
int64_t now = RR->node->now();
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i] && _paths[i]->alive(now)) {
pathCount++;
}
}
return pathCount;
}
SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
{
Mutex::Lock _l(_paths_m);
unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS;
/**
* Send traffic across the highest quality path only. This algorithm will still
* use the old path quality metric from protocol version 9.
*/
if (!_canUseMultipath) {
long bestPathQuality = 2147483647;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i]) {
if ((includeExpired)||(_paths[i]->alive(now))) {
const long q = _paths[i]->quality(now);
if (q <= bestPathQuality) {
bestPathQuality = q;
bestPath = i;
}
}
} else break;
}
if (bestPath != ZT_MAX_PEER_NETWORK_PATHS) {
return _paths[bestPath];
}
return SharedPtr<Path>();
}
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i]) {
_paths[i]->processBackgroundPathMeasurements(now);
}
}
/**
* Randomly distribute traffic across all paths
*/
int numAlivePaths = 0;
int numStalePaths = 0;
if (RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM) {
int alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
int stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
memset(&alivePaths, -1, sizeof(alivePaths));
memset(&stalePaths, -1, sizeof(stalePaths));
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i]) {
if (_paths[i]->alive(now)) {
alivePaths[numAlivePaths] = i;
numAlivePaths++;
}
else {
stalePaths[numStalePaths] = i;
numStalePaths++;
}
}
}
unsigned int r = _freeRandomByte;
if (numAlivePaths > 0) {
int rf = r % numAlivePaths;
return _paths[alivePaths[rf]];
}
else if(numStalePaths > 0) {
// Resort to trying any non-expired path
int rf = r % numStalePaths;
return _paths[stalePaths[rf]];
}
}
/**
* Proportionally allocate traffic according to dynamic path quality measurements
*/
if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) {
if ((now - _lastAggregateAllocation) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
_lastAggregateAllocation = now;
computeAggregateProportionalAllocation(now);
}
// Randomly choose path according to their allocations
float rf = _freeRandomByte;
for(int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i]) {
if (rf < _paths[i]->allocation()) {
bestPath = i;
_pathChoiceHist.push(bestPath); // Record which path we chose
break;
}
rf -= _paths[i]->allocation();
}
}
if (bestPath < ZT_MAX_PEER_NETWORK_PATHS) {
return _paths[bestPath];
}
}
return SharedPtr<Path>();
}
char *Peer::interfaceListStr()
{
std::map<std::string, int> ifnamemap;
char tmp[32];
const int64_t now = RR->node->now();
char *ptr = _interfaceListStr;
bool imbalanced = false;
memset(_interfaceListStr, 0, sizeof(_interfaceListStr));
int alivePathCount = aggregateLinkLogicalPathCount();
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i] && _paths[i]->alive(now)) {
int ipv = _paths[i]->address().isV4();
// If this is acting as an aggregate link, check allocations
float targetAllocation = 1.0f / (float)alivePathCount;
float currentAllocation = 1.0f;
if (alivePathCount > 1) {
currentAllocation = (float)_pathChoiceHist.countValue(i) / (float)_pathChoiceHist.count();
if (fabs(targetAllocation - currentAllocation) > ZT_PATH_IMBALANCE_THRESHOLD) {
imbalanced = true;
}
}
char *ipvStr = ipv ? (char*)"ipv4" : (char*)"ipv6";
sprintf(tmp, "(%s, %s, %.3f)", _paths[i]->getName(), ipvStr, currentAllocation);
// Prevent duplicates
if(ifnamemap[_paths[i]->getName()] != ipv) {
memcpy(ptr, tmp, strlen(tmp));
ptr += strlen(tmp);
*ptr = ' ';
ptr++;
ifnamemap[_paths[i]->getName()] = ipv;
}
}
}
ptr--; // Overwrite trailing space
if (imbalanced) {
sprintf(tmp, ", is asymmetrical");
memcpy(ptr, tmp, sizeof(tmp));
} else {
*ptr = '\0';
}
return _interfaceListStr;
}
void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &other) const
{
unsigned int myBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
unsigned int myBestV6ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
long myBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
long myBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
unsigned int theirBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
unsigned int theirBestV6ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
long theirBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
long theirBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
for(int i=0;i<=ZT_INETADDRESS_MAX_SCOPE;++i) {
myBestV4ByScope[i] = ZT_MAX_PEER_NETWORK_PATHS;
myBestV6ByScope[i] = ZT_MAX_PEER_NETWORK_PATHS;
myBestV4QualityByScope[i] = 2147483647;
myBestV6QualityByScope[i] = 2147483647;
theirBestV4ByScope[i] = ZT_MAX_PEER_NETWORK_PATHS;
theirBestV6ByScope[i] = ZT_MAX_PEER_NETWORK_PATHS;
theirBestV4QualityByScope[i] = 2147483647;
theirBestV6QualityByScope[i] = 2147483647;
}
Mutex::Lock _l1(_paths_m);
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i]) {
const long q = _paths[i]->quality(now);
const unsigned int s = (unsigned int)_paths[i]->ipScope();
switch(_paths[i]->address().ss_family) {
case AF_INET:
if (q <= myBestV4QualityByScope[s]) {
myBestV4QualityByScope[s] = q;
myBestV4ByScope[s] = i;
}
break;
case AF_INET6:
if (q <= myBestV6QualityByScope[s]) {
myBestV6QualityByScope[s] = q;
myBestV6ByScope[s] = i;
}
break;
}
} else break;
}
Mutex::Lock _l2(other->_paths_m);
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (other->_paths[i]) {
const long q = other->_paths[i]->quality(now);
const unsigned int s = (unsigned int)other->_paths[i]->ipScope();
switch(other->_paths[i]->address().ss_family) {
case AF_INET:
if (q <= theirBestV4QualityByScope[s]) {
theirBestV4QualityByScope[s] = q;
theirBestV4ByScope[s] = i;
}
break;
case AF_INET6:
if (q <= theirBestV6QualityByScope[s]) {
theirBestV6QualityByScope[s] = q;
theirBestV6ByScope[s] = i;
}
break;
}
} else break;
}
unsigned int mine = ZT_MAX_PEER_NETWORK_PATHS;
unsigned int theirs = ZT_MAX_PEER_NETWORK_PATHS;
for(int s=ZT_INETADDRESS_MAX_SCOPE;s>=0;--s) {
if ((myBestV6ByScope[s] != ZT_MAX_PEER_NETWORK_PATHS)&&(theirBestV6ByScope[s] != ZT_MAX_PEER_NETWORK_PATHS)) {
mine = myBestV6ByScope[s];
theirs = theirBestV6ByScope[s];
break;
}
if ((myBestV4ByScope[s] != ZT_MAX_PEER_NETWORK_PATHS)&&(theirBestV4ByScope[s] != ZT_MAX_PEER_NETWORK_PATHS)) {
mine = myBestV4ByScope[s];
theirs = theirBestV4ByScope[s];
break;
}
}
if (mine != ZT_MAX_PEER_NETWORK_PATHS) {
unsigned int alt = (unsigned int)Utils::random() & 1; // randomize which hint we send first for black magickal NAT-t reasons
const unsigned int completed = alt + 2;
while (alt != completed) {
if ((alt & 1) == 0) {
Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS);
outp.append((uint8_t)0);
other->_id.address().appendTo(outp);
outp.append((uint16_t)other->_paths[theirs]->address().port());
if (other->_paths[theirs]->address().ss_family == AF_INET6) {
outp.append((uint8_t)16);
outp.append(other->_paths[theirs]->address().rawIpData(),16);
} else {
outp.append((uint8_t)4);
outp.append(other->_paths[theirs]->address().rawIpData(),4);
}
outp.armor(_key,true);
_paths[mine]->send(RR,tPtr,outp.data(),outp.size(),now);
} else {
Packet outp(other->_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS);
outp.append((uint8_t)0);
_id.address().appendTo(outp);
outp.append((uint16_t)_paths[mine]->address().port());
if (_paths[mine]->address().ss_family == AF_INET6) {
outp.append((uint8_t)16);
outp.append(_paths[mine]->address().rawIpData(),16);
} else {
outp.append((uint8_t)4);
outp.append(_paths[mine]->address().rawIpData(),4);
}
outp.armor(other->_key,true);
other->_paths[theirs]->send(RR,tPtr,outp.data(),outp.size(),now);
}
++alt;
}
}
}
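The RENDEZVOUS payload assembled above is small and fixed in shape: a flags byte (0), the 5-byte ZeroTier address of the other party, a big-endian 16-bit port, an address-length byte (4 or 16), and the raw IP. A self-contained sketch of that layout using std::vector instead of Packet (hypothetical helper, for illustration only):

#include <cstdint>
#include <vector>

// Hypothetical: serialize a RENDEZVOUS body per the layout used above.
static std::vector<uint8_t> makeRendezvousBody(const uint8_t ztAddr[5],uint16_t port,const uint8_t *ip,uint8_t ipLen)
{
	std::vector<uint8_t> b;
	b.push_back(0);                               // flags
	b.insert(b.end(),ztAddr,ztAddr + 5);          // ZeroTier address of the other peer
	b.push_back((uint8_t)((port >> 8U) & 0xffU)); // port (big-endian)
	b.push_back((uint8_t)(port & 0xffU));
	b.push_back(ipLen);                           // 4 for IPv4, 16 for IPv6
	b.insert(b.end(),ip,ip + ipLen);              // raw IP bytes
	return b;
}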
inline void Peer::processBackgroundPeerTasks(const int64_t now)
{
// Determine current multipath compatibility with other peer
if ((now - _lastMultipathCompatibilityCheck) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
//
// Cache number of available paths so that we can short-circuit multipath logic elsewhere
//
// We also take notice of duplicate paths (same IP only) because we may have
// recently received a direct path push from a peer and our list might contain
// a dead path which hasn't been fully recognized as such. In this case we
// don't want the duplicate to trigger execution of multipath code prematurely.
// For example, three alive paths where two share an IP count as two unique
// paths: each member of the pair is counted once by the inner loop, hence
// the division by two below.
//
// This is done to support automatic multipath enable/disable without
// user intervention.
//
int currAlivePathCount = 0;
int duplicatePathsFound = 0;
for (unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i]) {
currAlivePathCount++;
for (unsigned int j=0;j<ZT_MAX_PEER_NETWORK_PATHS;++j) {
if (_paths[i] && _paths[j] && _paths[i]->address().ipsEqual2(_paths[j]->address()) && i != j) {
duplicatePathsFound+=1;
break;
}
}
}
}
_uniqueAlivePathCount = (currAlivePathCount - (duplicatePathsFound / 2));
_lastMultipathCompatibilityCheck = now;
_localMultipathSupported = ((RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) && (ZT_PROTO_VERSION > 9));
_remoteMultipathSupported = _vProto > 9;
// If both peers support multipath and more than one path exist, we can use multipath logic
_canUseMultipath = _localMultipathSupported && _remoteMultipathSupported && (_uniqueAlivePathCount > 1);
}
}
void Peer::sendACK(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
{
Packet outp(_id.address(),RR->identity.address(),Packet::VERB_ACK);
uint32_t bytesToAck = path->bytesToAck();
outp.append<uint32_t>(bytesToAck);
if (atAddress) {
outp.armor(_key,false);
RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size());
} else {
RR->sw->send(tPtr,outp,false);
}
path->sentAck(now);
}
void Peer::sendQOS_MEASUREMENT(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
{
Packet outp(_id.address(),RR->identity.address(),Packet::VERB_QOS_MEASUREMENT);
char qosData[ZT_PATH_MAX_QOS_PACKET_SZ];
int16_t len = path->generateQoSPacket(now,qosData);
outp.append(qosData,len);
if (atAddress) {
outp.armor(_key,false);
RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size());
} else {
RR->sw->send(tPtr,outp,false);
}
path->sentQoS(now);
// TODO
}
void Peer::sendHELLO(void *tPtr,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
@ -691,24 +219,9 @@ void Peer::sendHELLO(void *tPtr,const int64_t localSocket,const InetAddress &atA
void Peer::ping(void *tPtr,int64_t now,unsigned int &v4SendCount,unsigned int &v6SendCount)
{
/*
Mutex::Lock _l(_paths_m);
if (_canUseMultipath) {
int alivePathCount = aggregateLinkPhysicalPathCount();
if ((now - _lastAggregateStatsReport) > ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL) {
_lastAggregateStatsReport = now;
if (alivePathCount) {
RR->t->peerLinkAggregateStatistics(NULL,*this);
}
} if (alivePathCount < 2 && _linkIsRedundant) {
_linkIsRedundant = !_linkIsRedundant;
RR->t->peerLinkNoLongerRedundant(NULL,*this);
} if (alivePathCount > 1 && !_linkIsRedundant) {
_linkIsRedundant = !_linkIsRedundant;
RR->t->peerLinkNowRedundant(NULL,*this);
}
}
unsigned int j = 0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if ((_paths[i])&&(_paths[i]->alive(now))) {
@ -729,10 +242,12 @@ void Peer::ping(void *tPtr,int64_t now,unsigned int &v4SendCount,unsigned int &v
_paths[j].zero();
++j;
}
*/
}
void Peer::resetWithinScope(void *tPtr,InetAddress::IpScope scope,int inetAddressFamily,int64_t now)
{
/*
Mutex::Lock _l(_paths_m);
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i]) {
@ -742,6 +257,7 @@ void Peer::resetWithinScope(void *tPtr,InetAddress::IpScope scope,int inetAddres
}
} else break;
}
*/
}
} // namespace ZeroTier

View file

@ -14,8 +14,6 @@
#ifndef ZT_PEER_HPP
#define ZT_PEER_HPP
#include <vector>
#include "Constants.hpp"
#include "RuntimeEnvironment.hpp"
#include "Node.hpp"
@ -30,6 +28,8 @@
#include "Hashtable.hpp"
#include "Mutex.hpp"
#include <vector>
namespace ZeroTier {
/**
@ -40,7 +40,7 @@ class Peer
friend class SharedPtr<Peer>;
private:
inline Peer() {} // disabled to prevent bugs -- should not be constructed uninitialized
inline Peer() {}
public:
ZT_ALWAYS_INLINE ~Peer() { Utils::burn(_key,sizeof(_key)); }
@ -98,124 +98,7 @@ public:
* @param addr Remote address
* @return True if we have an active path to this destination
*/
inline bool hasActivePathTo(int64_t now,const InetAddress &addr) const
{
Mutex::Lock _l(_paths_m);
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i]) {
if ((_paths[i]->address() == addr)&&(_paths[i]->alive(now)))
return true;
} else break;
}
return false;
}
/**
* Send via best direct path
*
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
* @param data Packet data
* @param len Packet length
* @param now Current time
* @param force If true, send even if path is not alive
* @return True if we actually sent something
*/
ZT_ALWAYS_INLINE bool sendDirect(void *tPtr,const void *data,unsigned int len,int64_t now,bool force)
{
SharedPtr<Path> bp(getAppropriatePath(now,force));
if (bp)
return bp->send(RR,tPtr,data,len,now);
return false;
}
/**
* Record statistics on outgoing packets
*
* @param path Path over which packet was sent
* @param packetId Packet ID
* @param payloadLength Length of packet payload
* @param verb Packet verb
* @param now Current time
*/
void recordOutgoingPacket(const SharedPtr<Path> &path, uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now);
/**
* Record statistics on incoming packets
*
* @param path Path over which packet was received
* @param packetId Packet ID
* @param payloadLength Length of packet payload
* @param verb Packet verb
* @param now Current time
*/
void recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now);
/**
* Send an ACK to peer for the most recent packets received
*
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
* @param localSocket Raw socket the ACK packet will be sent over
* @param atAddress Destination for the ACK packet
* @param now Current time
*/
void sendACK(void *tPtr, const SharedPtr<Path> &path, int64_t localSocket,const InetAddress &atAddress,int64_t now);
/**
* Send a QoS packet to peer so that it can evaluate the quality of this link
*
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
* @param localSocket Raw socket the QoS packet will be sent over
* @param atAddress Destination for the QoS packet
* @param now Current time
*/
void sendQOS_MEASUREMENT(void *tPtr, const SharedPtr<Path> &path, int64_t localSocket,const InetAddress &atAddress,int64_t now);
/**
* Compute relative quality values and allocations for the components of the aggregate link
*
* @param now Current time
*/
void computeAggregateProportionalAllocation(int64_t now);
/**
* @return The aggregate link Packet Delay Variance (PDV)
*/
int computeAggregateLinkPacketDelayVariance();
/**
* @return The aggregate link mean latency
*/
int computeAggregateLinkMeanLatency();
/**
* @return The number of currently alive "physical" paths in the aggregate link
*/
int aggregateLinkPhysicalPathCount();
/**
* @return The number of currently alive "logical" paths in the aggregate link
*/
int aggregateLinkLogicalPathCount();
/**
* Get the most appropriate direct path based on current multipath and QoS configuration
*
* @param now Current time
* @param includeExpired If true, include even expired paths
* @return Best current path or NULL if none
*/
SharedPtr<Path> getAppropriatePath(int64_t now, bool includeExpired);
/**
* Generate a human-readable string of the interface names making up the aggregate link,
* including the moving allocation and IP version of each (for tracing)
*/
char *interfaceListStr();
/**
* Send VERB_RENDEZVOUS to this and another peer via the best common IP scope and path
*/
void introduce(void *tPtr,int64_t now,const SharedPtr<Peer> &other) const;
bool hasActivePathTo(int64_t now,const InetAddress &addr) const;
/**
* Send a HELLO to this peer at a specified physical address
@ -256,21 +139,6 @@ public:
*/
void resetWithinScope(void *tPtr,InetAddress::IpScope scope,int inetAddressFamily,int64_t now);
/**
* @param now Current time
* @return All known paths to this peer
*/
inline std::vector< SharedPtr<Path> > paths(const int64_t now) const
{
std::vector< SharedPtr<Path> > pp;
Mutex::Lock _l(_paths_m);
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (!_paths[i]) break;
pp.push_back(_paths[i]);
}
return pp;
}
/**
* @return Time of last receive of anything, whether direct or relayed
*/
@ -282,17 +150,24 @@ public:
ZT_ALWAYS_INLINE bool alive(const int64_t now) const { return ((now - _lastReceive) < ZT_PEER_ACTIVITY_TIMEOUT); }
/**
* @return Latency in milliseconds of best/aggregate path or 0xffff if unknown / no paths
* @return Latency in milliseconds of best/aggregate path or 0xffff if unknown
*/
ZT_ALWAYS_INLINE unsigned int latency(const int64_t now)
ZT_ALWAYS_INLINE unsigned int latency() const { return _latency; }
/**
* Update peer latency information
*
* This is called from packet parsing code.
*
* @param l New latency measurement (in milliseconds)
*/
ZT_ALWAYS_INLINE void updateLatency(const unsigned int l)
{
if (_canUseMultipath) {
return (int)computeAggregateLinkMeanLatency();
} else {
SharedPtr<Path> bp(getAppropriatePath(now,false));
if (bp)
return bp->latency();
return 0xffff;
if ((l > 0)&&(l < 0xffff)) {
unsigned int lat = _latency;
if (lat < 0xffff)
_latency = (l + lat) / 2;
else _latency = l;
}
}
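This running average halves the weight of history on every update: starting from an unknown latency (0xffff) and feeding samples of 40, 60, and 60 ms, _latency moves 40 → 50 → 55. The same arithmetic in isolation (a sketch with hypothetical names):

// Hypothetical: the smoothing rule from updateLatency(), free-standing.
static unsigned int smoothLatency(unsigned int prev,unsigned int sample)
{
	if ((sample == 0)||(sample >= 0xffff))
		return prev;                // discard implausible samples
	if (prev < 0xffff)
		return (sample + prev) / 2; // 50/50 blend with the running value
	return sample;                  // first valid sample seeds the average
}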
@ -323,77 +198,10 @@ public:
ZT_ALWAYS_INLINE unsigned int remoteVersionRevision() const { return _vRevision; }
ZT_ALWAYS_INLINE bool remoteVersionKnown() const { return ((_vMajor > 0)||(_vMinor > 0)||(_vRevision > 0)); }
/**
* Periodically update known multipath activation constraints. This is done so that we know when and when
* not to use multipath logic. Doing this once every few seconds is sufficient.
*
* @param now Current time
*/
void processBackgroundPeerTasks(const int64_t now);
/**
* Record that the remote peer does have multipath enabled, as evidenced by the receipt of a VERB_ACK
* or VERB_QOS_MEASUREMENT packet at some point in the past. Until this flag is set, the local client
* shall assume that multipath is not enabled and should only use classical Protocol 9 logic.
*/
inline void inferRemoteMultipathEnabled() { _remotePeerMultipathEnabled = true; }
/**
* @return Whether the local client supports and is configured to use multipath
*/
inline bool localMultipathSupport() { return _localMultipathSupported; }
/**
* @return Whether the remote peer supports and is configured to use multipath
*/
inline bool remoteMultipathSupport() { return _remoteMultipathSupported; }
/**
* @return Whether this client can use multipath to communicate with this peer. True if both peers are using
* the correct protocol and both have multipath enabled; false otherwise.
*/
inline bool canUseMultipath() { return _canUseMultipath; }
/**
* Rate limit gate for VERB_PUSH_DIRECT_PATHS
*/
inline bool rateGatePushDirectPaths(const int64_t now)
{
if ((now - _lastDirectPathPushReceive) <= ZT_PUSH_DIRECT_PATHS_CUTOFF_TIME)
++_directPathPushCutoffCount;
else _directPathPushCutoffCount = 0;
_lastDirectPathPushReceive = now;
return (_directPathPushCutoffCount < ZT_PUSH_DIRECT_PATHS_CUTOFF_LIMIT);
}
/**
* Rate limit gate for VERB_NETWORK_CREDENTIALS
*/
inline bool rateGateCredentialsReceived(const int64_t now)
{
if ((now - _lastCredentialsReceived) <= ZT_PEER_CREDENTIALS_CUTOFF_TIME)
++_credentialsCutoffCount;
else _credentialsCutoffCount = 0;
_lastCredentialsReceived = now;
return (_credentialsCutoffCount < ZT_PEER_CREDENTIALS_CUTOFF_LIMIT);
}
/**
* Rate limit gate for sending of ERROR_NEED_MEMBERSHIP_CERTIFICATE
*/
inline bool rateGateRequestCredentials(const int64_t now)
{
if ((now - _lastCredentialRequestSent) >= ZT_PEER_GENERAL_RATE_LIMIT) {
_lastCredentialRequestSent = now;
return true;
}
return false;
}
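Two gating idioms appear in these methods: a burst counter that tolerates a limited number of events per cutoff window (credentials, path pushes), and a simple minimum-interval gate (credential requests). A generic sketch of the interval form (hypothetical helper, not in this codebase):

#include <cstdint>

// Hypothetical: allow an event at most once per 'interval' milliseconds.
struct IntervalGate
{
	int64_t last = 0;
	bool allow(const int64_t now,const int64_t interval)
	{
		if ((now - last) >= interval) {
			last = now; // consume this slot
			return true;
		}
		return false;
	}
};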
/**
* Rate limit gate for inbound WHOIS requests
*/
inline bool rateGateInboundWhoisRequest(const int64_t now)
ZT_ALWAYS_INLINE bool rateGateInboundWhoisRequest(const int64_t now)
{
if ((now - _lastWhoisRequestReceived) >= ZT_PEER_WHOIS_RATE_LIMIT) {
_lastWhoisRequestReceived = now;
@ -402,10 +210,22 @@ public:
return false;
}
/**
* Rate limit gate for inbound PUSH_DIRECT_PATHS requests
*/
ZT_ALWAYS_INLINE bool rateGateInboundPushDirectPaths(const int64_t now)
{
if ((now - _lastPushDirectPathsReceived) >= ZT_DIRECT_PATH_PUSH_INTERVAL) {
_lastPushDirectPathsReceived = now;
return true;
}
return false;
}
/**
* Rate limit gate for inbound ECHO requests
*/
inline bool rateGateEchoRequest(const int64_t now)
ZT_ALWAYS_INLINE bool rateGateEchoRequest(const int64_t now)
{
if ((now - _lastEchoRequestReceived) >= ZT_PEER_GENERAL_RATE_LIMIT) {
_lastEchoRequestReceived = now;
@ -414,38 +234,10 @@ public:
return false;
}
/**
* Rate limit gate for VERB_ACK
*/
inline bool rateGateACK(const int64_t now)
{
if ((now - _lastACKWindowReset) >= ZT_PATH_QOS_ACK_CUTOFF_TIME) {
_lastACKWindowReset = now;
_ACKCutoffCount = 0;
} else {
++_ACKCutoffCount;
}
return (_ACKCutoffCount < ZT_PATH_QOS_ACK_CUTOFF_LIMIT);
}
/**
* Rate limit gate for VERB_QOS_MEASUREMENT
*/
inline bool rateGateQoS(const int64_t now)
{
if ((now - _lastQoSWindowReset) >= ZT_PATH_QOS_ACK_CUTOFF_TIME) {
_lastQoSWindowReset = now;
_QoSCutoffCount = 0;
} else {
++_QoSCutoffCount;
}
return (_QoSCutoffCount < ZT_PATH_QOS_ACK_CUTOFF_LIMIT);
}
/**
* Rate limit gate for trying externally defined or static path
*/
inline bool rateGateTryStaticPath(const int64_t now)
ZT_ALWAYS_INLINE bool rateGateTryStaticPath(const int64_t now)
{
if ((now - _lastTriedStaticPath) >= ZT_PEER_PING_PERIOD) {
_lastTriedStaticPath = now;
@ -455,11 +247,35 @@ public:
}
/**
* @return Whether this peer is reachable via an aggregate link
* Send directly if a direct path exists
*
* @param tPtr Thread pointer supplied by user
* @param data Data to send
* @param len Length of data
* @param now Current time
* @return True if packet appears to have been sent, false if no path or send failed
*/
inline bool hasAggregateLink() const
ZT_ALWAYS_INLINE bool sendDirect(void *tPtr,const void *data,const unsigned int len,const int64_t now)
{
return _localMultipathSupported && _remoteMultipathSupported && _remotePeerMultipathEnabled;
_paths_l.rlock();
if (_pathCount == 0) {
_paths_l.unlock();
return false;
}
const bool r = _paths[0]->send(RR,tPtr,data,len,now);
_paths_l.unlock();
return r;
}
/**
* @return Current best path
*/
ZT_ALWAYS_INLINE SharedPtr<Path> path()
{
RWMutex::RLock l(_paths_l);
if (_pathCount == 0)
return SharedPtr<Path>();
return _paths[0];
}
private:
@ -467,60 +283,27 @@ private:
const RuntimeEnvironment *RR;
int64_t _lastReceive; // direct or indirect
int64_t _lastDirectPathPushSent;
int64_t _lastDirectPathPushReceive;
int64_t _lastCredentialRequestSent;
int64_t _lastReceive;
int64_t _lastWhoisRequestReceived;
int64_t _lastEchoRequestReceived;
int64_t _lastCredentialsReceived;
int64_t _lastACKWindowReset;
int64_t _lastQoSWindowReset;
int64_t _lastMultipathCompatibilityCheck;
int64_t _lastPushDirectPathsReceived;
int64_t _lastTriedStaticPath;
unsigned int _latency;
int _uniqueAlivePathCount;
AtomicCounter __refCount;
bool _localMultipathSupported;
bool _remoteMultipathSupported;
bool _canUseMultipath;
uint8_t _freeRandomByte;
unsigned int _pathCount;
SharedPtr<Path> _paths[ZT_MAX_PEER_NETWORK_PATHS];
RWMutex _paths_l;
Identity _id;
uint16_t _vProto;
uint16_t _vMajor;
uint16_t _vMinor;
uint16_t _vRevision;
SharedPtr<Path> _paths[ZT_MAX_PEER_NETWORK_PATHS];
Mutex _paths_m;
Identity _id;
unsigned int _directPathPushCutoffCount;
unsigned int _credentialsCutoffCount;
unsigned int _QoSCutoffCount;
unsigned int _ACKCutoffCount;
RingBuffer<int,ZT_MULTIPATH_PROPORTION_WIN_SZ> _pathChoiceHist;
bool _linkIsBalanced;
bool _linkIsRedundant;
bool _remotePeerMultipathEnabled;
int64_t _lastAggregateStatsReport;
int64_t _lastAggregateAllocation;
char _interfaceListStr[256]; // 16 characters * 16 paths in a link
AtomicCounter __refCount;
};
} // namespace ZeroTier
// Add a swap() for shared ptr's to peers to speed up peer sorts
namespace std {
template<>
inline void swap(ZeroTier::SharedPtr<ZeroTier::Peer> &a,ZeroTier::SharedPtr<ZeroTier::Peer> &b) { a.swap(b); }
}
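The std::swap specialization above lets sorts exchange two SharedPtr<Peer> values by swapping raw pointers, avoiding the atomic reference-count traffic of copy-based exchanges. A usage sketch (hypothetical function; any populated vector works):

#include <algorithm>
#include <vector>

// Hypothetical: std::sort picks up the specialized swap for SharedPtr<Peer>.
void sortPeersByAddress(std::vector< ZeroTier::SharedPtr<ZeroTier::Peer> > &peers)
{
	std::sort(peers.begin(),peers.end(),
		[](const ZeroTier::SharedPtr<ZeroTier::Peer> &a,const ZeroTier::SharedPtr<ZeroTier::Peer> &b) {
			return (a->address() < b->address()); // order by ZeroTier address
		});
}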
#endif

View file

@ -35,6 +35,13 @@ public:
explicit ZT_ALWAYS_INLINE operator bool() const { return (_p != (T *)0); }
ZT_ALWAYS_INLINE T *ptr() const { return _p; }
ZT_ALWAYS_INLINE void swap(ScopedPtr &p)
{
T *const tmp = _p;
_p = p._p;
p._p = tmp;
}
ZT_ALWAYS_INLINE bool operator==(const ScopedPtr &p) const { return (_p == p._p); }
ZT_ALWAYS_INLINE bool operator!=(const ScopedPtr &p) const { return (_p != p._p); }
ZT_ALWAYS_INLINE bool operator==(T *const p) const { return (_p == p); }
@ -49,4 +56,9 @@ private:
} // namespace ZeroTier
namespace std {
template<typename T>
ZT_ALWAYS_INLINE void swap(ZeroTier::ScopedPtr<T> &a,ZeroTier::ScopedPtr<T> &b) { a.swap(b); }
}
#endif

View file

@ -131,4 +131,9 @@ private:
} // namespace ZeroTier
namespace std {
template<typename T>
ZT_ALWAYS_INLINE void swap(ZeroTier::SharedPtr<T> &a,ZeroTier::SharedPtr<T> &b) { a.swap(b); }
}
#endif

View file

@ -35,8 +35,7 @@ namespace ZeroTier {
Switch::Switch(const RuntimeEnvironment *renv) :
RR(renv),
_lastCheckedQueues(0),
_lastUniteAttempt(8) // only really used on root servers and upstreams, and it'll grow there just fine
_lastCheckedQueues(0)
{
}
@ -58,15 +57,11 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
if (destination != RR->identity.address()) {
if (fragment.hops() < ZT_RELAY_MAX_HOPS) {
fragment.incrementHops();
// Note: we don't bother initiating NAT-t for fragments, since heads will set that off.
// It wouldn't hurt anything, just redundant and unnecessary.
SharedPtr<Peer> relayTo = RR->topology->get(destination);
if ((!relayTo)||(!relayTo->sendDirect(tPtr,fragment.data(),fragment.size(),now,false))) {
// Don't know peer or no direct path -- so relay via someone upstream
if ((!relayTo)||(!relayTo->sendDirect(tPtr,fragment.data(),fragment.size(),now))) {
relayTo = RR->topology->findRelayTo(now,destination);
if (relayTo)
relayTo->sendDirect(tPtr,fragment.data(),fragment.size(),now,true);
relayTo->sendDirect(tPtr,fragment.data(),fragment.size(),now);
}
}
} else {
@ -131,21 +126,10 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
if (packet.hops() < ZT_RELAY_MAX_HOPS) {
packet.incrementHops();
SharedPtr<Peer> relayTo = RR->topology->get(destination);
if ((relayTo)&&(relayTo->sendDirect(tPtr,packet.data(),packet.size(),now,false))) {
if ((source != RR->identity.address())&&(_shouldUnite(now,source,destination))) {
const SharedPtr<Peer> sourcePeer(RR->topology->get(source));
if (sourcePeer)
relayTo->introduce(tPtr,now,sourcePeer);
}
} else {
if ((!relayTo)||(!relayTo->sendDirect(tPtr,packet.data(),packet.size(),now))) {
relayTo = RR->topology->findRelayTo(now,destination);
if ((relayTo)&&(relayTo->address() != source)) {
if (relayTo->sendDirect(tPtr,packet.data(),packet.size(),now,true)) {
const SharedPtr<Peer> sourcePeer(RR->topology->get(source));
if (sourcePeer)
relayTo->introduce(tPtr,now,sourcePeer);
}
}
if ((relayTo)&&(relayTo->address() != source))
relayTo->sendDirect(tPtr,packet.data(),packet.size(),now);
}
}
@ -153,13 +137,13 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
// Packet is the head of a fragmented packet series ----------------
const uint64_t packetId = (
(((uint64_t)reinterpret_cast<const uint8_t *>(data)[0]) << 56) |
(((uint64_t)reinterpret_cast<const uint8_t *>(data)[1]) << 48) |
(((uint64_t)reinterpret_cast<const uint8_t *>(data)[2]) << 40) |
(((uint64_t)reinterpret_cast<const uint8_t *>(data)[3]) << 32) |
(((uint64_t)reinterpret_cast<const uint8_t *>(data)[4]) << 24) |
(((uint64_t)reinterpret_cast<const uint8_t *>(data)[5]) << 16) |
(((uint64_t)reinterpret_cast<const uint8_t *>(data)[6]) << 8) |
(((uint64_t)reinterpret_cast<const uint8_t *>(data)[0]) << 56U) |
(((uint64_t)reinterpret_cast<const uint8_t *>(data)[1]) << 48U) |
(((uint64_t)reinterpret_cast<const uint8_t *>(data)[2]) << 40U) |
(((uint64_t)reinterpret_cast<const uint8_t *>(data)[3]) << 32U) |
(((uint64_t)reinterpret_cast<const uint8_t *>(data)[4]) << 24U) |
(((uint64_t)reinterpret_cast<const uint8_t *>(data)[5]) << 16U) |
(((uint64_t)reinterpret_cast<const uint8_t *>(data)[6]) << 8U) |
((uint64_t)reinterpret_cast<const uint8_t *>(data)[7])
);
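The shift chain above just reassembles the packet ID as a big-endian 64-bit integer from the first eight bytes; an equivalent loop form (a sketch, not the project's code):

// Hypothetical: big-endian decode of the 8-byte packet ID.
uint64_t id = 0;
for (int i = 0; i < 8; ++i)
	id = (id << 8U) | (uint64_t)reinterpret_cast<const uint8_t *>(data)[i];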
@ -234,7 +218,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
}
}
uint8_t qosBucket = ZT_QOS_DEFAULT_BUCKET;
uint8_t qosBucket = 0;
if (to.isMulticast()) {
MulticastGroup multicastGroup(to,0);
@ -287,8 +271,8 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
break;
}
} else if (sipNetmaskBits == 40) { // ZT-6PLANE /40 ???
const uint32_t nwid32 = (uint32_t)((network->id() ^ (network->id() >> 32)) & 0xffffffff);
if ( (my6[0] == 0xfc) && (my6[1] == (uint8_t)((nwid32 >> 24) & 0xff)) && (my6[2] == (uint8_t)((nwid32 >> 16) & 0xff)) && (my6[3] == (uint8_t)((nwid32 >> 8) & 0xff)) && (my6[4] == (uint8_t)(nwid32 & 0xff))) {
const uint32_t nwid32 = (uint32_t)((network->id() ^ (network->id() >> 32U)) & 0xffffffffU);
if ( (my6[0] == 0xfc) && (my6[1] == (uint8_t)((nwid32 >> 24U) & 0xffU)) && (my6[2] == (uint8_t)((nwid32 >> 16U) & 0xffU)) && (my6[3] == (uint8_t)((nwid32 >> 8U) & 0xffU)) && (my6[4] == (uint8_t)(nwid32 & 0xffU))) {
unsigned int ptr = 0;
while (ptr != 5) {
if (pkt6[ptr] != my6[ptr])
@ -328,10 +312,10 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
for(int i=0;i<32;++i) pseudo[40 + i] = adv[40 + i];
uint32_t checksum = 0;
for(int i=0;i<36;++i) checksum += Utils::hton(pseudo_[i]);
while ((checksum >> 16)) checksum = (checksum & 0xffff) + (checksum >> 16);
while ((checksum >> 16U)) checksum = (checksum & 0xffffU) + (checksum >> 16U);
checksum = ~checksum;
adv[42] = (checksum >> 8) & 0xff;
adv[43] = checksum & 0xff;
adv[42] = (checksum >> 8U) & 0xffU;
adv[43] = checksum & 0xffU;
RR->node->putFrame(tPtr,network->id(),network->userPtr(),peerMac,from,ZT_ETHERTYPE_IPV6,0,adv,72);
return; // NDP emulation done. We have forged a "fake" reply, so no need to send actual NDP query.
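The folding loop above is the standard Internet (RFC 1071) one's-complement checksum: sum the 16-bit words, fold any carry bits back into the low 16 bits, then invert. A self-contained sketch (hypothetical helper; words are assumed already in network byte order, as the Utils::hton() calls above arrange):

#include <cstddef>
#include <cstdint>

// Hypothetical: one's-complement checksum over n 16-bit words.
static uint16_t onesComplementSum(const uint16_t *words,std::size_t n)
{
	uint32_t sum = 0;
	for (std::size_t i = 0; i < n; ++i)
		sum += words[i];
	while (sum >> 16U) // fold carries back into the low 16 bits
		sum = (sum & 0xffffU) + (sum >> 16U);
	return (uint16_t)~sum;
}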
@ -397,17 +381,11 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
from.appendTo(outp);
outp.append((uint16_t)etherType);
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
aqm_enqueue(tPtr,network,outp,true,qosBucket);
} else {
Packet outp(toZT,RR->identity.address(),Packet::VERB_FRAME);
outp.append(network->id());
outp.append((uint16_t)etherType);
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
aqm_enqueue(tPtr,network,outp,true,qosBucket);
}
} else {
// Destination is bridged behind a remote peer ---------------------------
@ -465,9 +443,6 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
from.appendTo(outp);
outp.append((uint16_t)etherType);
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
aqm_enqueue(tPtr,network,outp,true,qosBucket);
} else {
RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked (bridge replication)");
}
@ -475,263 +450,6 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
}
}
void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket)
{
if(!network->qosEnabled()) {
send(tPtr, packet, encrypt);
return;
}
NetworkQoSControlBlock *nqcb = _netQueueControlBlock[network->id()];
if (!nqcb) {
// DEBUG_INFO("creating network QoS control block (NQCB) for network %llx", network->id());
nqcb = new NetworkQoSControlBlock();
_netQueueControlBlock[network->id()] = nqcb;
// Initialize ZT_QOS_NUM_BUCKETS queues and place them in the INACTIVE list
// These queues will be shuffled between the new/old/inactive lists by the enqueue/dequeue algorithm
for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
nqcb->inactiveQueues.push_back(new ManagedQueue(i));
}
}
if (packet.verb() != Packet::VERB_FRAME && packet.verb() != Packet::VERB_EXT_FRAME) {
// DEBUG_INFO("skipping, no QoS for this packet, verb=%x", packet.verb());
// just send packet normally, no QoS for ZT protocol traffic
send(tPtr, packet, encrypt);
return; // don't also enqueue it below
}
_aqm_m.lock();
// Enqueue packet and move queue to appropriate list
const Address dest(packet.destination());
TXQueueEntry *txEntry = new TXQueueEntry(dest,RR->node->now(),packet,encrypt);
ManagedQueue *selectedQueue = nullptr;
for (size_t i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
if (i < nqcb->oldQueues.size()) { // search old queues first (I think this is best since old would imply most recent usage of the queue)
if (nqcb->oldQueues[i]->id == qosBucket) {
selectedQueue = nqcb->oldQueues[i];
}
} if (i < nqcb->newQueues.size()) { // search new queues (this would imply not often-used queues)
if (nqcb->newQueues[i]->id == qosBucket) {
selectedQueue = nqcb->newQueues[i];
}
} if (i < nqcb->inactiveQueues.size()) { // search inactive queues
if (nqcb->inactiveQueues[i]->id == qosBucket) {
selectedQueue = nqcb->inactiveQueues[i];
// move queue to end of NEW queue list
selectedQueue->byteCredit = ZT_QOS_QUANTUM;
// DEBUG_INFO("moving q=%p from INACTIVE to NEW list", selectedQueue);
nqcb->newQueues.push_back(selectedQueue);
nqcb->inactiveQueues.erase(nqcb->inactiveQueues.begin() + i);
}
}
}
if (!selectedQueue) {
_aqm_m.unlock(); // release the lock taken above before bailing out
return;
}
selectedQueue->q.push_back(txEntry);
selectedQueue->byteLength+=txEntry->packet.payloadLength();
nqcb->_currEnqueuedPackets++;
// DEBUG_INFO("nq=%2lu, oq=%2lu, iq=%2lu, nqcb.size()=%3d, bucket=%2d, q=%p", nqcb->newQueues.size(), nqcb->oldQueues.size(), nqcb->inactiveQueues.size(), nqcb->_currEnqueuedPackets, qosBucket, selectedQueue);
// Drop a packet if necessary
ManagedQueue *selectedQueueToDropFrom = nullptr;
if (nqcb->_currEnqueuedPackets > ZT_QOS_MAX_ENQUEUED_PACKETS)
{
// DEBUG_INFO("too many enqueued packets (%d), finding packet to drop", nqcb->_currEnqueuedPackets);
int maxQueueLength = 0;
for (size_t i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
if (i < nqcb->oldQueues.size()) {
if (nqcb->oldQueues[i]->byteLength > maxQueueLength) {
maxQueueLength = nqcb->oldQueues[i]->byteLength;
selectedQueueToDropFrom = nqcb->oldQueues[i];
}
} if (i < nqcb->newQueues.size()) {
if (nqcb->newQueues[i]->byteLength > maxQueueLength) {
maxQueueLength = nqcb->newQueues[i]->byteLength;
selectedQueueToDropFrom = nqcb->newQueues[i];
}
} if (i < nqcb->inactiveQueues.size()) {
if (nqcb->inactiveQueues[i]->byteLength > maxQueueLength) {
maxQueueLength = nqcb->inactiveQueues[i]->byteLength;
selectedQueueToDropFrom = nqcb->inactiveQueues[i];
}
}
}
if (selectedQueueToDropFrom) {
// DEBUG_INFO("dropping packet from head of largest queue (%d payload bytes)", maxQueueLength);
int sizeOfDroppedPacket = selectedQueueToDropFrom->q.front()->packet.payloadLength();
delete selectedQueueToDropFrom->q.front();
selectedQueueToDropFrom->q.pop_front();
selectedQueueToDropFrom->byteLength-=sizeOfDroppedPacket;
nqcb->_currEnqueuedPackets--;
}
}
_aqm_m.unlock();
aqm_dequeue(tPtr);
}
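This enqueue path is the fq_codel pattern: each QoS bucket owns a queue that migrates between the NEW, OLD, and INACTIVE lists, and ZT_QOS_QUANTUM byte credits impose deficit-round-robin fairness in aqm_dequeue() below. A sketch of just the credit rule (hypothetical struct, mirroring the byteCredit handling):

// Hypothetical: a queue is serviced while its credit is non-negative; each
// dequeue charges the packet length, and a depleted queue receives one
// quantum and is rotated to the OLD list.
struct DrrCredit
{
	int byteCredit;
	bool eligible() const { return (byteCredit >= 0); }
	void charge(int len) { byteCredit -= len; }
	void replenish(int quantum) { byteCredit += quantum; }
};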
uint64_t Switch::control_law(uint64_t t, int count)
{
return (uint64_t)(t + ZT_QOS_INTERVAL / sqrt(count));
}
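This is CoDel's control law: while a queue stays above target, successive drops are spaced ZT_QOS_INTERVAL / sqrt(count) apart, so the drop rate rises gently rather than abruptly. A worked example, assuming a 100 ms interval:

// count = 1: next drop 100.0 ms after the last
// count = 2: next drop  70.7 ms later (100 / sqrt(2))
// count = 3: next drop  57.7 ms later (100 / sqrt(3))
// count = 4: next drop  50.0 ms later (100 / sqrt(4))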
Switch::dqr Switch::dodequeue(ManagedQueue *q, uint64_t now)
{
dqr r;
r.ok_to_drop = false;
r.p = (q->q.empty()) ? nullptr : q->q.front(); // front() on an empty list is undefined behavior
if (r.p == nullptr) {
q->first_above_time = 0;
return r;
}
uint64_t sojourn_time = now - r.p->creationTime;
if (sojourn_time < ZT_QOS_TARGET || q->byteLength <= ZT_DEFAULT_MTU) {
// went below - stay below for at least interval
q->first_above_time = 0;
} else {
if (q->first_above_time == 0) {
// just went above from below. if still above at
// first_above_time, will say it's ok to drop.
q->first_above_time = now + ZT_QOS_INTERVAL;
} else if (now >= q->first_above_time) {
r.ok_to_drop = true;
}
}
return r;
}
Switch::TXQueueEntry * Switch::CoDelDequeue(ManagedQueue *q, bool isNew, uint64_t now)
{
dqr r = dodequeue(q, now);
if (q->dropping) {
if (!r.ok_to_drop) {
q->dropping = false;
}
while (now >= q->drop_next && q->dropping) {
delete q->q.front(); // free the dropped entry to avoid a leak
q->q.pop_front(); // drop
r = dodequeue(q, now);
if (!r.ok_to_drop) {
// leave dropping state
q->dropping = false;
} else {
++(q->count);
// schedule the next drop.
q->drop_next = control_law(q->drop_next, q->count);
}
}
} else if (r.ok_to_drop) {
delete q->q.front(); // free the dropped entry to avoid a leak
q->q.pop_front(); // drop
r = dodequeue(q, now);
q->dropping = true;
q->count = (q->count > 2 && now - q->drop_next < 8*ZT_QOS_INTERVAL)?
q->count - 2 : 1;
q->drop_next = control_law(now, q->count);
}
return r.p;
}
void Switch::aqm_dequeue(void *tPtr)
{
// Cycle through network-specific QoS control blocks
for(std::map<uint64_t,NetworkQoSControlBlock*>::iterator nqcb(_netQueueControlBlock.begin());nqcb!=_netQueueControlBlock.end();) {
if (!(*nqcb).second->_currEnqueuedPackets) {
++nqcb;
continue; // nothing queued for this network; try the next one
}
uint64_t now = RR->node->now();
TXQueueEntry *entryToEmit = nullptr;
std::vector<ManagedQueue*> *currQueues = &((*nqcb).second->newQueues);
std::vector<ManagedQueue*> *oldQueues = &((*nqcb).second->oldQueues);
std::vector<ManagedQueue*> *inactiveQueues = &((*nqcb).second->inactiveQueues);
_aqm_m.lock();
// Attempt dequeue from queues in NEW list
bool examiningNewQueues = true;
while (currQueues->size()) {
ManagedQueue *queueAtFrontOfList = currQueues->front();
if (queueAtFrontOfList->byteCredit < 0) {
queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
// Move to list of OLD queues
// DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
oldQueues->push_back(queueAtFrontOfList);
currQueues->erase(currQueues->begin());
} else {
entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now);
if (!entryToEmit) {
// Move to end of list of OLD queues
// DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
oldQueues->push_back(queueAtFrontOfList);
currQueues->erase(currQueues->begin());
}
else {
int len = entryToEmit->packet.payloadLength();
queueAtFrontOfList->byteLength -= len;
queueAtFrontOfList->byteCredit -= len;
// Send the packet!
queueAtFrontOfList->q.pop_front();
send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
(*nqcb).second->_currEnqueuedPackets--;
}
if (queueAtFrontOfList) {
//DEBUG_INFO("dequeuing from q=%p, len=%lu in NEW list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->q.size(), queueAtFrontOfList->byteCredit);
}
break;
}
}
// Attempt dequeue from queues in OLD list
examiningNewQueues = false;
currQueues = &((*nqcb).second->oldQueues);
while (currQueues->size()) {
ManagedQueue *queueAtFrontOfList = currQueues->front();
if (queueAtFrontOfList->byteCredit < 0) {
queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
oldQueues->push_back(queueAtFrontOfList);
currQueues->erase(currQueues->begin());
} else {
entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now);
if (!entryToEmit) {
//DEBUG_INFO("moving q=%p from OLD to INACTIVE list", queueAtFrontOfList);
// Move to inactive list of queues
inactiveQueues->push_back(queueAtFrontOfList);
currQueues->erase(currQueues->begin());
}
else {
int len = entryToEmit->packet.payloadLength();
queueAtFrontOfList->byteLength -= len;
queueAtFrontOfList->byteCredit -= len;
queueAtFrontOfList->q.pop_front();
send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
(*nqcb).second->_currEnqueuedPackets--;
}
if (queueAtFrontOfList) {
//DEBUG_INFO("dequeuing from q=%p, len=%lu in OLD list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->q.size(), queueAtFrontOfList->byteCredit);
}
break;
}
}
nqcb++;
_aqm_m.unlock();
}
}
void Switch::removeNetworkQoSControlBlock(uint64_t nwid)
{
NetworkQoSControlBlock *nq = _netQueueControlBlock[nwid];
if (nq) {
_netQueueControlBlock.erase(nwid);
delete nq;
nq = NULL;
}
}
void Switch::send(void *tPtr,Packet &packet,bool encrypt)
{
const Address dest(packet.destination());
@ -763,12 +481,12 @@ void Switch::requestWhois(void *tPtr,const int64_t now,const Address &addr)
else last = now;
}
const SharedPtr<Peer> root(RR->topology->root(now));
const SharedPtr<Peer> root(RR->topology->root());
if (root) {
Packet outp(root->address(),RR->identity.address(),Packet::VERB_WHOIS);
addr.appendTo(outp);
RR->node->expectReplyTo(outp.packetId());
root->sendDirect(tPtr,outp.data(),outp.size(),now,true);
root->sendDirect(tPtr,outp.data(),outp.size(),now);
}
}
@ -845,17 +563,6 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now)
}
}
{
Mutex::Lock _l(_lastUniteAttempt_m);
Hashtable< _LastUniteKey,uint64_t >::Iterator i(_lastUniteAttempt);
_LastUniteKey *k = (_LastUniteKey *)0;
uint64_t *v = (uint64_t *)0;
while (i.next(k,v)) {
if ((now - *v) >= (ZT_MIN_UNITE_INTERVAL * 8))
_lastUniteAttempt.erase(*k);
}
}
{
Mutex::Lock _l(_lastSentWhoisRequest_m);
Hashtable< Address,int64_t >::Iterator i(_lastSentWhoisRequest);
@ -870,17 +577,6 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now)
return ZT_WHOIS_RETRY_DELAY;
}
bool Switch::_shouldUnite(const int64_t now,const Address &source,const Address &destination)
{
Mutex::Lock _l(_lastUniteAttempt_m);
uint64_t &ts = _lastUniteAttempt[_LastUniteKey(source,destination)];
if ((now - ts) >= ZT_MIN_UNITE_INTERVAL) {
ts = now;
return true;
}
return false;
}
bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
{
SharedPtr<Path> viaPath;
@ -889,7 +585,7 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
const SharedPtr<Peer> peer(RR->topology->get(destination));
if (peer) {
viaPath = peer->getAppropriatePath(now,false);
viaPath = peer->path();
if (!viaPath) {
if (peer->rateGateTryStaticPath(now)) {
InetAddress tryAddr;
@ -905,7 +601,7 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
const SharedPtr<Peer> relay(RR->topology->findRelayTo(now,destination));
if (relay) {
viaPath = relay->getAppropriatePath(now,true);
viaPath = relay->path();
if (!viaPath)
return false;
}
@ -923,8 +619,6 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
unsigned int chunkSize = std::min(packet.size(),mtu);
packet.setFragmented(chunkSize < packet.size());
peer->recordOutgoingPacket(viaPath, packet.packetId(), packet.payloadLength(), packet.verb(), now);
if (trustedPathId) {
packet.setTrusted(trustedPathId);
} else {

View file

@ -46,16 +46,8 @@ class Peer;
*/
class Switch
{
struct ManagedQueue;
struct TXQueueEntry;
typedef struct {
TXQueueEntry *p;
bool ok_to_drop;
} dqr;
public:
Switch(const RuntimeEnvironment *renv);
explicit Switch(const RuntimeEnvironment *renv);
/**
* Called when a packet is received from the real network
@ -82,62 +74,6 @@ public:
*/
void onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
/**
* Determines the next drop schedule for packets in the TX queue
*
* @param t Current time
* @param count Number of packets dropped this round
*/
uint64_t control_law(uint64_t t, int count);
/**
* Selects a packet eligible for transmission from a TX queue. According to the control law, multiple packets
* may be intentionally dropped before a packet is returned to the AQM scheduler.
*
* @param q The TX queue that is being dequeued from
* @param now Current time
*/
dqr dodequeue(ManagedQueue *q, uint64_t now);
/**
* Presents a packet to the AQM scheduler.
*
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
* @param network Network that the packet shall be sent over
* @param packet Packet to be sent
* @param encrypt Encrypt packet payload? (always true except for HELLO)
* @param qosBucket Which bucket the rule-system determined this packet should fall into
*/
void aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket);
/**
* Performs a single AQM cycle and dequeues and transmits all eligible packets on all networks
*
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
*/
void aqm_dequeue(void *tPtr);
/**
* Calls the dequeue mechanism and adjust queue state variables
*
* @param q The TX queue that is being dequeued from
* @param isNew Whether or not this queue is in the NEW list
* @param now Current time
*/
Switch::TXQueueEntry * CoDelDequeue(ManagedQueue *q, bool isNew, uint64_t now);
/**
* Removes QoS Queues and flow state variables for a specific network. These queues are created
* automatically upon the transmission of the first packet from this peer to another peer on the
* given network.
*
* The reason for existence of queues and flow state variables specific to each network is so that
* each network's QoS rules function independently.
*
* @param nwid Network ID
*/
void removeNetworkQoSControlBlock(uint64_t nwid);
/**
* Send a packet to a ZeroTier address (destination in packet)
*
@ -163,7 +99,7 @@ public:
* @param now Current time
* @param addr Address to look up
*/
void requestWhois(void *tPtr,const int64_t now,const Address &addr);
void requestWhois(void *tPtr,int64_t now,const Address &addr);
/**
* Run any processes that are waiting for this peer's identity
@ -188,7 +124,6 @@ public:
unsigned long doTimerTasks(void *tPtr,int64_t now);
private:
bool _shouldUnite(const int64_t now,const Address &source,const Address &destination);
bool _trySend(void *tPtr,Packet &packet,bool encrypt); // packet is modified if return is true
const RuntimeEnvironment *const RR;
@ -201,7 +136,7 @@ private:
// Packets waiting for WHOIS replies or other decode info or missing fragments
struct RXQueueEntry
{
RXQueueEntry() : timestamp(0) {}
ZT_ALWAYS_INLINE RXQueueEntry() : timestamp(0) {}
volatile int64_t timestamp; // 0 if entry is not in use
volatile uint64_t packetId;
IncomingPacket frag0; // head of packet
@ -236,8 +171,8 @@ private:
// ZeroTier-layer TX queue entry
struct TXQueueEntry
{
TXQueueEntry() {}
TXQueueEntry(Address d,uint64_t ct,const Packet &p,bool enc) :
ZT_ALWAYS_INLINE TXQueueEntry() {}
ZT_ALWAYS_INLINE TXQueueEntry(Address d,uint64_t ct,const Packet &p,bool enc) :
dest(d),
creationTime(ct),
packet(p),
@ -250,58 +185,6 @@ private:
};
std::list< TXQueueEntry > _txQueue;
Mutex _txQueue_m;
Mutex _aqm_m;
// Tracks sending of VERB_RENDEZVOUS to relaying peers
struct _LastUniteKey
{
_LastUniteKey() : x(0),y(0) {}
_LastUniteKey(const Address &a1,const Address &a2)
{
if (a1 > a2) {
x = a2.toInt();
y = a1.toInt();
} else {
x = a1.toInt();
y = a2.toInt();
}
}
inline unsigned long hashCode() const { return ((unsigned long)x ^ (unsigned long)y); }
inline bool operator==(const _LastUniteKey &k) const { return ((x == k.x)&&(y == k.y)); }
inline bool operator!=(const _LastUniteKey &k) const { return ((x != k.x)||(y != k.y)); }
uint64_t x,y;
};
Hashtable< _LastUniteKey,uint64_t > _lastUniteAttempt; // key is always sorted in ascending order, for set-like behavior
Mutex _lastUniteAttempt_m;
// Queue with additional flow state variables
struct ManagedQueue
{
ManagedQueue(int id) :
id(id),
byteCredit(ZT_QOS_QUANTUM),
byteLength(0),
first_above_time(0),
count(0),
drop_next(0),
dropping(false)
{}
int id;
int byteCredit;
int byteLength;
uint64_t first_above_time;
uint32_t count;
uint64_t drop_next;
bool dropping;
uint64_t drop_next_time;
std::list< TXQueueEntry *> q;
};
// To implement fq_codel we need to maintain a queue of queues
struct NetworkQoSControlBlock
{
int _currEnqueuedPackets;
std::vector<ManagedQueue *> newQueues;
std::vector<ManagedQueue *> oldQueues;
std::vector<ManagedQueue *> inactiveQueues;
};
std::map<uint64_t,NetworkQoSControlBlock*> _netQueueControlBlock;
};
} // namespace ZeroTier

153
node/Topology.cpp Normal file
View file

@ -0,0 +1,153 @@
/*
* Copyright (c)2019 ZeroTier, Inc.
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file in the project's root directory.
*
* Change Date: 2023-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2.0 of the Apache License.
*/
/****/
#include "Topology.hpp"
namespace ZeroTier {
struct _RootSortComparisonOperator
{
ZT_ALWAYS_INLINE _RootSortComparisonOperator(const int64_t now) : _now(now) {}
ZT_ALWAYS_INLINE bool operator()(const SharedPtr<Peer> &a,const SharedPtr<Peer> &b)
{
const int64_t now = _now;
if (a->alive(now)) {
if (b->alive(now))
return (a->latency() < b->latency());
return true;
}
return false;
}
const int64_t _now;
};
Topology::Topology(const RuntimeEnvironment *renv,const Identity &myId) :
RR(renv),
_myIdentity(myId),
_numConfiguredPhysicalPaths(0),
_peers(128),
_paths(256)
{
}
Topology::~Topology()
{
}
void Topology::getAllPeers(std::vector< SharedPtr<Peer> > &allPeers) const
{
RWMutex::RLock l(_peers_l);
allPeers.clear();
allPeers.reserve(_peers.size());
Hashtable< Address,SharedPtr<Peer> >::Iterator i(*(const_cast<Hashtable< Address,SharedPtr<Peer> > *>(&_peers)));
Address *a = (Address *)0;
SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
while (i.next(a,p))
allPeers.push_back(*p);
}
void Topology::setPhysicalPathConfiguration(const struct sockaddr_storage *pathNetwork,const ZT_PhysicalPathConfiguration *pathConfig)
{
if (!pathNetwork) {
_numConfiguredPhysicalPaths = 0;
} else {
std::map<InetAddress,ZT_PhysicalPathConfiguration> cpaths;
for(unsigned int i=0,j=_numConfiguredPhysicalPaths;i<j;++i)
cpaths[_physicalPathConfig[i].first] = _physicalPathConfig[i].second;
if (pathConfig) {
ZT_PhysicalPathConfiguration pc(*pathConfig);
if (pc.mtu <= 0)
pc.mtu = ZT_DEFAULT_PHYSMTU;
else if (pc.mtu < ZT_MIN_PHYSMTU)
pc.mtu = ZT_MIN_PHYSMTU;
else if (pc.mtu > ZT_MAX_PHYSMTU)
pc.mtu = ZT_MAX_PHYSMTU;
cpaths[*(reinterpret_cast<const InetAddress *>(pathNetwork))] = pc;
} else {
cpaths.erase(*(reinterpret_cast<const InetAddress *>(pathNetwork)));
}
unsigned int cnt = 0;
for(std::map<InetAddress,ZT_PhysicalPathConfiguration>::const_iterator i(cpaths.begin());((i!=cpaths.end())&&(cnt<ZT_MAX_CONFIGURABLE_PATHS));++i) {
_physicalPathConfig[cnt].first = i->first;
_physicalPathConfig[cnt].second = i->second;
++cnt;
}
_numConfiguredPhysicalPaths = cnt;
}
}
void Topology::addRoot(const Identity &id)
{
if (id == _myIdentity) return; // sanity check
RWMutex::Lock l1(_peers_l);
std::pair< std::set<Identity>::iterator,bool > ir(_roots.insert(id));
if (ir.second) {
SharedPtr<Peer> &p = _peers[id.address()];
if (!p)
p.set(new Peer(RR,_myIdentity,id));
_rootPeers.push_back(p);
}
}
bool Topology::removeRoot(const Identity &id)
{
RWMutex::Lock l1(_peers_l);
std::set<Identity>::iterator r(_roots.find(id));
if (r != _roots.end()) {
for(std::vector< SharedPtr<Peer> >::iterator p(_rootPeers.begin());p!=_rootPeers.end();++p) {
if ((*p)->identity() == id) {
_rootPeers.erase(p);
break;
}
}
_roots.erase(r);
return true;
}
return false;
}
void Topology::rankRoots(const int64_t now)
{
RWMutex::Lock l1(_peers_l);
std::sort(_rootPeers.begin(),_rootPeers.end(),_RootSortComparisonOperator(now));
}
void Topology::doPeriodicTasks(const int64_t now)
{
{
RWMutex::Lock l1(_peers_l);
Hashtable< Address,SharedPtr<Peer> >::Iterator i(_peers);
Address *a = (Address *)0;
SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
while (i.next(a,p)) {
if ( (!(*p)->alive(now)) && (_roots.count((*p)->identity()) == 0) )
_peers.erase(*a);
}
}
{
RWMutex::Lock l1(_paths_l);
Hashtable< Path::HashKey,SharedPtr<Path> >::Iterator i(_paths);
Path::HashKey *k = (Path::HashKey *)0;
SharedPtr<Path> *p = (SharedPtr<Path> *)0;
while (i.next(k,p)) {
if (p->references() <= 1)
_paths.erase(*k);
}
}
}
} // namespace ZeroTier

View file

@ -34,7 +34,6 @@
#include "Hashtable.hpp"
#include "SharedPtr.hpp"
#include "ScopedPtr.hpp"
#include "Str.hpp"
namespace ZeroTier {
@ -46,14 +45,8 @@ class RuntimeEnvironment;
class Topology
{
public:
ZT_ALWAYS_INLINE Topology(const RuntimeEnvironment *renv,const Identity &myId) :
RR(renv),
_myIdentity(myId),
_numConfiguredPhysicalPaths(0),
_peers(128),
_paths(256)
{
}
Topology(const RuntimeEnvironment *renv,const Identity &myId);
~Topology();
/**
* Add a peer to database
@ -66,7 +59,7 @@ public:
*/
ZT_ALWAYS_INLINE SharedPtr<Peer> add(const SharedPtr<Peer> &peer)
{
Mutex::Lock _l(_peers_l);
RWMutex::Lock _l(_peers_l);
SharedPtr<Peer> &hp = _peers[peer->address()];
if (!hp)
hp = peer;
@ -80,9 +73,9 @@ public:
* @param zta ZeroTier address of peer
* @return Peer or NULL if not found
*/
ZT_ALWAYS_INLINE SharedPtr<Peer> get(const Address &zta)
ZT_ALWAYS_INLINE SharedPtr<Peer> get(const Address &zta) const
{
Mutex::Lock l1(_peers_l);
RWMutex::RLock l1(_peers_l);
const SharedPtr<Peer> *const ap = _peers.get(zta);
return (ap) ? *ap : SharedPtr<Peer>();
}
@ -92,12 +85,12 @@ public:
* @param zta ZeroTier address of peer
* @return Identity or NULL identity if not found
*/
ZT_ALWAYS_INLINE Identity getIdentity(void *tPtr,const Address &zta)
ZT_ALWAYS_INLINE Identity getIdentity(void *tPtr,const Address &zta) const
{
if (zta == _myIdentity.address()) {
return _myIdentity;
} else {
Mutex::Lock _l(_peers_l);
RWMutex::RLock _l(_peers_l);
const SharedPtr<Peer> *const ap = _peers.get(zta);
if (ap)
return (*ap)->identity();
@ -110,45 +103,57 @@ public:
*
* @param l Local socket
* @param r Remote address
* @return Pointer to canonicalized Path object
* @return Pointer to canonicalized Path object or NULL on error
*/
ZT_ALWAYS_INLINE SharedPtr<Path> getPath(const int64_t l,const InetAddress &r)
{
Mutex::Lock _l(_paths_l);
SharedPtr<Path> &p = _paths[Path::HashKey(l,r)];
if (!p)
p.set(new Path(l,r));
const Path::HashKey k(l,r);
_paths_l.rlock();
const SharedPtr<Path> *const pp = _paths.get(k); // operator[] would insert, which must not happen under a read lock
SharedPtr<Path> p((pp) ? *pp : SharedPtr<Path>());
_paths_l.unlock();
if (p)
return p;
_paths_l.lock();
SharedPtr<Path> &p2 = _paths[k];
if (p2) {
p = p2;
} else {
try {
p.set(new Path(l,r));
} catch ( ... ) {
_paths_l.unlock();
return SharedPtr<Path>();
}
p2 = p;
}
_paths_l.unlock();
return p;
}
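getPath() is a double-checked lookup over the reader/writer lock: an optimistic probe under the read lock, then, on a miss, a second probe under the write lock before inserting, because another thread may have created the same path between the two acquisitions. The shape of the pattern for any keyed cache (a sketch assuming the rlock()/lock()/unlock() RWMutex API used above and a pointer-like mapped type that tests false when empty):

#include <map>

// Hypothetical: double-checked get-or-create over a std::map.
template<typename MapT,typename KeyT,typename MakeF>
typename MapT::mapped_type getOrCreate(MapT &m,RWMutex &l,const KeyT &k,MakeF make)
{
	l.rlock();
	typename MapT::const_iterator i(m.find(k));
	if (i != m.end()) {
		typename MapT::mapped_type v(i->second);
		l.unlock();
		return v; // fast path: found under the read lock
	}
	l.unlock();
	l.lock();
	typename MapT::mapped_type &slot = m[k]; // re-check under the write lock
	if (!slot)
		slot = make(); // still absent: this thread creates it
	typename MapT::mapped_type v(slot);
	l.unlock();
	return v;
}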
/**
* @return Current best root server
*/
ZT_ALWAYS_INLINE SharedPtr<Peer> root() const
{
RWMutex::RLock l(_peers_l);
if (_rootPeers.empty())
return SharedPtr<Peer>();
return _rootPeers.front();
}
/**
* @param id Identity to check
* @return True if this identity corresponds to a root
*/
ZT_ALWAYS_INLINE bool isRoot(const Identity &id) const
{
Mutex::Lock l(_peers_l);
RWMutex::RLock l(_peers_l);
return (_roots.count(id) > 0);
}
/**
* @param now Current time
* @return Number of peers with active direct paths
*/
ZT_ALWAYS_INLINE unsigned long countActive(const int64_t now) const
{
unsigned long cnt = 0;
Mutex::Lock _l(_peers_l);
Hashtable< Address,SharedPtr<Peer> >::Iterator i(const_cast<Topology *>(this)->_peers);
Address *a = (Address *)0;
SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
while (i.next(a,p)) {
if ((*p)->getAppropriatePath(now,false))
++cnt;
}
return cnt;
}
/**
* Apply a function or function object to all peers
*
@ -159,10 +164,10 @@ public:
* @tparam F Function or function object type
*/
template<typename F>
ZT_ALWAYS_INLINE void eachPeer(F f)
ZT_ALWAYS_INLINE void eachPeer(F f) const
{
Mutex::Lock l(_peers_l);
Hashtable< Address,SharedPtr<Peer> >::Iterator i(_peers);
RWMutex::RLock l(_peers_l);
Hashtable< Address,SharedPtr<Peer> >::Iterator i(const_cast<Topology *>(this)->_peers);
Address *a = (Address *)0;
SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
while (i.next(a,p)) {
@ -181,16 +186,16 @@ public:
* @tparam F Function or function object type
*/
template<typename F>
ZT_ALWAYS_INLINE void eachPeerWithRoot(F f)
ZT_ALWAYS_INLINE void eachPeerWithRoot(F f) const
{
Mutex::Lock l(_peers_l);
RWMutex::RLock l(_peers_l);
std::vector<uintptr_t> rootPeerPtrs;
for(std::vector< SharedPtr<Peer> >::iterator i(_rootPeers.begin());i!=_rootPeers.end();++i)
for(std::vector< SharedPtr<Peer> >::const_iterator i(_rootPeers.begin());i!=_rootPeers.end();++i)
rootPeerPtrs.push_back((uintptr_t)i->ptr());
std::sort(rootPeerPtrs.begin(),rootPeerPtrs.end());
Hashtable< Address,SharedPtr<Peer> >::Iterator i(_peers);
Hashtable< Address,SharedPtr<Peer> >::Iterator i(const_cast<Topology *>(this)->_peers);
Address *a = (Address *)0;
SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
while (i.next(a,p)) {
@ -208,7 +213,7 @@ public:
*/
ZT_ALWAYS_INLINE SharedPtr<Peer> findRelayTo(const int64_t now,const Address &toAddr)
{
Mutex::Lock l(_peers_l);
RWMutex::RLock l(_peers_l);
if (_rootPeers.empty())
return SharedPtr<Peer>();
return _rootPeers[0];
@ -217,17 +222,7 @@ public:
/**
* @param allPeers vector to fill with all current peers
*/
ZT_ALWAYS_INLINE void getAllPeers(std::vector< SharedPtr<Peer> > &allPeers) const
{
Mutex::Lock l(_peers_l);
allPeers.clear();
allPeers.reserve(_peers.size());
Hashtable< Address,SharedPtr<Peer> >::Iterator i(*(const_cast<Hashtable< Address,SharedPtr<Peer> > *>(&_peers)));
Address *a = (Address *)0;
SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
while (i.next(a,p))
allPeers.push_back(*p);
}
void getAllPeers(std::vector< SharedPtr<Peer> > &allPeers) const;
/**
* Get info about a path
@ -249,21 +244,6 @@ public:
}
}
/**
* Get the payload MTU for an outbound physical path (returns default if not configured)
*
* @param physicalAddress Physical endpoint address
* @return MTU
*/
ZT_ALWAYS_INLINE unsigned int getOutboundPathMtu(const InetAddress &physicalAddress)
{
for(unsigned int i=0,j=_numConfiguredPhysicalPaths;i<j;++i) {
if (_physicalPathConfig[i].first.containsAddress(physicalAddress))
return _physicalPathConfig[i].second.mtu;
}
return ZT_DEFAULT_PHYSMTU;
}
/**
* Get the outbound trusted path ID for a physical address, or 0 if none
*
@ -297,57 +277,14 @@ public:
/**
* Set or clear physical path configuration (called via Node::setPhysicalPathConfiguration)
*/
inline void setPhysicalPathConfiguration(const struct sockaddr_storage *pathNetwork,const ZT_PhysicalPathConfiguration *pathConfig)
{
if (!pathNetwork) {
_numConfiguredPhysicalPaths = 0;
} else {
std::map<InetAddress,ZT_PhysicalPathConfiguration> cpaths;
for(unsigned int i=0,j=_numConfiguredPhysicalPaths;i<j;++i)
cpaths[_physicalPathConfig[i].first] = _physicalPathConfig[i].second;
if (pathConfig) {
ZT_PhysicalPathConfiguration pc(*pathConfig);
if (pc.mtu <= 0)
pc.mtu = ZT_DEFAULT_PHYSMTU;
else if (pc.mtu < ZT_MIN_PHYSMTU)
pc.mtu = ZT_MIN_PHYSMTU;
else if (pc.mtu > ZT_MAX_PHYSMTU)
pc.mtu = ZT_MAX_PHYSMTU;
cpaths[*(reinterpret_cast<const InetAddress *>(pathNetwork))] = pc;
} else {
cpaths.erase(*(reinterpret_cast<const InetAddress *>(pathNetwork)));
}
unsigned int cnt = 0;
for(std::map<InetAddress,ZT_PhysicalPathConfiguration>::const_iterator i(cpaths.begin());((i!=cpaths.end())&&(cnt<ZT_MAX_CONFIGURABLE_PATHS));++i) {
_physicalPathConfig[cnt].first = i->first;
_physicalPathConfig[cnt].second = i->second;
++cnt;
}
_numConfiguredPhysicalPaths = cnt;
}
}
void setPhysicalPathConfiguration(const struct sockaddr_storage *pathNetwork,const ZT_PhysicalPathConfiguration *pathConfig);
/**
* Add a root server's identity to the root server set
*
* @param id Root server identity
*/
inline void addRoot(const Identity &id)
{
if (id == _myIdentity) return; // sanity check
Mutex::Lock l1(_peers_l);
std::pair< std::set<Identity>::iterator,bool > ir(_roots.insert(id));
if (ir.second) {
SharedPtr<Peer> &p = _peers[id.address()];
if (!p)
p.set(new Peer(RR,_myIdentity,id));
_rootPeers.push_back(p);
}
}
void addRoot(const Identity &id);
/**
* Remove a root server's identity from the root server set
@ -355,83 +292,26 @@ public:
* @param id Root server identity
* @return True if root found and removed, false if not found
*/
inline bool removeRoot(const Identity &id)
{
Mutex::Lock l1(_peers_l);
std::set<Identity>::iterator r(_roots.find(id));
if (r != _roots.end()) {
for(std::vector< SharedPtr<Peer> >::iterator p(_rootPeers.begin());p!=_rootPeers.end();++p) {
if ((*p)->identity() == id) {
_rootPeers.erase(p);
break;
}
}
_roots.erase(r);
return true;
}
return false;
}
bool removeRoot(const Identity &id);
/**
* Sort roots in ascending order of apparent latency
*
* @param now Current time
*/
ZT_ALWAYS_INLINE void rankRoots(const int64_t now)
{
Mutex::Lock l1(_peers_l);
std::sort(_rootPeers.begin(),_rootPeers.end(),_RootSortComparisonOperator(now));
}
void rankRoots(const int64_t now);
/**
* Do periodic tasks such as database cleanup
*/
ZT_ALWAYS_INLINE void doPeriodicTasks(const int64_t now)
{
{
Mutex::Lock l1(_peers_l);
Hashtable< Address,SharedPtr<Peer> >::Iterator i(_peers);
Address *a = (Address *)0;
SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
while (i.next(a,p)) {
if ( (!(*p)->alive(now)) && (_roots.count((*p)->identity()) == 0) )
_peers.erase(*a);
}
}
{
Mutex::Lock l1(_paths_l);
Hashtable< Path::HashKey,SharedPtr<Path> >::Iterator i(_paths);
Path::HashKey *k = (Path::HashKey *)0;
SharedPtr<Path> *p = (SharedPtr<Path> *)0;
while (i.next(k,p)) {
if (p->references() <= 1)
_paths.erase(*k);
}
}
}
void doPeriodicTasks(const int64_t now);
private:
struct _RootSortComparisonOperator
{
ZT_ALWAYS_INLINE _RootSortComparisonOperator(const int64_t now) : _now(now) {}
ZT_ALWAYS_INLINE bool operator()(const SharedPtr<Peer> &a,const SharedPtr<Peer> &b)
{
const int64_t now = _now;
if (a->alive(now)) {
if (b->alive(now))
return (a->latency(now) < b->latency(now));
return true;
}
return false;
}
const int64_t _now;
};
const RuntimeEnvironment *const RR;
const Identity _myIdentity;
Mutex _peers_l;
Mutex _paths_l;
RWMutex _peers_l;
RWMutex _paths_l;
std::pair< InetAddress,ZT_PhysicalPathConfiguration > _physicalPathConfig[ZT_MAX_CONFIGURABLE_PATHS];
unsigned int _numConfiguredPhysicalPaths;