From fd6e8d8c5ced937e5ac38154f450e848a8c6efda Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 3 Sep 2019 12:21:57 -0700 Subject: [PATCH] Add instrumentation to root, add capability to forward to siblings if no path is known to a peer. --- node/Meter.hpp | 89 ++++++++++ node/Mutex.hpp | 4 +- root/root.cpp | 436 +++++++++++++++++++++++++++++++++++++------------ selftest.cpp | 18 ++ 4 files changed, 437 insertions(+), 110 deletions(-) create mode 100644 node/Meter.hpp diff --git a/node/Meter.hpp b/node/Meter.hpp new file mode 100644 index 000000000..58def766a --- /dev/null +++ b/node/Meter.hpp @@ -0,0 +1,89 @@ +/* + * Copyright (c)2019 ZeroTier, Inc. + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file in the project's root directory. + * + * Change Date: 2023-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2.0 of the Apache License. + */ +/****/ + +#ifndef ZT_METER_HPP +#define ZT_METER_HPP + +#include "Constants.hpp" +#include "Mutex.hpp" + +#define ZT_METER_HISTORY_LENGTH 4 +#define ZT_METER_HISTORY_TICK_DURATION 1000 + +namespace ZeroTier { + +/** + * Transfer rate meter (thread-safe) + */ +class Meter +{ +public: + ZT_ALWAYS_INLINE Meter() + { + for(int i=0;i + ZT_ALWAYS_INLINE void log(const int64_t now,I count) + { + _lock.lock(); + const int64_t since = now - _ts; + if (since >= 1000) { + _ts = now; + for(int i=1;i #include @@ -47,6 +82,7 @@ #include #include #include +#include #include #include @@ -73,11 +109,13 @@ using json = nlohmann::json; ////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// +// Hashers for std::unordered_map struct IdentityHasher { ZT_ALWAYS_INLINE std::size_t operator()(const Identity &id) const { return (std::size_t)id.hashCode(); } }; struct AddressHasher { ZT_ALWAYS_INLINE std::size_t operator()(const Address &a) const { return (std::size_t)a.toInt(); } }; struct InetAddressHasher { ZT_ALWAYS_INLINE std::size_t operator()(const InetAddress &ip) const { return (std::size_t)ip.hashCode(); } }; struct MulticastGroupHasher { ZT_ALWAYS_INLINE std::size_t operator()(const MulticastGroup &mg) const { return (std::size_t)mg.hashCode(); } }; +// An ordered tuple key representing an introduction of one peer to another struct RendezvousKey { RendezvousKey(const Address &aa,const Address &bb) @@ -96,36 +134,52 @@ struct RendezvousKey struct Hasher { ZT_ALWAYS_INLINE std::size_t operator()(const RendezvousKey &k) const { return (std::size_t)(k.a.toInt() ^ k.b.toInt()); } }; }; +/** + * RootPeer is a normal peer known to this root + * + * This can also be a sibling root, which is itself a peer. Sibling roots + * are sent HELLO while for other peers we only listen for HELLO. + */ struct RootPeer { ZT_ALWAYS_INLINE RootPeer() : lastSend(0),lastReceive(0),lastSync(0),lastEcho(0),lastHello(0),vMajor(-1),vMinor(-1),vRev(-1) {} ZT_ALWAYS_INLINE ~RootPeer() { Utils::burn(key,sizeof(key)); } - Identity id; - uint8_t key[32]; - InetAddress ip4,ip6; - int64_t lastSend; - int64_t lastReceive; - int64_t lastSync; - int64_t lastEcho; - int64_t lastHello; - int vMajor,vMinor,vRev; + Identity id; // Identity + uint8_t key[32]; // Shared secret key + InetAddress ip4,ip6; // IPv4 and IPv6 addresses + int64_t lastSend; // Time of last send (any packet) + int64_t lastReceive; // Time of last receive (any packet) + int64_t lastSync; // Time of last data synchronization with LF or other root state backend (currently unused) + int64_t lastEcho; // Time of last received ECHO + int64_t lastHello; // Time of last received HELLO + int vMajor,vMinor,vRev; // Peer version or -1,-1,-1 if unknown + bool sibling; // If true, this is a sibling root that will get forwards we don't know where to send std::mutex lock; AtomicCounter __refCount; }; +static int64_t startTime; +static std::vector ports; static Identity self; static std::atomic_bool run; static json config; static std::string statsRoot; +static Meter inputRate; +static Meter outputRate; +static Meter forwardRate; +static Meter siblingForwardRate; + +static std::vector< SharedPtr > siblings; static std::unordered_map< uint64_t,std::unordered_map< MulticastGroup,std::unordered_map< Address,int64_t,AddressHasher >,MulticastGroupHasher > > multicastSubscriptions; static std::unordered_map< Identity,SharedPtr,IdentityHasher > peersByIdentity; static std::unordered_map< Address,std::set< SharedPtr >,AddressHasher > peersByVirtAddr; static std::unordered_map< InetAddress,std::set< SharedPtr >,InetAddressHasher > peersByPhysAddr; static std::unordered_map< RendezvousKey,int64_t,RendezvousKey::Hasher > lastRendezvous; +static std::mutex siblings_l; static std::mutex multicastSubscriptions_l; static std::mutex peersByIdentity_l; static std::mutex peersByVirtAddr_l; @@ -143,6 +197,8 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip const Address dest(pkt.destination()); const int64_t now = OSUtils::now(); + inputRate.log(now,pkt.size()); + if ((!fragment)&&(!pkt.fragmented())&&(dest == self.address())) { SharedPtr peer; @@ -256,6 +312,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip pkt.armor(peer->key,true); sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))); + outputRate.log(now,pkt.size()); peer->lastSend = now; } } catch ( ... ) { @@ -267,6 +324,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip try { if ((now - peer->lastEcho) > 1000) { peer->lastEcho = now; + Packet outp(source,self.address(),Packet::VERB_OK); outp.append((uint8_t)Packet::VERB_ECHO); outp.append(pkt.packetId()); @@ -274,6 +332,8 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip outp.compress(); outp.armor(peer->key,true); sendto(ip->isV4() ? v4s : v6s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))); + + outputRate.log(now,outp.size()); peer->lastSend = now; } } catch ( ... ) { @@ -303,6 +363,8 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip (*p)->id.serialize(pkt,false); pkt.armor(peer->key,true); sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))); + + outputRate.log(now,pkt.size()); peer->lastSend = now; } } catch ( ... ) { @@ -326,7 +388,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip case Packet::VERB_MULTICAST_GATHER: try { const uint64_t nwid = pkt.template at(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_NETWORK_ID); - const unsigned int flags = pkt[ZT_PROTO_VERB_MULTICAST_GATHER_IDX_FLAGS]; + //const unsigned int flags = pkt[ZT_PROTO_VERB_MULTICAST_GATHER_IDX_FLAGS]; const MulticastGroup mg(MAC(pkt.field(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_MAC,6),6),pkt.template at(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_ADI)); unsigned int gatherLimit = pkt.template at(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_GATHER_LIMIT); if (gatherLimit > 255) @@ -362,6 +424,8 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip pkt.setAt(countAt,(uint16_t)l); pkt.armor(peer->key,true); sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)(ip->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))); + + outputRate.log(now,pkt.size()); peer->lastSend = now; //printf("%s %s gathered %u subscribers to %s/%.8lx on network %.16llx" ZT_EOL_S,ip->toString(ipstr),source.toString(astr),l,mg.mac().toString(tmpstr),(unsigned long)mg.adi(),(unsigned long long)nwid); } @@ -410,9 +474,19 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip } } if (toAddrs.empty()) { - //printf("%s not forwarding to %s: no destinations found" ZT_EOL_S,ip->toString(ipstr),dest().toString(astr)); - return; + std::lock_guard sib_l(siblings_l); + for(auto s=siblings.begin();s!=siblings.end();++s) { + if (((now - (*s)->lastReceive) < (ZT_PEER_PING_PERIOD * 2))&&((*s)->sibling)) { + if ((*s)->ip4) { + toAddrs.push_back(std::pair< InetAddress *,SharedPtr >(&((*s)->ip4),*s)); + } else if ((*s)->ip6) { + toAddrs.push_back(std::pair< InetAddress *,SharedPtr >(&((*s)->ip6),*s)); + } + } + } } + if (toAddrs.empty()) + return; if (introduce) { std::lock_guard l(peersByVirtAddr_l); @@ -432,6 +506,8 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip outp.append((const uint8_t *)b->second->ip6.rawIpData(),16); outp.armor((*a)->key,true); sendto(v6s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)&((*a)->ip6),(socklen_t)sizeof(struct sockaddr_in6)); + + outputRate.log(now,outp.size()); (*a)->lastSend = now; // Introduce destination to source (V6) @@ -443,6 +519,8 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip outp.append((const uint8_t *)ip->rawIpData(),16); outp.armor(b->second->key,true); sendto(v6s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)&(b->second->ip6),(socklen_t)sizeof(struct sockaddr_in6)); + + outputRate.log(now,outp.size()); b->second->lastSend = now; } if (((*a)->ip4)&&(b->second->ip4)) { @@ -457,6 +535,8 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip outp.append((const uint8_t *)b->second->ip4.rawIpData(),4); outp.armor((*a)->key,true); sendto(v4s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)&((*a)->ip4),(socklen_t)sizeof(struct sockaddr_in)); + + outputRate.log(now,outp.size()); (*a)->lastSend = now; // Introduce destination to source (V4) @@ -468,6 +548,8 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip outp.append((const uint8_t *)ip->rawIpData(),4); outp.armor(b->second->key,true); sendto(v4s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)&(b->second->ip4),(socklen_t)sizeof(struct sockaddr_in)); + + outputRate.log(now,outp.size()); b->second->lastSend = now; } } @@ -492,6 +574,10 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip if (sendto(i->first->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)i->first,(socklen_t)(i->first->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))) <= 0) { printf("* write error forwarding packet to %s: %s" ZT_EOL_S,i->first->toString(ipstr),strerror(errno)); } else { + outputRate.log(now,pkt.size()); + forwardRate.log(now,pkt.size()); + if (i->second->sibling) + siblingForwardRate.log(now,pkt.size()); i->second->lastSend = now; } } @@ -571,6 +657,8 @@ int main(int argc,char **argv) signal(SIGUSR2,SIG_IGN); signal(SIGCHLD,SIG_IGN); + startTime = OSUtils::now(); + if (argc < 3) { printf("Usage: zerotier-root " ZT_EOL_S); return 1; @@ -612,17 +700,31 @@ int main(int argc,char **argv) } } - int port = ZT_DEFAULT_PORT; - int httpPort = ZT_DEFAULT_PORT; try { - port = config["port"]; - if ((port <= 0)||(port > 65535)) { - printf("FATAL: invalid port in config file %d" ZT_EOL_S,port); - return 1; + auto jport = config["port"]; + if (jport.is_array()) { + for(long i=0;i<(long)jport.size();++i) { + int port = jport[i]; + if ((port <= 0)||(port > 65535)) { + printf("FATAL: invalid port in config file %d" ZT_EOL_S,port); + return 1; + } + ports.push_back(port); + } + } else { + int port = jport; + if ((port <= 0)||(port > 65535)) { + printf("FATAL: invalid port in config file %d" ZT_EOL_S,port); + return 1; + } + ports.push_back(port); } - } catch ( ... ) { - port = ZT_DEFAULT_PORT; - } + } catch ( ... ) {} + if (ports.empty()) + ports.push_back(ZT_DEFAULT_PORT); + std::sort(ports.begin(),ports.end()); + + int httpPort = ZT_DEFAULT_PORT; try { httpPort = config["httpPort"]; if ((httpPort <= 0)||(httpPort > 65535)) { @@ -632,6 +734,7 @@ int main(int argc,char **argv) } catch ( ... ) { httpPort = ZT_DEFAULT_PORT; } + try { statsRoot = config["statsRoot"]; while ((statsRoot.length() > 0)&&(statsRoot[statsRoot.length()-1] == ZT_PATH_SEPARATOR)) @@ -641,6 +744,54 @@ int main(int argc,char **argv) } catch ( ... ) { statsRoot = ""; } + try { + auto sibs = config["siblings"]; + if (sibs.is_array()) { + for(long i=0;i<(long)siblings.size();++i) { + auto sib = sibs[i]; + if (sib.is_object()) { + std::string idStr = sib["id"]; + std::string ipStr = sib["ip"]; + Identity id; + if (!id.fromString(idStr.c_str())) { + printf("FATAL: invalid JSON while parsing siblings section in config file: invalid identity in sibling entry" ZT_EOL_S); + return 1; + } + InetAddress ip; + if (!ip.fromString(ipStr.c_str())) { + printf("FATAL: invalid JSON while parsing siblings section in config file: invalid IP address in sibling entry" ZT_EOL_S); + return 1; + } + ip.setPort((unsigned int)sib["port"]); + SharedPtr rp(new RootPeer); + rp->id = id; + if (!self.agree(id,rp->key)) { + printf("FATAL: invalid JSON while parsing siblings section in config file: invalid identity in sibling entry (unable to execute key agreement)" ZT_EOL_S); + return 1; + } + if (ip.isV4()) { + rp->ip4 = ip; + } else if (ip.isV6()) { + rp->ip6 = ip; + } else { + printf("FATAL: invalid JSON while parsing siblings section in config file: invalid IP address in sibling entry" ZT_EOL_S); + return 1; + } + rp->sibling = true; + siblings.push_back(rp); + } else { + printf("FATAL: invalid JSON while parsing siblings section in config file: sibling entry is not a JSON object" ZT_EOL_S); + return 1; + } + } + } else { + printf("FATAL: invalid JSON while parsing siblings section in config file: siblings is not a JSON array" ZT_EOL_S); + return 1; + } + } catch ( ... ) { + printf("FATAL: invalid JSON while parsing siblings section in config file: parse error" ZT_EOL_S); + return 1; + } unsigned int ncores = std::thread::hardware_concurrency(); if (ncores == 0) ncores = 1; @@ -649,78 +800,84 @@ int main(int argc,char **argv) std::vector threads; std::vector sockets; + int v4Sock = -1,v6Sock = -1; - for(unsigned int tn=0;tn 0) { - if (pl >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { - try { - pkt.setSize((unsigned int)pl); - handlePacket(s4,s6,reinterpret_cast(&in6),pkt); - } catch ( ... ) { - char ipstr[128]; - printf("* unexpected exception handling packet from %s" ZT_EOL_S,reinterpret_cast(&in6)->toString(ipstr)); - } - } - } else { - break; - } + in6.sin6_family = AF_INET6; + in6.sin6_port = htons((uint16_t)*port); + const int s6 = bindSocket((struct sockaddr *)&in6); + if (s6 < 0) { + std::cout << "ERROR: unable to bind to port " << *port << ZT_EOL_S; + exit(1); } - })); - threads.push_back(std::thread([s6,s4]() { struct sockaddr_in in4; - Packet pkt; memset(&in4,0,sizeof(in4)); - for(;;) { - socklen_t sl = sizeof(in4); - const int pl = (int)recvfrom(s4,pkt.unsafeData(),pkt.capacity(),0,(struct sockaddr *)&in4,&sl); - if (pl > 0) { - if (pl >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { - try { - pkt.setSize((unsigned int)pl); - handlePacket(s4,s6,reinterpret_cast(&in4),pkt); - } catch ( ... ) { - char ipstr[128]; - printf("* unexpected exception handling packet from %s" ZT_EOL_S,reinterpret_cast(&in4)->toString(ipstr)); - } - } - } else { - break; - } + in4.sin_family = AF_INET; + in4.sin_port = htons((uint16_t)*port); + const int s4 = bindSocket((struct sockaddr *)&in4); + if (s4 < 0) { + std::cout << "ERROR: unable to bind to port " << *port << ZT_EOL_S; + exit(1); } - })); + + sockets.push_back(s6); + sockets.push_back(s4); + if (v4Sock < 0) v4Sock = s4; + if (v6Sock < 0) v6Sock = s6; + + threads.push_back(std::thread([s6,s4]() { + struct sockaddr_in6 in6; + Packet pkt; + memset(&in6,0,sizeof(in6)); + for(;;) { + socklen_t sl = sizeof(in6); + const int pl = (int)recvfrom(s6,pkt.unsafeData(),pkt.capacity(),0,(struct sockaddr *)&in6,&sl); + if (pl > 0) { + if (pl >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { + try { + pkt.setSize((unsigned int)pl); + handlePacket(s4,s6,reinterpret_cast(&in6),pkt); + } catch ( ... ) { + char ipstr[128]; + printf("* unexpected exception handling packet from %s" ZT_EOL_S,reinterpret_cast(&in6)->toString(ipstr)); + } + } + } else { + break; + } + } + })); + + threads.push_back(std::thread([s6,s4]() { + struct sockaddr_in in4; + Packet pkt; + memset(&in4,0,sizeof(in4)); + for(;;) { + socklen_t sl = sizeof(in4); + const int pl = (int)recvfrom(s4,pkt.unsafeData(),pkt.capacity(),0,(struct sockaddr *)&in4,&sl); + if (pl > 0) { + if (pl >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { + try { + pkt.setSize((unsigned int)pl); + handlePacket(s4,s6,reinterpret_cast(&in4),pkt); + } catch ( ... ) { + char ipstr[128]; + printf("* unexpected exception handling packet from %s" ZT_EOL_S,reinterpret_cast(&in4)->toString(ipstr)); + } + } + } else { + break; + } + } + })); + } } + // Minimal local API for use with monitoring clients, etc. httplib::Server apiServ; threads.push_back(std::thread([&apiServ,httpPort]() { apiServ.Get("/",[](const httplib::Request &req,httplib::Response &res) { @@ -784,9 +941,10 @@ int main(int argc,char **argv) apiServ.listen("127.0.0.1",httpPort,0); })); - int64_t lastCleanedMulticastSubscriptions = 0; - int64_t lastCleanedPeers = 0; + // In the main thread periodically clean stuff up + int64_t lastCleaned = 0; int64_t lastWroteStats = 0; + int64_t lastPingedSiblings = 0; while (run) { //peersByIdentity_l.lock(); //peersByPhysAddr_l.lock(); @@ -797,34 +955,64 @@ int main(int argc,char **argv) const int64_t now = OSUtils::now(); - if ((now - lastCleanedMulticastSubscriptions) > 120000) { - lastCleanedMulticastSubscriptions = now; - - std::lock_guard l(multicastSubscriptions_l); - for(auto a=multicastSubscriptions.begin();a!=multicastSubscriptions.end();) { - for(auto b=a->second.begin();b!=a->second.end();) { - for(auto c=b->second.begin();c!=b->second.end();) { - if ((now - c->second) > ZT_MULTICAST_LIKE_EXPIRE) - b->second.erase(c++); - else ++c; - } - if (b->second.empty()) - a->second.erase(b++); - else ++b; + // Send HELLO to sibling roots + if ((now - lastPingedSiblings) >= ZT_PEER_PING_PERIOD) { + lastPingedSiblings = now; + std::lock_guard l(siblings_l); + for(auto s=siblings.begin();s!=siblings.end();++s) { + const InetAddress *ip = nullptr; + socklen_t sl = 0; + Packet outp((*s)->id.address(),self.address(),Packet::VERB_HELLO); + outp.append((uint8_t)ZT_PROTO_VERSION); + outp.append((uint8_t)ZEROTIER_ONE_VERSION_MAJOR); + outp.append((uint8_t)ZEROTIER_ONE_VERSION_MINOR); + outp.append((uint16_t)ZEROTIER_ONE_VERSION_REVISION); + outp.append((uint64_t)now); + self.serialize(outp,false); + if ((*s)->ip4) { + (*s)->ip4.serialize(outp); + ip = &((*s)->ip4); + sl = sizeof(struct sockaddr_in); + } else if ((*s)->ip6) { + (*s)->ip6.serialize(outp); + ip = &((*s)->ip6); + sl = sizeof(struct sockaddr_in6); + } + if (ip) { + outp.armor((*s)->key,false); + sendto(ip->isV4() ? v4Sock : v6Sock,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,sl); } - if (a->second.empty()) - multicastSubscriptions.erase(a++); - else ++a; } } - if ((now - lastCleanedPeers) > 120000) { - lastCleanedPeers = now; + if ((now - lastCleaned) > 120000) { + lastCleaned = now; + // Old multicast subscription cleanup + { + std::lock_guard l(multicastSubscriptions_l); + for(auto a=multicastSubscriptions.begin();a!=multicastSubscriptions.end();) { + for(auto b=a->second.begin();b!=a->second.end();) { + for(auto c=b->second.begin();c!=b->second.end();) { + if ((now - c->second) > ZT_MULTICAST_LIKE_EXPIRE) + b->second.erase(c++); + else ++c; + } + if (b->second.empty()) + a->second.erase(b++); + else ++b; + } + if (a->second.empty()) + multicastSubscriptions.erase(a++); + else ++a; + } + } + + // Remove expired peers { std::lock_guard pbi_l(peersByIdentity_l); for(auto p=peersByIdentity.begin();p!=peersByIdentity.end();) { - if ((now - p->second->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT) { + if (((now - p->second->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT)&&(!p->second->sibling)) { std::lock_guard pbv_l(peersByVirtAddr_l); std::lock_guard pbp_l(peersByPhysAddr_l); @@ -857,6 +1045,7 @@ int main(int argc,char **argv) } } + // Remove old rendezvous tracking entries { std::lock_guard l(lastRendezvous_l); for(auto lr=lastRendezvous.begin();lr!=lastRendezvous.end();) { @@ -867,13 +1056,13 @@ int main(int argc,char **argv) } } + // Write stats if configured to do so if (((now - lastWroteStats) > 15000)&&(statsRoot.length() > 0)) { lastWroteStats = now; std::string peersFilePath(statsRoot); - peersFilePath.append("/peers.tmp"); + peersFilePath.append("/.peers.tmp"); FILE *pf = fopen(peersFilePath.c_str(),"wb"); - if (pf) { std::vector< SharedPtr > sp; { @@ -901,16 +1090,47 @@ int main(int argc,char **argv) } fprintf(pf,"%.10llx %21s %45s %5.4f %d.%d.%d" ZT_EOL_S,(unsigned long long)(*p)->id.address().toInt(),ip4,ip6,fabs((double)(now - (*p)->lastReceive) / 1000.0),(*p)->vMajor,(*p)->vMinor,(*p)->vRev); } - fclose(pf); + fclose(pf); std::string peersFilePath2(statsRoot); peersFilePath2.append("/peers"); OSUtils::rm(peersFilePath2); OSUtils::rename(peersFilePath.c_str(),peersFilePath2.c_str()); } + + std::string statsFilePath(statsRoot); + statsFilePath.append("/.stats.tmp"); + FILE *sf = fopen(statsFilePath.c_str(),"wb"); + if (sf) { + fprintf(sf,"Uptime (seconds) : %ld" ZT_EOL_S,(long)((now - startTime) / 1000)); + peersByIdentity_l.lock(); + fprintf(sf,"Peers : %llu" ZT_EOL_S,(unsigned long long)peersByIdentity.size()); + peersByVirtAddr_l.lock(); + fprintf(sf,"Virtual Address Collisions : %llu" ZT_EOL_S,(unsigned long long)(peersByIdentity.size() - peersByVirtAddr.size())); + peersByVirtAddr_l.unlock(); + peersByIdentity_l.unlock(); + peersByPhysAddr_l.lock(); + fprintf(sf,"Physical Endpoints : %llu" ZT_EOL_S,(unsigned long long)peersByPhysAddr.size()); + peersByPhysAddr_l.unlock(); + lastRendezvous_l.lock(); + fprintf(sf,"Recent P2P Graph Edges : %llu" ZT_EOL_S,(unsigned long long)lastRendezvous.size()); + lastRendezvous_l.unlock(); + fprintf(sf,"Input BPS : %.4f" ZT_EOL_S,inputRate.perSecond(now)); + fprintf(sf,"Output BPS : %.4f" ZT_EOL_S,outputRate.perSecond(now)); + fprintf(sf,"Forwarded BPS : %.4f" ZT_EOL_S,forwardRate.perSecond(now)); + fprintf(sf,"Sibling Forwarded BPS : %.4f" ZT_EOL_S,siblingForwardRate.perSecond(now)); + + fclose(sf); + std::string statsFilePath2(statsRoot); + statsFilePath2.append("/stats"); + OSUtils::rm(statsFilePath2); + OSUtils::rename(statsFilePath.c_str(),statsFilePath2.c_str()); + } } } + // If we received a kill signal, close everything and wait + // for threads to die before exiting. apiServ.stop(); for(auto s=sockets.begin();s!=sockets.end();++s) { shutdown(*s,SHUT_RDWR); diff --git a/selftest.cpp b/selftest.cpp index 0daa452a5..2d25b9a80 100644 --- a/selftest.cpp +++ b/selftest.cpp @@ -756,6 +756,24 @@ static int testOther() char buf2[4096]; char buf3[1024]; + std::cout << "[other] Testing Mutex and threads... "; std::cout.flush(); + volatile unsigned long mcnt = 0; + Mutex mlock; + std::vector mthr; + for(int t=0;t<128;++t) { + mthr.emplace_back(std::thread([&mcnt,&mlock]() { + for(int i=0;i<10000;++i) { + mlock.lock(); + ++mcnt; + mlock.unlock(); + usleep(1); + } + })); + } + for(std::vector::iterator t(mthr.begin());t!=mthr.end();++t) + t->join(); + std::cout << "OK (" << mcnt << ")" ZT_EOL_S; + std::cout << "[other] Testing bit counting functions... "; std::cout.flush(); uint32_t i32 = 0; uint64_t i64 = 0;