Wire in redirectPeer(), now about ready to test clustering!

This commit is contained in:
Adam Ierymenko 2015-10-20 17:36:10 -07:00
parent 6a7b47e5e1
commit 978b056a01
3 changed files with 15 additions and 13 deletions

View file

@ -549,7 +549,7 @@ void Cluster::removeMember(uint16_t memberId)
_memberIds = newMemberIds; _memberIds = newMemberIds;
} }
bool Cluster::redirectPeer(const SharedPtr<Peer> &peer,const InetAddress &peerPhysicalAddress,bool offload) bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload)
{ {
if (!peerPhysicalAddress) // sanity check if (!peerPhysicalAddress) // sanity check
return false; return false;
@ -585,7 +585,7 @@ bool Cluster::redirectPeer(const SharedPtr<Peer> &peer,const InetAddress &peerPh
} }
if (best.size() > 0) { if (best.size() > 0) {
TRACE("peer %s is at [%d,%d,%d], distance to us is %f, sending to %u instead for better distance %f",peer->address().toString().c_str(),px,py,pz,currentDistance,bestMember,bestDistance); TRACE("peer %s is at [%d,%d,%d], distance to us is %f, sending to %u instead for better distance %f",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestMember,bestDistance);
/* if (peer->remoteVersionProtocol() >= 5) { /* if (peer->remoteVersionProtocol() >= 5) {
// If it's a newer peer send VERB_PUSH_DIRECT_PATHS which is more idiomatic // If it's a newer peer send VERB_PUSH_DIRECT_PATHS which is more idiomatic
@ -593,7 +593,7 @@ bool Cluster::redirectPeer(const SharedPtr<Peer> &peer,const InetAddress &peerPh
// Otherwise send VERB_RENDEZVOUS for ourselves, which will trick peers into trying other endpoints for us even if they're too old for PUSH_DIRECT_PATHS // Otherwise send VERB_RENDEZVOUS for ourselves, which will trick peers into trying other endpoints for us even if they're too old for PUSH_DIRECT_PATHS
for(std::vector<InetAddress>::const_iterator a(best.begin());a!=best.end();++a) { for(std::vector<InetAddress>::const_iterator a(best.begin());a!=best.end();++a) {
if ((a->ss_family == AF_INET)||(a->ss_family == AF_INET6)) { if ((a->ss_family == AF_INET)||(a->ss_family == AF_INET6)) {
Packet outp(peer->address(),RR->identity.address(),Packet::VERB_RENDEZVOUS); Packet outp(peerAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
outp.append((uint8_t)0); // no flags outp.append((uint8_t)0); // no flags
RR->identity.address().appendTo(outp); // HACK: rendezvous with ourselves! with really old peers this will only work if I'm a root server! RR->identity.address().appendTo(outp); // HACK: rendezvous with ourselves! with really old peers this will only work if I'm a root server!
outp.append((uint16_t)a->port()); outp.append((uint16_t)a->port());
@ -611,7 +611,7 @@ bool Cluster::redirectPeer(const SharedPtr<Peer> &peer,const InetAddress &peerPh
return true; return true;
} else { } else {
TRACE("peer %s is at [%d,%d,%d], distance to us is %f and this seems to be the best",peer->address().toString().c_str(),px,py,pz,currentDistance); TRACE("peer %s is at [%d,%d,%d], distance to us is %f and this seems to be the best",peerAddress.toString().c_str(),px,py,pz,currentDistance);
return false; return false;
} }
} else { } else {

View file

@ -247,12 +247,12 @@ public:
/** /**
* Redirect this peer to a better cluster member if needed * Redirect this peer to a better cluster member if needed
* *
* @param peer Peer to (possibly) redirect * @param peerAddress Peer to (possibly) redirect
* @param peerPhysicalAddress Physical address of peer's current best path (where packet was most recently received or getBestPath()->address()) * @param peerPhysicalAddress Physical address of peer's current best path (where packet was most recently received or getBestPath()->address())
* @param offload Always redirect if possible -- can be used to offload peers during shutdown * @param offload Always redirect if possible -- can be used to offload peers during shutdown
* @return True if peer was redirected * @return True if peer was redirected
*/ */
bool redirectPeer(const SharedPtr<Peer> &peer,const InetAddress &peerPhysicalAddress,bool offload); bool redirectPeer(const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload);
private: private:
void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len); void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len);

View file

@ -82,6 +82,7 @@ void Peer::received(
{ {
const uint64_t now = RR->node->now(); const uint64_t now = RR->node->now();
bool needMulticastGroupAnnounce = false; bool needMulticastGroupAnnounce = false;
bool pathIsConfirmed = false;
{ {
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
@ -89,8 +90,6 @@ void Peer::received(
_lastReceive = now; _lastReceive = now;
if (!hops) { if (!hops) {
bool pathIsConfirmed = false;
/* Learn new paths from direct (hops == 0) packets */ /* Learn new paths from direct (hops == 0) packets */
{ {
unsigned int np = _numPaths; unsigned int np = _numPaths;
@ -141,11 +140,6 @@ void Peer::received(
} }
} }
} }
#ifdef ZT_ENABLE_CLUSTER
if ((pathIsConfirmed)&&(RR->cluster))
RR->cluster->replicateHavePeer(_id);
#endif
} }
if ((now - _lastAnnouncedTo) >= ((ZT_MULTICAST_LIKE_EXPIRE / 2) - 1000)) { if ((now - _lastAnnouncedTo) >= ((ZT_MULTICAST_LIKE_EXPIRE / 2) - 1000)) {
@ -159,6 +153,14 @@ void Peer::received(
_lastMulticastFrame = now; _lastMulticastFrame = now;
} }
#ifdef ZT_ENABLE_CLUSTER
if ((pathIsConfirmed)&&(RR->cluster)) {
// Either shuttle this peer off somewhere else or report to other members that we have it
if (!RR->cluster->redirectPeer(_id.address(),remoteAddr,false))
RR->cluster->replicateHavePeer(_id);
}
#endif
if (needMulticastGroupAnnounce) { if (needMulticastGroupAnnounce) {
const std::vector< SharedPtr<Network> > networks(RR->node->allNetworks()); const std::vector< SharedPtr<Network> > networks(RR->node->allNetworks());
for(std::vector< SharedPtr<Network> >::const_iterator n(networks.begin());n!=networks.end();++n) for(std::vector< SharedPtr<Network> >::const_iterator n(networks.begin());n!=networks.end();++n)