Further pare down Cluster messaging and rename some stuff.

This commit is contained in:
Adam Ierymenko 2015-11-06 14:37:17 -08:00
parent 9490b1f136
commit 5f39d5b7ea
7 changed files with 24 additions and 40 deletions

View file

@ -214,19 +214,10 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
case STATE_MESSAGE_HAVE_PEER: { case STATE_MESSAGE_HAVE_PEER: {
const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
InetAddress physicalAddress; Mutex::Lock _l2(_peerAffinities_m);
ptr += physicalAddress.deserialize(dmsg,ptr); _PA &pa = _peerAffinities[zeroTierAddress];
if (physicalAddress) { pa.ts = RR->node->now();
SharedPtr<Peer> myPeerRecord(RR->topology->getPeerNoCache(zeroTierAddress)); pa.mid = fromMemberId;
if (myPeerRecord)
myPeerRecord->removePathByAddress(physicalAddress);
}
{
Mutex::Lock _l2(_peerAffinities_m);
_PA &pa = _peerAffinities[zeroTierAddress];
pa.ts = RR->node->now();
pa.mid = fromMemberId;
}
TRACE("[%u] has %s @ %s",(unsigned int)fromMemberId,id.address().toString().c_str(),physicalAddress.toString().c_str()); TRACE("[%u] has %s @ %s",(unsigned int)fromMemberId,id.address().toString().c_str(),physicalAddress.toString().c_str());
} break; } break;
@ -402,7 +393,7 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
return true; return true;
} }
void Cluster::replicateHavePeer(const Identity &peerId,const InetAddress &physicalAddress) void Cluster::replicateHavePeer(const Identity &peerId)
{ {
const uint64_t now = RR->node->now(); const uint64_t now = RR->node->now();
{ {
@ -420,14 +411,13 @@ void Cluster::replicateHavePeer(const Identity &peerId,const InetAddress &physic
} }
} }
Buffer<1024> buf; char buf[ZT_ADDRESS_LENGTH];
peerId.address().appendTo(buf); peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH);
physicalAddress.serialize(buf);
{ {
Mutex::Lock _l(_memberIds_m); Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock); Mutex::Lock _l2(_members[*mid].lock);
_send(*mid,STATE_MESSAGE_HAVE_PEER,buf.data(),buf.size()); _send(*mid,STATE_MESSAGE_HAVE_PEER,buf,ZT_ADDRESS_LENGTH);
} }
} }
} }
@ -472,9 +462,8 @@ struct _ClusterAnnouncePeers
Cluster *const parent; Cluster *const parent;
inline void operator()(const Topology &t,const SharedPtr<Peer> &peer) const inline void operator()(const Topology &t,const SharedPtr<Peer> &peer) const
{ {
Path *p = peer->getBestPath(now); if (peer->hasActiveDirectPath(now))
if (p) parent->replicateHavePeer(peer->identity());
parent->replicateHavePeer(peer->identity(),p->address());
} }
}; };
void Cluster::doPeriodicTasks() void Cluster::doPeriodicTasks()

View file

