From 2deaaeef285896ee92dbaa14859a0530d0cf5280 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 13 Sep 2019 10:20:29 -0700 Subject: [PATCH] Some root cleanup and more lock contention reduction. --- root/root.cpp | 148 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 88 insertions(+), 60 deletions(-) diff --git a/root/root.cpp b/root/root.cpp index e24321016..645df8350 100644 --- a/root/root.cpp +++ b/root/root.cpp @@ -117,12 +117,13 @@ using json = nlohmann::json; /** * RootPeer is a normal peer known to this root * - * This can also be a sibling root, which is itself a peer. Sibling roots - * are sent HELLO while for other peers we only listen for HELLO. + * This struct must remain memcpy-able. Identity, InetAddress, and + * AtomicCounter all satisfy this. Take care when adding fields that + * this remains true. */ struct RootPeer { - ZT_ALWAYS_INLINE RootPeer() : lastSend(0),lastReceive(0),lastSync(0),lastEcho(0),lastHello(0),vProto(-1),vMajor(-1),vMinor(-1),vRev(-1) {} + ZT_ALWAYS_INLINE RootPeer() : lastSend(0),lastReceive(0),lastEcho(0),lastHello(0),vProto(-1),vMajor(-1),vMinor(-1),vRev(-1) {} ZT_ALWAYS_INLINE ~RootPeer() { Utils::burn(key,sizeof(key)); } Identity id; // Identity @@ -130,12 +131,10 @@ struct RootPeer InetAddress ip4,ip6; // IPv4 and IPv6 addresses int64_t lastSend; // Time of last send (any packet) int64_t lastReceive; // Time of last receive (any packet) - int64_t lastSync; // Time of last data synchronization with LF or other root state backend (currently unused) int64_t lastEcho; // Time of last received ECHO int64_t lastHello; // Time of last received HELLO - int vProto; // Protocol version + int vProto; // Protocol version or -1 if unknown int vMajor,vMinor,vRev; // Peer version or -1,-1,-1 if unknown - std::mutex lock; AtomicCounter __refCount; }; @@ -180,7 +179,7 @@ struct ForwardingStats Meter bps; }; -// These fields are not locked as they're only initialized on startup +// These fields are not locked as they're only initialized on startup or are atomic static int64_t s_startTime; // Time service was started static std::vector s_ports; // Ports to bind for UDP traffic static int s_relayMaxHops = 0; // Max relay hops @@ -195,7 +194,7 @@ static std::string s_googleMapsAPIKey; // Google maps API key for GeoIP /map fea static std::map< std::pair< uint32_t,uint32_t >,std::pair< float,float > > s_geoIp4; static std::map< std::pair< std::array< uint64_t,2 >,std::array< uint64_t,2 > >,std::pair< float,float > > s_geoIp6; -// Rate meters for statistical purposes +// Rate meters for statistical purposes (locks are internal to Meter) static Meter s_inputRate; static Meter s_outputRate; static Meter s_forwardRate; @@ -221,12 +220,14 @@ static std::mutex s_lastForwardedTo_l; ////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// -static uint32_t ip4ToH32(const InetAddress &ip) +// Construct GeoIP key for IPv4 IPs +static ZT_ALWAYS_INLINE uint32_t ip4ToH32(const InetAddress &ip) { return Utils::ntoh((uint32_t)(((const struct sockaddr_in *)&ip)->sin_addr.s_addr)); } -static std::array< uint64_t,2 > ip6ToH128(const InetAddress &ip) +// Construct GeoIP key for IPv6 IPs +static ZT_ALWAYS_INLINE std::array< uint64_t,2 > ip6ToH128(const InetAddress &ip) { std::array i128; memcpy(i128.data(),ip.rawIpData(),16); @@ -268,30 +269,48 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip } } else { peer.set(new RootPeer); - if (s_self.agree(id,peer->key)) { - if (pkt.dearmor(peer->key)) { - if (!pkt.uncompress()) { - printf("%s HELLO rejected: decompression failed" ZT_EOL_S,ip->toString(ipstr)); - return; - } - peer->id = id; - peer->lastReceive = now; - std::lock_guard pl(s_peers_l); - std::lock_guard pbi_l(s_peersByIdentity_l); - std::lock_guard pbv_l(s_peersByVirtAddr_l); - if (s_peersByIdentity.find(id) == s_peersByIdentity.end()) { // double check to ensure another thread didn't add this - s_peers.emplace_back(peer); - s_peersByIdentity.emplace(id,peer); - s_peersByVirtAddr[id.address()].emplace(peer); - } - } else { - printf("%s HELLO rejected: packet authentication failed" ZT_EOL_S,ip->toString(ipstr)); - return; - } - } else { + + if (!s_self.agree(id,peer->key)) { printf("%s HELLO rejected: key agreement failed" ZT_EOL_S,ip->toString(ipstr)); return; } + if (!pkt.dearmor(peer->key)) { + printf("%s HELLO rejected: packet authentication failed" ZT_EOL_S,ip->toString(ipstr)); + return; + } + if (!pkt.uncompress()) { + printf("%s HELLO rejected: decompression failed" ZT_EOL_S,ip->toString(ipstr)); + return; + } + if (!id.locallyValidate()) { // this is more time consuming so check others first + printf("%s HELLO rejected: identity local validation failed" ZT_EOL_S,ip->toString(ipstr)); + return; + } + + peer->id = id; + peer->lastReceive = now; + + bool added = false; + { + std::lock_guard pbi_l(s_peersByIdentity_l); + auto existing = s_peersByIdentity.find(id); // make sure another thread didn't do this while we were + if (existing == s_peersByIdentity.end()) { + s_peersByIdentity.emplace(id,peer); + added = true; + } else { + peer = existing->second; + } + } + if (added) { + { + std::lock_guard pl(s_peers_l); + s_peers.emplace_back(peer); + } + { + std::lock_guard pbv_l(s_peersByVirtAddr_l); + s_peersByVirtAddr[id.address()].emplace(peer); + } + } } } } @@ -309,7 +328,6 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip return; } peer = (*p); - //printf("%s has %s (known (2))" ZT_EOL_S,ip->toString(ipstr),source().toString(astr)); break; } } @@ -319,8 +337,6 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip // If we found the peer, update IP and/or time and handle certain key packet types that the // root must concern itself with. if (peer) { - std::lock_guard pl(peer->lock); - if (ip->isV4()) peer->ip4 = ip; else if (ip->isV6()) @@ -531,9 +547,9 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip if (peers != s_peersByVirtAddr.end()) { for(auto p=peers->second.begin();p!=peers->second.end();++p) { if ((*p)->ip4) { - toAddrs.push_back(std::pair< InetAddress *,SharedPtr >(&((*p)->ip4),*p)); + toAddrs.emplace_back(std::pair< InetAddress *,SharedPtr >(&((*p)->ip4),*p)); } else if ((*p)->ip6) { - toAddrs.push_back(std::pair< InetAddress *,SharedPtr >(&((*p)->ip6),*p)); + toAddrs.emplace_back(std::pair< InetAddress *,SharedPtr >(&((*p)->ip6),*p)); } } } @@ -905,9 +921,15 @@ int main(int argc,char **argv) try { pkt.setSize((unsigned int)pl); handlePacket(s4,s6,reinterpret_cast(&in6),pkt); + } catch (std::exception &exc) { + char ipstr[128]; + printf("WARNING: unexpected exception handling packet from %s: %s" ZT_EOL_S,reinterpret_cast(&in6)->toString(ipstr),exc.what()); + } catch (int exc) { + char ipstr[128]; + printf("WARNING: unexpected exception handling packet from %s: ZT exception code %d" ZT_EOL_S,reinterpret_cast(&in6)->toString(ipstr),exc); } catch ( ... ) { char ipstr[128]; - printf("* unexpected exception handling packet from %s" ZT_EOL_S,reinterpret_cast(&in6)->toString(ipstr)); + printf("WARNING: unexpected exception handling packet from %s: unknown exception" ZT_EOL_S,reinterpret_cast(&in6)->toString(ipstr)); } } } else { @@ -928,9 +950,15 @@ int main(int argc,char **argv) try { pkt.setSize((unsigned int)pl); handlePacket(s4,s6,reinterpret_cast(&in4),pkt); + } catch (std::exception &exc) { + char ipstr[128]; + printf("WARNING: unexpected exception handling packet from %s: %s" ZT_EOL_S,reinterpret_cast(&in4)->toString(ipstr),exc.what()); + } catch (int exc) { + char ipstr[128]; + printf("WARNING: unexpected exception handling packet from %s: ZT exception code %d" ZT_EOL_S,reinterpret_cast(&in4)->toString(ipstr),exc); } catch ( ... ) { char ipstr[128]; - printf("* unexpected exception handling packet from %s" ZT_EOL_S,reinterpret_cast(&in4)->toString(ipstr)); + printf("WARNING: unexpected exception handling packet from %s: unknown exception" ZT_EOL_S,reinterpret_cast(&in4)->toString(ipstr)); } } } else { @@ -941,23 +969,26 @@ int main(int argc,char **argv) } } - // Minimal local API for use with monitoring clients, etc. + // A minimal read-only local API for monitoring and status queries httplib::Server apiServ; threads.push_back(std::thread([&apiServ,httpPort]() { + // Human readable status page apiServ.Get("/",[](const httplib::Request &req,httplib::Response &res) { std::ostringstream o; - std::lock_guard l0(s_peersByIdentity_l); o << "ZeroTier Root Server " << ZEROTIER_ONE_VERSION_MAJOR << '.' << ZEROTIER_ONE_VERSION_MINOR << '.' << ZEROTIER_ONE_VERSION_REVISION << ZT_EOL_S; o << "(c)2019 ZeroTier, Inc." ZT_EOL_S "Licensed under the ZeroTier BSL 1.1" ZT_EOL_S ZT_EOL_S; + s_peersByIdentity_l.lock(); o << "Peers Online: " << s_peersByIdentity.size() << ZT_EOL_S; + s_peersByIdentity_l.unlock(); res.set_content(o.str(),"text/plain"); }); + // Peer list for compatibility with software that monitors regular nodes apiServ.Get("/peer",[](const httplib::Request &req,httplib::Response &res) { char tmp[256]; std::ostringstream o; o << '['; - { + try { bool first = true; std::lock_guard l(s_peersByIdentity_l); for(auto p=s_peersByIdentity.begin();p!=s_peersByIdentity.end();++p) { @@ -997,11 +1028,12 @@ int main(int argc,char **argv) ",\"versionMinor\":" << p->second->vMinor << ",\"versionRev\":" << p->second->vRev << "}"; } - } + } catch ( ... ) {} o << ']'; res.set_content(o.str(),"application/json"); }); + // GeoIP map if enabled apiServ.Get("/map",[](const httplib::Request &req,httplib::Response &res) { char tmp[4096]; if (!s_geoInit) { @@ -1090,16 +1122,11 @@ int main(int argc,char **argv) int64_t lastCleaned = 0; int64_t lastWroteStats = 0; while (s_run) { - //s_peersByIdentity_l.lock(); - //s_peersByPhysAddr_l.lock(); - //printf("*** have %lu peers at %lu physical endpoints" ZT_EOL_S,(unsigned long)s_peersByIdentity.size(),(unsigned long)s_peersByPhysAddr.size()); - //s_peersByPhysAddr_l.unlock(); - //s_peersByIdentity_l.unlock(); sleep(1); const int64_t now = OSUtils::now(); - if ((now - lastCleaned) > 120000) { + if ((now - lastCleaned) > 300000) { lastCleaned = now; // Old multicast subscription cleanup @@ -1127,24 +1154,25 @@ int main(int argc,char **argv) std::lock_guard pbi_l(s_peers_l); for(auto p=s_peers.begin();p!=s_peers.end();) { if ((now - (*p)->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT) { - std::lock_guard pbi_l(s_peersByIdentity_l); - std::lock_guard pbv_l(s_peersByVirtAddr_l); - - s_peersByIdentity.erase((*p)->id); - - auto pbv = s_peersByVirtAddr.find((*p)->id.address()); - if (pbv != s_peersByVirtAddr.end()) { - pbv->second.erase((*p)); - if (pbv->second.empty()) - s_peersByVirtAddr.erase(pbv); + { + std::lock_guard pbi_l(s_peersByIdentity_l); + s_peersByIdentity.erase((*p)->id); + } + { + std::lock_guard pbv_l(s_peersByVirtAddr_l); + auto pbv = s_peersByVirtAddr.find((*p)->id.address()); + if (pbv != s_peersByVirtAddr.end()) { + pbv->second.erase(*p); + if (pbv->second.empty()) + s_peersByVirtAddr.erase(pbv); + } } - s_peers.erase(p++); } else ++p; } } - // Remove old rendezvous and last forwarded tracking entries + // Remove old rendezvous entries { std::lock_guard l(s_rendezvousTracking_l); for(auto lr=s_rendezvousTracking.begin();lr!=s_rendezvousTracking.end();) {