From b27a38e55e55f0c281295261e1a21806d67dd5e1 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 29 Aug 2019 14:27:41 -0700 Subject: [PATCH] Implement WHOIS --- root/root.cpp | 263 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 195 insertions(+), 68 deletions(-) diff --git a/root/root.cpp b/root/root.cpp index d30060f69..2e3ca5035 100644 --- a/root/root.cpp +++ b/root/root.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -60,6 +61,12 @@ using namespace ZeroTier; using json = nlohmann::json; +#ifdef MSG_DONTWAIT +#define SENDTO_FLAGS MSG_DONTWAIT +#else +#define SENDTO_FLAGS 0 +#endif + ////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// @@ -88,20 +95,25 @@ struct RendezvousKey struct RootPeer { + ZT_ALWAYS_INLINE RootPeer() : lastReceive(0),lastSync(0),lastEcho(0),lastHello(0) {} + ZT_ALWAYS_INLINE ~RootPeer() { Utils::burn(key,sizeof(key)); } + Identity id; uint8_t key[32]; InetAddress ip4,ip6; int64_t lastReceive; int64_t lastSync; + int64_t lastEcho; + int64_t lastHello; + std::mutex lock; AtomicCounter __refCount; - - ZT_ALWAYS_INLINE ~RootPeer() { Utils::burn(key,sizeof(key)); } }; static Identity self; static std::atomic_bool run; static json config; +static std::string statsRoot; static std::unordered_map< uint64_t,std::unordered_map< MulticastGroup,std::unordered_map< Address,int64_t,AddressHasher >,MulticastGroupHasher > > multicastSubscriptions; static std::unordered_map< Identity,SharedPtr,IdentityHasher > peersByIdentity; @@ -126,8 +138,6 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip const Address dest(pkt.destination()); const int64_t now = OSUtils::now(); - // See if this is destined for us and isn't a fragment / fragmented. (No packets - // understood by the root are fragments/fragmented.) if ((!fragment)&&(!pkt.fragmented())&&(dest == self.address())) { SharedPtr peer; @@ -161,7 +171,6 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip } peer->id = id; peer->lastReceive = now; - peer->lastSync = 0; peersByIdentity.emplace(id,peer); peersByVirtAddr[id.address()].emplace(peer); } else { @@ -199,6 +208,8 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip // If we found the peer, update IP and/or time and handle certain key packet types that the // root must concern itself with. if (peer) { + std::lock_guard pl(peer->lock); + InetAddress *const peerIp = ip->isV4() ? &(peer->ip4) : &(peer->ip6); if (*peerIp != ip) { std::lock_guard pbp_l(peersByPhysAddr_l); @@ -220,25 +231,71 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip switch(pkt.verb()) { case Packet::VERB_HELLO: try { - const uint64_t origId = pkt.packetId(); - const uint64_t ts = pkt.template at(ZT_PROTO_VERB_HELLO_IDX_TIMESTAMP); - pkt.reset(source,self.address(),Packet::VERB_OK); - pkt.append((uint8_t)Packet::VERB_HELLO); - pkt.append(origId); - pkt.append(ts); - pkt.append((uint8_t)ZT_PROTO_VERSION); - pkt.append((uint8_t)0); - pkt.append((uint8_t)0); - pkt.append((uint16_t)0); - ip->serialize(pkt); - pkt.armor(peer->key,true); - sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),0,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))); - //printf("%s <- OK(HELLO)" ZT_EOL_S,ip->toString(ipstr)); + if ((now - peer->lastHello) > 1000) { + peer->lastHello = now; + const uint64_t origId = pkt.packetId(); + const uint64_t ts = pkt.template at(ZT_PROTO_VERB_HELLO_IDX_TIMESTAMP); + pkt.reset(source,self.address(),Packet::VERB_OK); + pkt.append((uint8_t)Packet::VERB_HELLO); + pkt.append(origId); + pkt.append(ts); + pkt.append((uint8_t)ZT_PROTO_VERSION); + pkt.append((uint8_t)0); + pkt.append((uint8_t)0); + pkt.append((uint16_t)0); + ip->serialize(pkt); + pkt.armor(peer->key,true); + sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))); + } } catch ( ... ) { printf("* unexpected exception handling HELLO from %s" ZT_EOL_S,ip->toString(ipstr)); } break; + case Packet::VERB_ECHO: + try { + if ((now - peer->lastEcho) > 1000) { + peer->lastEcho = now; + Packet outp(source,self.address(),Packet::VERB_OK); + outp.append((uint8_t)Packet::VERB_ECHO); + outp.append(pkt.packetId()); + outp.append(((const uint8_t *)pkt.data()) + ZT_PACKET_IDX_PAYLOAD,pkt.size() - ZT_PACKET_IDX_PAYLOAD); + outp.compress(); + outp.armor(peer->key,true); + sendto(ip->isV4() ? v4s : v6s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))); + } + } catch ( ... ) { + printf("* unexpected exception handling ECHO from %s" ZT_EOL_S,ip->toString(ipstr)); + } + + case Packet::VERB_WHOIS: + try { + std::vector< SharedPtr > results; + { + std::lock_guard l(peersByVirtAddr_l); + for(unsigned int ptr=ZT_PACKET_IDX_PAYLOAD;(ptr+ZT_ADDRESS_LENGTH)<=pkt.size();ptr+=ZT_ADDRESS_LENGTH) { + auto peers = peersByVirtAddr.find(Address(pkt.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH)); + if (peers != peersByVirtAddr.end()) { + for(auto p=peers->second.begin();p!=peers->second.end();++p) + results.push_back(*p); + } + } + } + + if (!results.empty()) { + const uint64_t origId = pkt.packetId(); + pkt.reset(source,self.address(),Packet::VERB_OK); + pkt.append((uint8_t)Packet::VERB_WHOIS); + pkt.append(origId); + for(auto p=results.begin();p!=results.end();++p) + (*p)->id.serialize(pkt,false); + pkt.armor(peer->key,true); + sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))); + } + } catch ( ... ) { + printf("* unexpected exception handling ECHO from %s" ZT_EOL_S,ip->toString(ipstr)); + } + case Packet::VERB_MULTICAST_LIKE: try { std::lock_guard l(multicastSubscriptions_l); @@ -289,7 +346,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip if (l > 0) { pkt.setAt(countAt,(uint16_t)l); pkt.armor(peer->key,true); - sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),0,(const struct sockaddr *)ip,(socklen_t)(ip->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))); + sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)(ip->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))); //printf("%s %s gathered %u subscribers to %s/%.8lx on network %.16llx" ZT_EOL_S,ip->toString(ipstr),source.toString(astr),l,mg.mac().toString(tmpstr),(unsigned long)mg.adi(),(unsigned long long)nwid); } } @@ -312,7 +369,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip // sending a RENDEZVOUS message. bool introduce = false; - { + if (!fragment) { RendezvousKey rk(source,dest); std::lock_guard l(lastRendezvous_l); int64_t &lr = lastRendezvous[rk]; @@ -328,10 +385,10 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip auto peers = peersByVirtAddr.find(dest); if (peers != peersByVirtAddr.end()) { for(auto p=peers->second.begin();p!=peers->second.end();++p) { - if ((*p)->ip6) { - toAddrs.push_back(std::pair< InetAddress *,SharedPtr >(&((*p)->ip6),*p)); - } else if ((*p)->ip4) { + if ((*p)->ip4) { toAddrs.push_back(std::pair< InetAddress *,SharedPtr >(&((*p)->ip4),*p)); + } else if ((*p)->ip6) { + toAddrs.push_back(std::pair< InetAddress *,SharedPtr >(&((*p)->ip6),*p)); } } } @@ -358,7 +415,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip outp.append((uint8_t)16); outp.append((const uint8_t *)b->second->ip6.rawIpData(),16); outp.armor((*a)->key,true); - sendto(v6s,pkt.data(),pkt.size(),0,(const struct sockaddr *)&((*a)->ip6),(socklen_t)sizeof(struct sockaddr_in6)); + sendto(v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)&((*a)->ip6),(socklen_t)sizeof(struct sockaddr_in6)); // Introduce destination to source (V6) outp.reset(dest,self.address(),Packet::VERB_RENDEZVOUS); @@ -368,8 +425,9 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip outp.append((uint8_t)16); outp.append((const uint8_t *)ip->rawIpData(),16); outp.armor(b->second->key,true); - sendto(v6s,pkt.data(),pkt.size(),0,(const struct sockaddr *)&(b->second->ip6),(socklen_t)sizeof(struct sockaddr_in6)); - } else if (((*a)->ip4)&&(b->second->ip4)) { + sendto(v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)&(b->second->ip6),(socklen_t)sizeof(struct sockaddr_in6)); + } + if (((*a)->ip4)&&(b->second->ip4)) { //printf("* introducing %s(%s) to %s(%s)" ZT_EOL_S,ip->toString(ipstr),source.toString(astr),b->second->ip4.toString(ipstr2),dest.toString(astr2)); // Introduce source to destination (V4) @@ -380,7 +438,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip outp.append((uint8_t)4); outp.append((const uint8_t *)b->second->ip4.rawIpData(),4); outp.armor((*a)->key,true); - sendto(v4s,pkt.data(),pkt.size(),0,(const struct sockaddr *)&((*a)->ip4),(socklen_t)sizeof(struct sockaddr_in)); + sendto(v4s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)&((*a)->ip4),(socklen_t)sizeof(struct sockaddr_in)); // Introduce destination to source (V4) outp.reset(dest,self.address(),Packet::VERB_RENDEZVOUS); @@ -390,7 +448,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip outp.append((uint8_t)4); outp.append((const uint8_t *)ip->rawIpData(),4); outp.armor(b->second->key,true); - sendto(v4s,pkt.data(),pkt.size(),0,(const struct sockaddr *)&(b->second->ip4),(socklen_t)sizeof(struct sockaddr_in)); + sendto(v4s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)&(b->second->ip4),(socklen_t)sizeof(struct sockaddr_in)); } } } @@ -410,8 +468,10 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip } for(auto i=toAddrs.begin();i!=toAddrs.end();++i) { - //printf("%s -> %s for %s -> %s" ZT_EOL_S,ip->toString(ipstr),i->first->toString(ipstr2),source.toString(astr),dest.toString(astr2)); - sendto(i->first->isV4() ? v4s : v6s,pkt.data(),pkt.size(),0,(const struct sockaddr *)i->first,(socklen_t)(i->first->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))); + //printf("%s -> %s for %s -> %s (%u bytes)" ZT_EOL_S,ip->toString(ipstr),i->first->toString(ipstr2),source.toString(astr),dest.toString(astr2),pkt.size()); + if (sendto(i->first->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)i->first,(socklen_t)(i->first->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))) <= 0) { + printf("* write error forwarding packet to %s: %s" ZT_EOL_S,i->first->toString(ipstr),strerror(errno)); + } } } @@ -448,8 +508,11 @@ static int bindSocket(struct sockaddr *bindAddr) f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,&f,sizeof(f)); #endif } - f = 1; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f)); +#ifdef SO_REUSEPORT f = 1; setsockopt(s,SOL_SOCKET,SO_REUSEPORT,(void *)&f,sizeof(f)); +#else + f = 1; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f)); +#endif f = 1; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(void *)&f,sizeof(f)); #ifdef IP_DONTFRAG f = 0; setsockopt(s,IPPROTO_IP,IP_DONTFRAG,&f,sizeof(f)); @@ -534,6 +597,15 @@ int main(int argc,char **argv) } catch ( ... ) { port = ZT_DEFAULT_PORT; } + try { + statsRoot = config["statsRoot"]; + while ((statsRoot.length() > 0)&&(statsRoot[statsRoot.length()-1] == ZT_PATH_SEPARATOR)) + statsRoot = statsRoot.substr(0,statsRoot.length()-1); + if (statsRoot.length() > 0) + OSUtils::mkdir(statsRoot); + } catch ( ... ) { + statsRoot = ""; + } unsigned int ncores = std::thread::hardware_concurrency(); if (ncores == 0) ncores = 1; @@ -567,7 +639,7 @@ int main(int argc,char **argv) sockets.push_back(s6); sockets.push_back(s4); - threads.push_back(std::thread([s4,s6]() { + threads.push_back(std::thread([s6,s4]() { struct sockaddr_in6 in6; Packet pkt; memset(&in6,0,sizeof(in6)); @@ -590,7 +662,7 @@ int main(int argc,char **argv) } })); - threads.push_back(std::thread([s4,s6]() { + threads.push_back(std::thread([s6,s4]() { struct sockaddr_in in4; Packet pkt; memset(&in4,0,sizeof(in4)); @@ -616,12 +688,13 @@ int main(int argc,char **argv) int64_t lastCleanedMulticastSubscriptions = 0; int64_t lastCleanedPeers = 0; + int64_t lastWroteStats = 0; while (run) { - peersByIdentity_l.lock(); - peersByPhysAddr_l.lock(); - printf("*** have %lu peers at %lu physical endpoints" ZT_EOL_S,(unsigned long)peersByIdentity.size(),(unsigned long)peersByPhysAddr.size()); - peersByPhysAddr_l.unlock(); - peersByIdentity_l.unlock(); + //peersByIdentity_l.lock(); + //peersByPhysAddr_l.lock(); + //printf("*** have %lu peers at %lu physical endpoints" ZT_EOL_S,(unsigned long)peersByIdentity.size(),(unsigned long)peersByPhysAddr.size()); + //peersByPhysAddr_l.unlock(); + //peersByIdentity_l.unlock(); sleep(1); const int64_t now = OSUtils::now(); @@ -650,38 +723,92 @@ int main(int argc,char **argv) if ((now - lastCleanedPeers) > 120000) { lastCleanedPeers = now; - std::lock_guard pbi_l(peersByIdentity_l); - for(auto p=peersByIdentity.begin();p!=peersByIdentity.end();) { - if ((now - p->second->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT) { - std::lock_guard pbv_l(peersByVirtAddr_l); - std::lock_guard pbp_l(peersByPhysAddr_l); + { + std::lock_guard pbi_l(peersByIdentity_l); + for(auto p=peersByIdentity.begin();p!=peersByIdentity.end();) { + if ((now - p->second->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT) { + std::lock_guard pbv_l(peersByVirtAddr_l); + std::lock_guard pbp_l(peersByPhysAddr_l); - auto pbv = peersByVirtAddr.find(p->second->id.address()); - if (pbv != peersByVirtAddr.end()) { - pbv->second.erase(p->second); - if (pbv->second.empty()) - peersByVirtAddr.erase(pbv); - } - - if (p->second->ip4) { - auto pbp = peersByPhysAddr.find(p->second->ip4); - if (pbp != peersByPhysAddr.end()) { - pbp->second.erase(p->second); - if (pbp->second.empty()) - peersByPhysAddr.erase(pbp); + auto pbv = peersByVirtAddr.find(p->second->id.address()); + if (pbv != peersByVirtAddr.end()) { + pbv->second.erase(p->second); + if (pbv->second.empty()) + peersByVirtAddr.erase(pbv); } - } - if (p->second->ip6) { - auto pbp = peersByPhysAddr.find(p->second->ip6); - if (pbp != peersByPhysAddr.end()) { - pbp->second.erase(p->second); - if (pbp->second.empty()) - peersByPhysAddr.erase(pbp); - } - } - peersByIdentity.erase(p++); - } else ++p; + if (p->second->ip4) { + auto pbp = peersByPhysAddr.find(p->second->ip4); + if (pbp != peersByPhysAddr.end()) { + pbp->second.erase(p->second); + if (pbp->second.empty()) + peersByPhysAddr.erase(pbp); + } + } + if (p->second->ip6) { + auto pbp = peersByPhysAddr.find(p->second->ip6); + if (pbp != peersByPhysAddr.end()) { + pbp->second.erase(p->second); + if (pbp->second.empty()) + peersByPhysAddr.erase(pbp); + } + } + + peersByIdentity.erase(p++); + } else ++p; + } + } + + { + std::lock_guard l(lastRendezvous_l); + for(auto lr=lastRendezvous.begin();lr!=lastRendezvous.end();) { + if ((now - lr->second) > ZT_PEER_ACTIVITY_TIMEOUT) + lastRendezvous.erase(lr++); + else ++lr; + } + } + } + + if (((now - lastWroteStats) > 15000)&&(statsRoot.length() > 0)) { + lastWroteStats = now; + + std::string peersFilePath(statsRoot); + peersFilePath.append("/peers.tmp"); + FILE *pf = fopen(peersFilePath.c_str(),"wb"); + + if (pf) { + std::vector< SharedPtr > sp; + { + std::lock_guard pbi_l(peersByIdentity_l); + sp.reserve(peersByIdentity.size()); + for(auto p=peersByIdentity.begin();p!=peersByIdentity.end();++p) { + sp.push_back(p->second); + } + } + std::sort(sp.begin(),sp.end(),[](const SharedPtr &a,const SharedPtr &b) { return (a->id < b->id); }); + + char ip4[128],ip6[128]; + for(auto p=sp.begin();p!=sp.end();++p) { + if ((*p)->ip4) { + (*p)->ip4.toString(ip4); + } else { + ip4[0] = '-'; + ip4[1] = 0; + } + if ((*p)->ip6) { + (*p)->ip6.toString(ip6); + } else { + ip6[0] = '-'; + ip6[1] = 0; + } + fprintf(pf,"%.10llx %21s %45s %5.4f" ZT_EOL_S,(unsigned long long)(*p)->id.address().toInt(),ip4,ip6,fabs((double)(now - (*p)->lastReceive) / 1000.0)); + } + fclose(pf); + + std::string peersFilePath2(statsRoot); + peersFilePath2.append("/peers"); + OSUtils::rm(peersFilePath2); + OSUtils::rename(peersFilePath.c_str(),peersFilePath2.c_str()); } } }