@ -125,10 +125,6 @@ public:
/** /**
* Cluster member has this peer: * Cluster member has this peer:
* <[5] ZeroTier address of peer> * <[5] ZeroTier address of peer>
* <[...] binary serialized peer remote physical address>
*
* Clusters send this message when they learn a path to a peer. The
* replicated physical address is the one learned.
*/ */
STATE_MESSAGE_HAVE_PEER = 2, STATE_MESSAGE_HAVE_PEER = 2,
@ -237,9 +233,8 @@ public:
* Advertise to the cluster that we have this peer * Advertise to the cluster that we have this peer
* *
* @param peerId Identity of peer that we have * @param peerId Identity of peer that we have
* @param physicalAddress Physical address of peer (from our POV)
*/ */
void replicateHavePeer(const Identity &peerId,const InetAddress &physicalAddress); void replicateHavePeer(const Identity &peerId);
/** /**
* Advertise a multicast LIKE to the cluster * Advertise a multicast LIKE to the cluster

View file

@ -972,7 +972,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,const Sha
if ( ((flags & 0x01) == 0) && (Path::isAddressValidForPath(a)) ) { if ( ((flags & 0x01) == 0) && (Path::isAddressValidForPath(a)) ) {
if (++countPerScope[(int)a.ipScope()][0] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) { if (++countPerScope[(int)a.ipScope()][0] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) {
TRACE("attempting to contact %s at pushed direct path %s",peer->address().toString().c_str(),a.toString().c_str()); TRACE("attempting to contact %s at pushed direct path %s",peer->address().toString().c_str(),a.toString().c_str());
peer->attemptToContactAt(RR,_localAddress,a,now); peer->sendHELLO(RR,_localAddress,a,now);
} else { } else {
TRACE("ignoring contact for %s at %s -- too many per scope",peer->address().toString().c_str(),a.toString().c_str()); TRACE("ignoring contact for %s at %s -- too many per scope",peer->address().toString().c_str(),a.toString().c_str());
} }
@ -983,7 +983,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,const Sha
if ( ((flags & 0x01) == 0) && (Path::isAddressValidForPath(a)) ) { if ( ((flags & 0x01) == 0) && (Path::isAddressValidForPath(a)) ) {
if (++countPerScope[(int)a.ipScope()][1] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) { if (++countPerScope[(int)a.ipScope()][1] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) {
TRACE("attempting to contact %s at pushed direct path %s",peer->address().toString().c_str(),a.toString().c_str()); TRACE("attempting to contact %s at pushed direct path %s",peer->address().toString().c_str(),a.toString().c_str());
peer->attemptToContactAt(RR,_localAddress,a,now); peer->sendHELLO(RR,_localAddress,a,now);
} else { } else {
TRACE("ignoring contact for %s at %s -- too many per scope",peer->address().toString().c_str(),a.toString().c_str()); TRACE("ignoring contact for %s at %s -- too many per scope",peer->address().toString().c_str(),a.toString().c_str());
} }

View file

@ -249,7 +249,7 @@ public:
} else { } else {
if (stableEndpoint4) { if (stableEndpoint4) {
needToContactIndirect = false; needToContactIndirect = false;
p->attemptToContactAt(RR,InetAddress(),stableEndpoint4,_now); p->sendHELLO(RR,InetAddress(),stableEndpoint4,_now);
} }
} }
if (p->doPingAndKeepalive(RR,_now,AF_INET6)) { if (p->doPingAndKeepalive(RR,_now,AF_INET6)) {
@ -257,7 +257,7 @@ public:
} else { } else {
if (stableEndpoint6) { if (stableEndpoint6) {
needToContactIndirect = false; needToContactIndirect = false;
p->attemptToContactAt(RR,InetAddress(),stableEndpoint6,_now); p->sendHELLO(RR,InetAddress(),stableEndpoint6,_now);
} }
} }

View file

@ -188,7 +188,7 @@ void Peer::received(
if ((now - _lastPathConfirmationSent) >= ZT_MIN_PATH_CONFIRMATION_INTERVAL) { if ((now - _lastPathConfirmationSent) >= ZT_MIN_PATH_CONFIRMATION_INTERVAL) {
_lastPathConfirmationSent = now; _lastPathConfirmationSent = now;
TRACE("got %s via unknown path %s(%s), confirming...",Packet::verbString(verb),_id.address().toString().c_str(),remoteAddr.toString().c_str()); TRACE("got %s via unknown path %s(%s), confirming...",Packet::verbString(verb),_id.address().toString().c_str(),remoteAddr.toString().c_str());
attemptToContactAt(RR,localAddr,remoteAddr,now); sendHELLO(RR,localAddr,remoteAddr,now);
} }
} }
@ -198,7 +198,7 @@ void Peer::received(
#ifdef ZT_ENABLE_CLUSTER #ifdef ZT_ENABLE_CLUSTER
if ((RR->cluster)&&(pathIsConfirmed)) if ((RR->cluster)&&(pathIsConfirmed))
RR->cluster->replicateHavePeer(_id,remoteAddr); RR->cluster->replicateHavePeer(_id);
#endif #endif
if (needMulticastGroupAnnounce) { if (needMulticastGroupAnnounce) {
@ -208,7 +208,7 @@ void Peer::received(
} }
} }
void Peer::attemptToContactAt(const RuntimeEnvironment *RR,const InetAddress &localAddr,const InetAddress &atAddress,uint64_t now) void Peer::sendHELLO(const RuntimeEnvironment *RR,const InetAddress &localAddr,const InetAddress &atAddress,uint64_t now)
{ {
// _lock not required here since _id is immutable and nothing else is accessed // _lock not required here since _id is immutable and nothing else is accessed
@ -242,7 +242,7 @@ bool Peer::doPingAndKeepalive(const RuntimeEnvironment *RR,uint64_t now,int inet
if (p) { if (p) {
if ((now - p->lastReceived()) >= ZT_PEER_DIRECT_PING_DELAY) { if ((now - p->lastReceived()) >= ZT_PEER_DIRECT_PING_DELAY) {
//TRACE("PING %s(%s) after %llums/%llums send/receive inactivity",_id.address().toString().c_str(),p->address().toString().c_str(),now - p->lastSend(),now - p->lastReceived()); //TRACE("PING %s(%s) after %llums/%llums send/receive inactivity",_id.address().toString().c_str(),p->address().toString().c_str(),now - p->lastSend(),now - p->lastReceived());
attemptToContactAt(RR,p->localAddress(),p->address(),now); sendHELLO(RR,p->localAddress(),p->address(),now);
p->sent(now); p->sent(now);
} else if (((now - p->lastSend()) >= ZT_NAT_KEEPALIVE_DELAY)&&(!p->reliable())) { } else if (((now - p->lastSend()) >= ZT_NAT_KEEPALIVE_DELAY)&&(!p->reliable())) {
//TRACE("NAT keepalive %s(%s) after %llums/%llums send/receive inactivity",_id.address().toString().c_str(),p->address().toString().c_str(),now - p->lastSend(),now - p->lastReceived()); //TRACE("NAT keepalive %s(%s) after %llums/%llums send/receive inactivity",_id.address().toString().c_str(),p->address().toString().c_str(),now - p->lastSend(),now - p->lastReceived());
@ -348,7 +348,7 @@ bool Peer::resetWithinScope(const RuntimeEnvironment *RR,InetAddress::IpScope sc
unsigned int y = 0; unsigned int y = 0;
while (x < np) { while (x < np) {
if (_paths[x].address().ipScope() == scope) { if (_paths[x].address().ipScope() == scope) {
attemptToContactAt(RR,_paths[x].localAddress(),_paths[x].address(),now); sendHELLO(RR,_paths[x].localAddress(),_paths[x].address(),now);
} else { } else {
_paths[y++] = _paths[x]; _paths[y++] = _paths[x];
} }

View file

@ -171,7 +171,7 @@ public:
* @param atAddress Destination address * @param atAddress Destination address
* @param now Current time * @param now Current time
*/ */
void attemptToContactAt(const RuntimeEnvironment *RR,const InetAddress &localAddr,const InetAddress &atAddress,uint64_t now); void sendHELLO(const RuntimeEnvironment *RR,const InetAddress &localAddr,const InetAddress &atAddress,uint64_t now);
/** /**
* Send pings or keepalives depending on configured timeouts * Send pings or keepalives depending on configured timeouts

View file

@ -435,7 +435,7 @@ void Switch::rendezvous(const SharedPtr<Peer> &peer,const InetAddress &localAddr
{ {
TRACE("sending NAT-t message to %s(%s)",peer->address().toString().c_str(),atAddr.toString().c_str()); TRACE("sending NAT-t message to %s(%s)",peer->address().toString().c_str(),atAddr.toString().c_str());
const uint64_t now = RR->node->now(); const uint64_t now = RR->node->now();
peer->attemptToContactAt(RR,localAddr,atAddr,now); peer->sendHELLO(RR,localAddr,atAddr,now);
{ {
Mutex::Lock _l(_contactQueue_m); Mutex::Lock _l(_contactQueue_m);
_contactQueue.push_back(ContactQueueEntry(peer,now + ZT_NAT_T_TACTICAL_ESCALATION_DELAY,localAddr,atAddr)); _contactQueue.push_back(ContactQueueEntry(peer,now + ZT_NAT_T_TACTICAL_ESCALATION_DELAY,localAddr,atAddr));
@ -508,14 +508,14 @@ unsigned long Switch::doTimerTasks(uint64_t now)
} else { } else {
if (qi->strategyIteration == 0) { if (qi->strategyIteration == 0) {
// First strategy: send packet directly to destination // First strategy: send packet directly to destination
qi->peer->attemptToContactAt(RR,qi->localAddr,qi->inaddr,now); qi->peer->sendHELLO(RR,qi->localAddr,qi->inaddr,now);
} else if (qi->strategyIteration <= 4) { } else if (qi->strategyIteration <= 4) {
// Strategies 1-4: try escalating ports for symmetric NATs that remap sequentially // Strategies 1-4: try escalating ports for symmetric NATs that remap sequentially
InetAddress tmpaddr(qi->inaddr); InetAddress tmpaddr(qi->inaddr);
int p = (int)qi->inaddr.port() + qi->strategyIteration; int p = (int)qi->inaddr.port() + qi->strategyIteration;
if (p < 0xffff) { if (p < 0xffff) {
tmpaddr.setPort((unsigned int)p); tmpaddr.setPort((unsigned int)p);
qi->peer->attemptToContactAt(RR,qi->localAddr,tmpaddr,now); qi->peer->sendHELLO(RR,qi->localAddr,tmpaddr,now);
} else qi->strategyIteration = 5; } else qi->strategyIteration = 5;
} else { } else {
// All strategies tried, expire entry // All strategies tried, expire entry