Replicate peer endpoints and forget paths if we have them -- this allows two clusters to talk to each other, whereas forgetting all paths does not.

Adam Ierymenko 2015-10-27 16:47:13 -07:00
parent cc6080fe38
commit cc1b275ad9
5 changed files with 58 additions and 28 deletions
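In short: a HAVE_PEER announcement now also carries the physical endpoint at which the announcing member reached the peer, and a member receiving the announcement forgets only its own path to that one endpoint rather than resetting the peer's whole path list. A condensed sketch of the receiving side, paraphrased from the Cluster.cpp hunk below:

	// On STATE_MESSAGE_HAVE_PEER: another member reached this peer at
	// physicalAddress, so drop only our path to that endpoint. Any other
	// working paths we hold survive, which is what lets a peer talk to
	// two clusters at once.
	if (physicalAddress) {
		SharedPtr<Peer> myPeerRecord(RR->topology->getPeer(id.address()));
		if (myPeerRecord)
			myPeerRecord->removePathByAddress(physicalAddress);
	}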

node/Cluster.cpp

@@ -210,22 +210,30 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 			} break;

 			case STATE_MESSAGE_HAVE_PEER: {
-				try {
-					Identity id;
-					ptr += id.deserialize(dmsg,ptr);
-					if (id) {
-						RR->topology->saveIdentity(id);
-						{
-							Mutex::Lock _l2(_peerAffinities_m);
-							_PA &pa = _peerAffinities[id.address()];
-							pa.ts = RR->node->now();
-							pa.mid = fromMemberId;
-						}
-						TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
-					}
-				} catch ( ... ) {
-					// ignore invalid identities
-				}
+				Identity id;
+				InetAddress physicalAddress;
+				ptr += id.deserialize(dmsg,ptr);
+				ptr += physicalAddress.deserialize(dmsg,ptr);
+				if (id) {
+					// Forget any paths that we have to this peer at its address
+					if (physicalAddress) {
+						SharedPtr<Peer> myPeerRecord(RR->topology->getPeer(id.address()));
+						if (myPeerRecord)
+							myPeerRecord->removePathByAddress(physicalAddress);
+					}
+
+					// Always save identity to update file time
+					RR->topology->saveIdentity(id);
+
+					// Set peer affinity to its new home
+					{
+						Mutex::Lock _l2(_peerAffinities_m);
+						_PA &pa = _peerAffinities[id.address()];
+						pa.ts = RR->node->now();
+						pa.mid = fromMemberId;
+					}
+					TRACE("[%u] has %s @ %s",(unsigned int)fromMemberId,id.address().toString().c_str(),physicalAddress.toString().c_str());
+				}
 			} break;

 			case STATE_MESSAGE_MULTICAST_LIKE: {
@@ -396,7 +404,7 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
 	return true;
 }

-void Cluster::replicateHavePeer(const Identity &peerId)
+void Cluster::replicateHavePeer(const Identity &peerId,const InetAddress &physicalAddress)
 {
 	const uint64_t now = RR->node->now();
 	{	// Use peer affinity table to track our own last announce time for peers
@@ -405,7 +413,7 @@ void Cluster::replicateHavePeer(const Identity &peerId)
 		if (pa.mid != _id) {
 			pa.ts = now;
 			pa.mid = _id;
-		} else if ((now - pa.ts) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
+		} else if ((now - pa.ts) < ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
 			return;
 		} else {
 			pa.ts = now;
@@ -415,6 +423,7 @@ void Cluster::replicateHavePeer(const Identity &peerId)
 	// announcement
 	Buffer<4096> buf;
 	peerId.serialize(buf,false);
+	physicalAddress.serialize(buf);
 	{
 		Mutex::Lock _l(_memberIds_m);
 		for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
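One behavioral detail in the hunks above: the announce throttle in replicateHavePeer() now returns early when the peer was announced recently, instead of when the announce period had already elapsed. Simplified, omitting the member-affinity branch (names as in the diff):

	// pa.ts is when we last announced this peer ourselves; re-announce only
	// after ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD has passed.
	if ((now - pa.ts) < ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD)
		return; // announced recently, skip this one
	pa.ts = now; // otherwise record this announcement and continue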

node/Cluster.hpp

@@ -117,6 +117,10 @@
 		/**
 		 * Cluster member has this peer:
 		 *   <[...] binary serialized peer identity>
+		 *   <[...] binary serialized peer remote physical address>
+		 *
+		 * Clusters send this message when they learn a path to a peer. The
+		 * replicated physical address is the one learned.
 		 */
 		STATE_MESSAGE_HAVE_PEER = 2,

@@ -225,8 +229,9 @@
 	 * Advertise to the cluster that we have this peer
 	 *
 	 * @param peerId Identity of peer that we have
+	 * @param physicalAddress Physical address of peer (from our POV)
 	 */
-	void replicateHavePeer(const Identity &peerId);
+	void replicateHavePeer(const Identity &peerId,const InetAddress &physicalAddress);

 	/**
 	 * Advertise a multicast LIKE to the cluster
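Tying the documented payload layout to the code: the sender appends the two fields in exactly that order and the receiver consumes them the same way. A minimal paired sketch (buf, dmsg and ptr as in the Cluster.cpp hunks above):

	// Encode (replicateHavePeer): identity first, then the learned endpoint.
	peerId.serialize(buf,false);
	physicalAddress.serialize(buf);

	// Decode (handleIncomingStateMessage): read them back in the same order.
	ptr += id.deserialize(dmsg,ptr);
	ptr += physicalAddress.deserialize(dmsg,ptr);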

node/Constants.hpp

@@ -317,7 +317,7 @@
 /**
  * Minimum delay between attempts to confirm new paths to peers (to avoid HELLO flooding)
  */
-#define ZT_MIN_PATH_CONFIRMATION_INTERVAL 5000
+#define ZT_MIN_PATH_CONFIRMATION_INTERVAL 1000

 /**
  * Interval between direct path pushes in milliseconds
@@ -350,7 +350,7 @@
 /**
  * Maximum number of endpoints to contact per address type (to limit pushes like GitHub issue #235)
  */
-#define ZT_PUSH_DIRECT_PATHS_MAX_ENDPOINTS_PER_TYPE 2
+#define ZT_PUSH_DIRECT_PATHS_MAX_ENDPOINTS_PER_TYPE 4

 /**
  * A test pseudo-network-ID that can be joined
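Both constants are simple tuning knobs: ZT_MIN_PATH_CONFIRMATION_INTERVAL rate-limits path-confirmation HELLOs (now 1 s instead of 5 s) and ZT_PUSH_DIRECT_PATHS_MAX_ENDPOINTS_PER_TYPE caps how many endpoints of each address family get pushed (now 4 instead of 2). A throttle of the first kind typically has the shape below; the lastConfirmation name is illustrative, not from this commit:

	// Only attempt to confirm a new path if we haven't tried too recently.
	if ((now - lastConfirmation) >= ZT_MIN_PATH_CONFIRMATION_INTERVAL) {
		lastConfirmation = now;
		// ... send a HELLO to confirm the path ...
	}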

node/Peer.cpp

@@ -140,13 +140,10 @@ void Peer::received(
 		_lastMulticastFrame = now;

 #ifdef ZT_ENABLE_CLUSTER
-	// If we're in cluster mode and there's a better endpoint, stop here and don't
-	// learn or confirm paths. Also reset any existing paths, since they should
-	// go there and no longer talk to us here.
-	if (redirectTo) {
-		_numPaths = 0;
+	// If we think this peer belongs elsewhere, don't learn this path or
+	// do other connection init stuff.
+	if (redirectTo)
 		return;
-	}
 #endif

 	if ((now - _lastAnnouncedTo) >= ((ZT_MULTICAST_LIKE_EXPIRE / 2) - 1000)) {
@@ -206,7 +203,7 @@ void Peer::received(

 #ifdef ZT_ENABLE_CLUSTER
 	if ((RR->cluster)&&(pathIsConfirmed))
-		RR->cluster->replicateHavePeer(_id);
+		RR->cluster->replicateHavePeer(_id,remoteAddr);
 #endif

 	if (needMulticastGroupAnnounce) {
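The cluster redirect handling in Peer::received() also gets simpler: the old code zeroed _numPaths on redirect, while the new code just returns and leaves existing paths alone, since pruning of a specific endpoint now happens in Cluster::handleIncomingStateMessage() via removePathByAddress(). The surviving check is just:

#ifdef ZT_ENABLE_CLUSTER
	// Peer is handled by another cluster member: don't learn or confirm
	// this path here, but keep whatever paths we already have.
	if (redirectTo)
		return;
#endif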

node/Peer.hpp

@@ -412,6 +412,25 @@
 	 */
 	void clean(const RuntimeEnvironment *RR,uint64_t now);

+	/**
+	 * Remove all paths with this remote address
+	 *
+	 * @param addr Remote address to remove
+	 */
+	inline void removePathByAddress(const InetAddress &addr)
+	{
+		Mutex::Lock _l(_lock);
+		unsigned int np = _numPaths;
+		unsigned int x = 0;
+		unsigned int y = 0;
+		while (x < np) {
+			if (_paths[x].address() != addr)
+				_paths[y++] = _paths[x];
+			++x;
+		}
+		_numPaths = y;
+	}
+
 	/**
 	 * Find a common set of addresses by which two peers can link, if any
 	 *
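A quick walk-through of removePathByAddress() with illustrative data, to show that it compacts the path array in place and drops only matching entries:

	// With _paths = [A, B, A] and addr == A:
	//   x=0: A == addr -> not copied            y stays 0
	//   x=1: B != addr -> _paths[0] = B         y becomes 1
	//   x=2: A == addr -> not copied            y stays 1
	//   _numPaths = 1 -> the peer keeps only its path via B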