From 3b6b1d167427b3d3873070bb111f18db5dedac1a Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 13 Nov 2018 09:35:20 -0800 Subject: [PATCH 1/7] Make incoming packet processor thread pool dynamic based on core count. --- service/OneService.cpp | 71 ++++++++++++++++++++---------------------- 1 file changed, 34 insertions(+), 37 deletions(-) diff --git a/service/OneService.cpp b/service/OneService.cpp index effe90c20..ad5680c2c 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -174,8 +174,8 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; } // TCP activity timeout #define ZT_TCP_ACTIVITY_TIMEOUT 60000 -// Number of receive path threads to start -#define ZT_INCOMING_PACKET_THREAD_POOL_SIZE 8 +// Max number of packet handler threads to start +#define ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE 16 #if ZT_VAULT_SUPPORT size_t curlResponseWrite(void *ptr, size_t size, size_t nmemb, std::string *data) @@ -465,7 +465,7 @@ public: unsigned int _tertiaryPort; volatile unsigned int _udpPortPickerCounter; -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE + unsigned int _incomingPacketThreadPoolSize; struct { uint8_t data[2048]; std::thread thr; @@ -474,8 +474,7 @@ public: int size; std::condition_variable cond; std::mutex lock; - } _incomingPacketWorker[ZT_INCOMING_PACKET_THREAD_POOL_SIZE]; -#endif + } _incomingPacketWorker[ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE]; // Local configuration and memo-ized information from it json _localConfig; @@ -606,8 +605,8 @@ public: _ports[1] = 0; _ports[2] = 0; -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE - for(unsigned int tn=0;tn l(_incomingPacketWorker[tn].lock); for(;;) { @@ -636,7 +635,6 @@ public: } }); } -#endif #if ZT_VAULT_SUPPORT curl_global_init(CURL_GLOBAL_DEFAULT); @@ -645,17 +643,15 @@ public: virtual ~OneServiceImpl() { -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE - for(unsigned int tn=0;tn= 16)&&(reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = OSUtils::now(); -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE - unsigned long cksum = 0; - for(unsigned int i=0;isa_family) { + case AF_INET: + for(unsigned int i=0;i<4;++i) + cksum += ((const uint8_t *)(&(((const struct sockaddr_in *)from)->sin_addr.s_addr)))[i]; + break; + case AF_INET6: + for(unsigned int i=0;i<16;++i) + cksum += ((const struct sockaddr_in6 *)from)->sin6_addr.s6_addr[i]; + break; + default: + for(unsigned int i=0;i(sock); - memcpy(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage)); + ZT_FAST_MEMCPY(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage)); _incomingPacketWorker[tn].size = (int)len; _incomingPacketWorker[tn].lock.unlock(); _incomingPacketWorker[tn].cond.notify_all(); -#else - const ZT_ResultCode rc = _node->processWirePacket( - (void *)0, - OSUtils::now(), - reinterpret_cast(sock), - reinterpret_cast(from), // Phy<> uses sockaddr_storage, so it'll always be that big - data, - len, - &_nextBackgroundTaskDeadline); - if (ZT_ResultCode_isFatal(rc)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - } -#endif } inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) From f6450cd7e14bc15ea7361a4f9e8e9a09fa15228d Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 13 Nov 2018 10:19:51 -0800 Subject: [PATCH 2/7] Cleanup and a minor performance improvement. --- service/OneService.cpp | 34 ++++++++++++++-------------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/service/OneService.cpp b/service/OneService.cpp index ad5680c2c..796580af5 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -468,12 +468,13 @@ public: unsigned int _incomingPacketThreadPoolSize; struct { uint8_t data[2048]; - std::thread thr; + uint64_t now; int64_t sock; struct sockaddr_storage from; int size; std::condition_variable cond; std::mutex lock; + std::thread thr; } _incomingPacketWorker[ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE]; // Local configuration and memo-ized information from it @@ -607,21 +608,16 @@ public: _incomingPacketThreadPoolSize = std::max(std::min((unsigned int)std::thread::hardware_concurrency(),(unsigned int)ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE),(unsigned int)1); for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) { - _incomingPacketWorker[tn].thr = std::thread([this,tn]() { - std::unique_lock l(_incomingPacketWorker[tn].lock); + const unsigned int tno = tn; + _incomingPacketWorker[tn].thr = std::thread([this,tno]() { + std::unique_lock l(_incomingPacketWorker[tno].lock); for(;;) { - _incomingPacketWorker[tn].cond.wait(l); - if (_incomingPacketWorker[tn].size < 0) { + _incomingPacketWorker[tno].cond.wait(l); + const int s = _incomingPacketWorker[tno].size; + if (s < 0) { break; - } else if (_incomingPacketWorker[tn].size > 0) { - const ZT_ResultCode rc = _node->processWirePacket( - (void *)0, - OSUtils::now(), - _incomingPacketWorker[tn].sock, - &(_incomingPacketWorker[tn].from), - _incomingPacketWorker[tn].data, - (unsigned int)_incomingPacketWorker[tn].size, - &_nextBackgroundTaskDeadline); + } else if (s > 0) { + const ZT_ResultCode rc = _node->processWirePacket(nullptr,_incomingPacketWorker[tno].now,_incomingPacketWorker[tno].sock,&(_incomingPacketWorker[tno].from),_incomingPacketWorker[tno].data,(unsigned int)s,&_nextBackgroundTaskDeadline); if (ZT_ResultCode_isFatal(rc)) { char tmp[256]; OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); @@ -1896,8 +1892,9 @@ public: inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len) { + const uint64_t now = OSUtils::now(); if ((len >= 16)&&(reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) - _lastDirectReceiveFromGlobal = OSUtils::now(); + _lastDirectReceiveFromGlobal = now; /* Pick worker thread by checksumming the from address. This avoids thread * scheduling caused packet re-ordering by binding each individual remote @@ -1916,20 +1913,17 @@ public: for(unsigned int i=0;i<16;++i) cksum += ((const struct sockaddr_in6 *)from)->sin6_addr.s6_addr[i]; break; - default: - for(unsigned int i=0;i(sock); ZT_FAST_MEMCPY(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage)); _incomingPacketWorker[tn].size = (int)len; + _incomingPacketWorker[tn].cond.notify_one(); _incomingPacketWorker[tn].lock.unlock(); - _incomingPacketWorker[tn].cond.notify_all(); } inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) From 90631adb9beda188bd00494f1a84446c0b94d0f9 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 13 Nov 2018 12:07:58 -0800 Subject: [PATCH 3/7] Improve multithreading support for OneService (faster, dynamic adjustment of thread count based on HW concurrency). --- one.cpp | 2 + osdep/BlockingQueue.hpp | 23 ++++++- service/OneService.cpp | 133 +++++++++++++++++++--------------------- 3 files changed, 87 insertions(+), 71 deletions(-) diff --git a/one.cpp b/one.cpp index b48e4396c..7ee818afe 100644 --- a/one.cpp +++ b/one.cpp @@ -1354,12 +1354,14 @@ int main(int argc,char **argv) #ifdef __UNIX_LIKE__ signal(SIGHUP,&_sighandlerHup); signal(SIGPIPE,SIG_IGN); + signal(SIGIO,SIG_IGN); signal(SIGUSR1,SIG_IGN); signal(SIGUSR2,SIG_IGN); signal(SIGALRM,SIG_IGN); signal(SIGINT,&_sighandlerQuit); signal(SIGTERM,&_sighandlerQuit); signal(SIGQUIT,&_sighandlerQuit); + signal(SIGINT,&_sighandlerQuit); /* Ensure that there are no inherited file descriptors open from a previous * incarnation. This is a hack to ensure that GitHub issue #61 or variants diff --git a/osdep/BlockingQueue.hpp b/osdep/BlockingQueue.hpp index 351a095a7..03986efe9 100644 --- a/osdep/BlockingQueue.hpp +++ b/osdep/BlockingQueue.hpp @@ -32,6 +32,8 @@ #include #include +#include "Thread.hpp" + namespace ZeroTier { /** @@ -52,6 +54,23 @@ public: c.notify_one(); } + inline void postWait(T t,unsigned long maxQueueSize) + { + for(;;) { + { + std::lock_guard lock(m); + if (q.size() < maxQueueSize) { + q.push(t); + c.notify_one(); + return; + } + } + if (!r) + break; + Thread::sleep(1); + } + } + inline void stop(void) { std::lock_guard lock(m); @@ -98,8 +117,8 @@ public: private: volatile bool r; std::queue q; - std::mutex m; - std::condition_variable c; + mutable std::mutex m; + mutable std::condition_variable c; }; } // namespace ZeroTier diff --git a/service/OneService.cpp b/service/OneService.cpp index 796580af5..bf24466dd 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -60,6 +60,7 @@ #include "../osdep/PortMapper.hpp" #include "../osdep/Binder.hpp" #include "../osdep/ManagedRoute.hpp" +#include "../osdep/BlockingQueue.hpp" #include "OneService.hpp" #include "SoftwareUpdater.hpp" @@ -174,9 +175,6 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; } // TCP activity timeout #define ZT_TCP_ACTIVITY_TIMEOUT 60000 -// Max number of packet handler threads to start -#define ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE 16 - #if ZT_VAULT_SUPPORT size_t curlResponseWrite(void *ptr, size_t size, size_t nmemb, std::string *data) { @@ -440,6 +438,15 @@ struct TcpConnection Mutex writeq_m; }; +struct OneServiceIncomingPacket +{ + uint64_t now; + int64_t sock; + struct sockaddr_storage from; + unsigned int size; + uint8_t data[ZT_MAX_MTU]; +}; + class OneServiceImpl : public OneService { public: @@ -465,17 +472,10 @@ public: unsigned int _tertiaryPort; volatile unsigned int _udpPortPickerCounter; - unsigned int _incomingPacketThreadPoolSize; - struct { - uint8_t data[2048]; - uint64_t now; - int64_t sock; - struct sockaddr_storage from; - int size; - std::condition_variable cond; - std::mutex lock; - std::thread thr; - } _incomingPacketWorker[ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE]; + std::vector _incomingPacketMemoryPool; + BlockingQueue _incomingPacketQueue; + std::vector _incomingPacketThreads; + Mutex _incomingPacketMemoryPoolLock,_incomingPacketThreadsLock; // Local configuration and memo-ized information from it json _localConfig; @@ -606,30 +606,31 @@ public: _ports[1] = 0; _ports[2] = 0; - _incomingPacketThreadPoolSize = std::max(std::min((unsigned int)std::thread::hardware_concurrency(),(unsigned int)ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE),(unsigned int)1); - for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) { - const unsigned int tno = tn; - _incomingPacketWorker[tn].thr = std::thread([this,tno]() { - std::unique_lock l(_incomingPacketWorker[tno].lock); + for(long t=0;tprocessWirePacket(nullptr,pkt->now,pkt->sock,&(pkt->from),pkt->data,pkt->size,&_nextBackgroundTaskDeadline); + { + Mutex::Lock l(_incomingPacketMemoryPoolLock); + _incomingPacketMemoryPool.push_back(pkt); + } + if (ZT_ResultCode_isFatal(rc)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); break; - } else if (s > 0) { - const ZT_ResultCode rc = _node->processWirePacket(nullptr,_incomingPacketWorker[tno].now,_incomingPacketWorker[tno].sock,&(_incomingPacketWorker[tno].from),_incomingPacketWorker[tno].data,(unsigned int)s,&_nextBackgroundTaskDeadline); - if (ZT_ResultCode_isFatal(rc)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - break; - } } } - }); + })); } #if ZT_VAULT_SUPPORT @@ -639,22 +640,27 @@ public: virtual ~OneServiceImpl() { - for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) { - _incomingPacketWorker[tn].lock.lock(); - _incomingPacketWorker[tn].size = -1; - _incomingPacketWorker[tn].lock.unlock(); - _incomingPacketWorker[tn].cond.notify_all(); - } - for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) { - _incomingPacketWorker[tn].thr.join(); - } + _incomingPacketQueue.stop(); + _incomingPacketThreadsLock.lock(); + for(auto t=_incomingPacketThreads.begin();t!=_incomingPacketThreads.end();++t) + t->join(); + _incomingPacketThreadsLock.unlock(); + _binder.closeAll(_phy); _phy.close(_localControlSocket4); _phy.close(_localControlSocket6); + #if ZT_VAULT_SUPPORT curl_global_cleanup(); #endif + _incomingPacketMemoryPoolLock.lock(); + while (!_incomingPacketMemoryPool.empty()) { + delete _incomingPacketMemoryPool.back(); + _incomingPacketMemoryPool.pop_back(); + } + _incomingPacketMemoryPoolLock.unlock(); + #ifdef ZT_USE_MINIUPNPC delete _portMapper; #endif @@ -1896,34 +1902,23 @@ public: if ((len >= 16)&&(reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = now; - /* Pick worker thread by checksumming the from address. This avoids thread - * scheduling caused packet re-ordering by binding each individual remote - * peer to a specific thread. It will block globally if that thread is blocked, - * so this is not an optimal implementation from the perspective of perfect - * thread utilization. Nevertheless using threads this way does greatly - * improve performance in heavy load multi-peer scenarios and does so with - * little impact on simpler scenarios due to its extreme simplicity. */ - uint8_t cksum = 0; - switch(from->sa_family) { - case AF_INET: - for(unsigned int i=0;i<4;++i) - cksum += ((const uint8_t *)(&(((const struct sockaddr_in *)from)->sin_addr.s_addr)))[i]; - break; - case AF_INET6: - for(unsigned int i=0;i<16;++i) - cksum += ((const struct sockaddr_in6 *)from)->sin6_addr.s6_addr[i]; - break; + OneServiceIncomingPacket *pkt; + _incomingPacketMemoryPoolLock.lock(); + if (_incomingPacketMemoryPool.empty()) { + pkt = new OneServiceIncomingPacket; + } else { + pkt = _incomingPacketMemoryPool.back(); + _incomingPacketMemoryPool.pop_back(); } - const unsigned int tn = cksum % _incomingPacketThreadPoolSize; + _incomingPacketMemoryPoolLock.unlock(); - _incomingPacketWorker[tn].lock.lock(); - ZT_FAST_MEMCPY(_incomingPacketWorker[tn].data,data,len); - _incomingPacketWorker[tn].now = now; - _incomingPacketWorker[tn].sock = reinterpret_cast(sock); - ZT_FAST_MEMCPY(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage)); - _incomingPacketWorker[tn].size = (int)len; - _incomingPacketWorker[tn].cond.notify_one(); - _incomingPacketWorker[tn].lock.unlock(); + pkt->now = now; + pkt->sock = reinterpret_cast(sock); + ZT_FAST_MEMCPY(&(pkt->from),from,sizeof(struct sockaddr_storage)); + pkt->size = (unsigned int)len; + ZT_FAST_MEMCPY(pkt->data,data,len); + + _incomingPacketQueue.postWait(pkt,64); } inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) From 350116c513a8d6ea48842245b1d85287b834fbf5 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 13 Nov 2018 12:42:03 -0800 Subject: [PATCH 4/7] Stability improvements for Mac virtual Ethernet tap driver. --- osdep/MacEthernetTap.cpp | 11 +++++++---- osdep/MacEthernetTapAgent.c | 11 ++++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/osdep/MacEthernetTap.cpp b/osdep/MacEthernetTap.cpp index 92df2f312..1cfb93757 100644 --- a/osdep/MacEthernetTap.cpp +++ b/osdep/MacEthernetTap.cpp @@ -147,7 +147,7 @@ MacEthernetTap::MacEthernetTap( _agentStdin2 = agentStdin[0]; _agentStdout2 = agentStdout[1]; _agentStderr2 = agentStderr[1]; - long apid = (long)vfork(); + long apid = (long)fork(); if (apid < 0) { throw std::runtime_error("fork failed"); } else if (apid == 0) { @@ -155,10 +155,13 @@ MacEthernetTap::MacEthernetTap( ::dup2(agentStdout[1],STDOUT_FILENO); ::dup2(agentStderr[1],STDERR_FILENO); ::close(agentStdin[0]); + ::close(agentStdin[1]); + ::close(agentStdout[0]); ::close(agentStdout[1]); + ::close(agentStderr[0]); ::close(agentStderr[1]); ::execl(agentPath.c_str(),agentPath.c_str(),devnostr,ethaddr,mtustr,metricstr,(char *)0); - ::exit(-1); + ::_exit(-1); } else { _agentPid = apid; } @@ -356,8 +359,8 @@ void MacEthernetTap::threadMain() const int nfds = std::max(std::max(_shutdownSignalPipe[0],_agentStdout),_agentStderr) + 1; long agentReadPtr = 0; - fcntl(_agentStdout,F_SETFL,O_NONBLOCK); - fcntl(_agentStderr,F_SETFL,O_NONBLOCK); + fcntl(_agentStdout,F_SETFL,fcntl(_agentStdout,F_GETFL)|O_NONBLOCK); + fcntl(_agentStderr,F_SETFL,fcntl(_agentStderr,F_GETFL)|O_NONBLOCK); FD_ZERO(&readfds); FD_ZERO(&nullfds); diff --git a/osdep/MacEthernetTapAgent.c b/osdep/MacEthernetTapAgent.c index ca1f7a4ec..7eaac8711 100644 --- a/osdep/MacEthernetTapAgent.c +++ b/osdep/MacEthernetTapAgent.c @@ -104,8 +104,8 @@ #define P_IFCONFIG "/sbin/ifconfig" -static unsigned char s_pktReadBuf[262144] __attribute__ ((__aligned__(16))); -static unsigned char s_stdinReadBuf[262144] __attribute__ ((__aligned__(16))); +static unsigned char s_pktReadBuf[524288] __attribute__ ((__aligned__(16))); +static unsigned char s_stdinReadBuf[524288] __attribute__ ((__aligned__(16))); static char s_deviceName[IFNAMSIZ]; static char s_peerDeviceName[IFNAMSIZ]; static int s_bpffd = -1; @@ -175,7 +175,7 @@ static int run(const char *path,...) } else if (pid == 0) { dup2(STDERR_FILENO,STDOUT_FILENO); execv(args[0],args); - exit(-1); + _exit(-1); } int rv = 0; waitpid(pid,&rv,0); @@ -322,7 +322,6 @@ int main(int argc,char **argv) return ZT_MACETHERNETTAPAGENT_EXIT_CODE_UNABLE_TO_CREATE; } - fcntl(STDIN_FILENO,F_SETFL,fcntl(STDIN_FILENO,F_GETFL)|O_NONBLOCK); fcntl(s_ndrvfd,F_SETFL,fcntl(s_ndrvfd,F_GETFL)|O_NONBLOCK); fcntl(s_bpffd,F_SETFL,fcntl(s_bpffd,F_GETFL)|O_NONBLOCK); @@ -381,6 +380,7 @@ int main(int argc,char **argv) } } break; + case ZT_MACETHERNETTAPAGENT_STDIN_CMD_IFCONFIG: { char *args[16]; args[0] = P_IFCONFIG; @@ -410,12 +410,13 @@ int main(int argc,char **argv) } else if (pid == 0) { dup2(STDERR_FILENO,STDOUT_FILENO); execv(args[0],args); - exit(-1); + _exit(-1); } int rv = 0; waitpid(pid,&rv,0); } } break; + case ZT_MACETHERNETTAPAGENT_STDIN_CMD_EXIT: return ZT_MACETHERNETTAPAGENT_EXIT_CODE_SUCCESS; } From 4ed7d20a4880a168a447eb0b5d02b3c643ab0cff Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 13 Nov 2018 12:46:36 -0800 Subject: [PATCH 5/7] Another stability fix... turns out vfork is problematic here. --- osdep/MacEthernetTapAgent.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osdep/MacEthernetTapAgent.c b/osdep/MacEthernetTapAgent.c index 7eaac8711..91eff25cc 100644 --- a/osdep/MacEthernetTapAgent.c +++ b/osdep/MacEthernetTapAgent.c @@ -404,7 +404,7 @@ int main(int argc,char **argv) } args[argNo] = (char *)0; if (argNo > 2) { - pid_t pid = vfork(); + pid_t pid = fork(); if (pid < 0) { return -1; } else if (pid == 0) { From 6684559cd9b8a96da29c7a59c8d787918a4171f9 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 13 Nov 2018 13:51:25 -0800 Subject: [PATCH 6/7] More Mac tap improvements and threading efficiency improvements. --- osdep/BlockingQueue.hpp | 26 +++++++++++++------------- osdep/MacEthernetTap.cpp | 4 ++-- osdep/MacEthernetTap.hpp | 2 ++ osdep/MacEthernetTapAgent.c | 11 ++--------- service/OneService.cpp | 6 ++++-- 5 files changed, 23 insertions(+), 26 deletions(-) diff --git a/osdep/BlockingQueue.hpp b/osdep/BlockingQueue.hpp index 03986efe9..6f81ddf2d 100644 --- a/osdep/BlockingQueue.hpp +++ b/osdep/BlockingQueue.hpp @@ -54,20 +54,16 @@ public: c.notify_one(); } - inline void postWait(T t,unsigned long maxQueueSize) + inline void postLimit(T t,const unsigned long limit) { + std::unique_lock lock(m); for(;;) { - { - std::lock_guard lock(m); - if (q.size() < maxQueueSize) { - q.push(t); - c.notify_one(); - return; - } - } - if (!r) + if (q.size() < limit) { + q.push(t); + c.notify_one(); break; - Thread::sleep(1); + } + gc.wait(lock); } } @@ -84,10 +80,14 @@ public: if (!r) return false; while (q.empty()) { c.wait(lock); - if (!r) return false; + if (!r) { + gc.notify_all(); + return false; + } } value = q.front(); q.pop(); + gc.notify_all(); return true; } @@ -118,7 +118,7 @@ private: volatile bool r; std::queue q; mutable std::mutex m; - mutable std::condition_variable c; + mutable std::condition_variable c,gc; }; } // namespace ZeroTier diff --git a/osdep/MacEthernetTap.cpp b/osdep/MacEthernetTap.cpp index 1cfb93757..fb3e3a75b 100644 --- a/osdep/MacEthernetTap.cpp +++ b/osdep/MacEthernetTap.cpp @@ -287,7 +287,9 @@ void MacEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType,co iov[1].iov_len = 15; iov[2].iov_base = const_cast(data); iov[2].iov_len = len; + _putLock.lock(); writev(_agentStdin,iov,3); + _putLock.unlock(); } } @@ -396,8 +398,6 @@ void MacEthernetTap::threadMain() break; } } - } else { - break; } } if (FD_ISSET(_agentStderr,&readfds)) { diff --git a/osdep/MacEthernetTap.hpp b/osdep/MacEthernetTap.hpp index 4b3ac019a..eaf131a7a 100644 --- a/osdep/MacEthernetTap.hpp +++ b/osdep/MacEthernetTap.hpp @@ -38,6 +38,7 @@ #include "../node/MAC.hpp" #include "../node/InetAddress.hpp" #include "../node/MulticastGroup.hpp" +#include "../node/Mutex.hpp" #include "Thread.hpp" @@ -80,6 +81,7 @@ private: std::string _homePath; std::string _dev; std::vector _multicastGroups; + Mutex _putLock; unsigned int _mtu; unsigned int _metric; int _shutdownSignalPipe[2]; diff --git a/osdep/MacEthernetTapAgent.c b/osdep/MacEthernetTapAgent.c index 91eff25cc..a595e154f 100644 --- a/osdep/MacEthernetTapAgent.c +++ b/osdep/MacEthernetTapAgent.c @@ -104,8 +104,8 @@ #define P_IFCONFIG "/sbin/ifconfig" -static unsigned char s_pktReadBuf[524288] __attribute__ ((__aligned__(16))); -static unsigned char s_stdinReadBuf[524288] __attribute__ ((__aligned__(16))); +static unsigned char s_pktReadBuf[262144] __attribute__ ((__aligned__(16))); +static unsigned char s_stdinReadBuf[262144] __attribute__ ((__aligned__(16))); static char s_deviceName[IFNAMSIZ]; static char s_peerDeviceName[IFNAMSIZ]; static int s_bpffd = -1; @@ -322,9 +322,6 @@ int main(int argc,char **argv) return ZT_MACETHERNETTAPAGENT_EXIT_CODE_UNABLE_TO_CREATE; } - fcntl(s_ndrvfd,F_SETFL,fcntl(s_ndrvfd,F_GETFL)|O_NONBLOCK); - fcntl(s_bpffd,F_SETFL,fcntl(s_bpffd,F_GETFL)|O_NONBLOCK); - fprintf(stderr,"I %s %s %d.%d.%d.%d\n",s_deviceName,s_peerDeviceName,ZEROTIER_ONE_VERSION_MAJOR,ZEROTIER_ONE_VERSION_MINOR,ZEROTIER_ONE_VERSION_REVISION,ZEROTIER_ONE_VERSION_BUILD); FD_ZERO(&rfds); @@ -357,8 +354,6 @@ int main(int argc,char **argv) } p += BPF_WORDALIGN(h->bh_hdrlen + h->bh_caplen); } - } else { - return ZT_MACETHERNETTAPAGENT_EXIT_CODE_READ_ERROR; } } @@ -431,8 +426,6 @@ int main(int argc,char **argv) break; } } - } else { - return ZT_MACETHERNETTAPAGENT_EXIT_CODE_READ_ERROR; } } } diff --git a/service/OneService.cpp b/service/OneService.cpp index bf24466dd..1351cbfb1 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -472,6 +472,7 @@ public: unsigned int _tertiaryPort; volatile unsigned int _udpPortPickerCounter; + unsigned long _incomingPacketConcurrency; std::vector _incomingPacketMemoryPool; BlockingQueue _incomingPacketQueue; std::vector _incomingPacketThreads; @@ -606,7 +607,8 @@ public: _ports[1] = 0; _ports[2] = 0; - for(long t=0;tsize = (unsigned int)len; ZT_FAST_MEMCPY(pkt->data,data,len); - _incomingPacketQueue.postWait(pkt,64); + _incomingPacketQueue.postLimit(pkt,16 * _incomingPacketConcurrency); } inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) From 690bd933d52c7dbbcddde7c0aff30f7fee91a6d9 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 13 Nov 2018 13:52:45 -0800 Subject: [PATCH 7/7] Support shutdown with postLimit in BlockingQueue --- osdep/BlockingQueue.hpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/osdep/BlockingQueue.hpp b/osdep/BlockingQueue.hpp index 6f81ddf2d..9e2f73cb7 100644 --- a/osdep/BlockingQueue.hpp +++ b/osdep/BlockingQueue.hpp @@ -63,6 +63,8 @@ public: c.notify_one(); break; } + if (!r) + break; gc.wait(lock); } } @@ -72,6 +74,7 @@ public: std::lock_guard lock(m); r = false; c.notify_all(); + gc.notify_all(); } inline bool get(T &value)