diff --git a/osdep/LinuxEthernetTap.cpp b/osdep/LinuxEthernetTap.cpp index acf972682..17f2b0df9 100644 --- a/osdep/LinuxEthernetTap.cpp +++ b/osdep/LinuxEthernetTap.cpp @@ -185,102 +185,148 @@ LinuxEthernetTap::LinuxEthernetTap( (void)::pipe(_shutdownSignalPipe); - _tapReaderThread = std::thread([this]{ - uint8_t b[ZT_TAP_BUF_SIZE]; - fd_set readfds,nullfds; - int n,nfds,r; - std::vector buffers; - struct ifreq ifr; + _thread_init_l.lock(); + for(unsigned int t=0;t<2;++t) { + _tapReaderThread[t] = std::thread([this, t]{ + fd_set readfds,nullfds; + int n,nfds,r; + void *buf = nullptr; + std::vector buffers; - memset(&ifr,0,sizeof(ifr)); - strcpy(ifr.ifr_name,_dev.c_str()); + if (t == 0) { + struct ifreq ifr; + memset(&ifr,0,sizeof(ifr)); + strcpy(ifr.ifr_name,_dev.c_str()); - const int sock = socket(AF_INET,SOCK_DGRAM,0); - if (sock <= 0) - return; + const int sock = socket(AF_INET,SOCK_DGRAM,0); + if (sock <= 0) + return; - if (ioctl(sock,SIOCGIFFLAGS,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); - return; - } + if (ioctl(sock,SIOCGIFFLAGS,(void *)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); + return; + } - ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; - _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6); - if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); - return; - } + ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; + _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6); + if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); + return; + } - ifr.ifr_flags |= IFF_UP; - if (ioctl(sock,SIOCSIFFLAGS,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); - return; - } + ifr.ifr_flags |= IFF_UP; + if (ioctl(sock,SIOCSIFFLAGS,(void *)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); + return; + } - // Some kernel versions seem to require you to yield while the device comes up - // before they will accept MTU and MAC. For others it doesn't matter, but is - // harmless. This was moved to the worker thread though so as not to block the - // main ZeroTier loop. - usleep(500000); + // Some kernel versions seem to require you to yield while the device comes up + // before they will accept MTU and MAC. For others it doesn't matter, but is + // harmless. This was moved to the worker thread though so as not to block the + // main ZeroTier loop. + usleep(500000); - ifr.ifr_ifru.ifru_mtu = (int)_mtu; - if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); - return; - } + ifr.ifr_ifru.ifru_mtu = (int)_mtu; + if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); + return; + } - fcntl(_fd,F_SETFL,O_NONBLOCK); + fcntl(_fd,F_SETFL,O_NONBLOCK); - ::close(sock); + ::close(sock); - if (!_run) - return; + _thread_init_l.unlock(); + } else { + _thread_init_l.lock(); + _thread_init_l.unlock(); + } - FD_ZERO(&readfds); - FD_ZERO(&nullfds); - nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1; + if (!_run) + return; - r = 0; - for(;;) { - FD_SET(_shutdownSignalPipe[0],&readfds); - FD_SET(_fd,&readfds); - select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0); + FD_ZERO(&readfds); + FD_ZERO(&nullfds); + nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1; - if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) - break; + r = 0; + for(;;) { + FD_SET(_shutdownSignalPipe[0],&readfds); + FD_SET(_fd,&readfds); + select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0); - if (FD_ISSET(_fd,&readfds)) { - for(;;) { // read until there are no more packets, then return to outer select() loop - n = (int)::read(_fd,b + r,ZT_TAP_BUF_SIZE - r); - if (n > 0) { - // Some tap drivers like to send the ethernet frame and the - // payload in two chunks, so handle that by accumulating - // data until we have at least a frame. - r += n; - if (r > 14) { - if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms - r = _mtu + 14; + if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread + break; - if (_enabled) { - //_tapq.post(std::pair(buf,r)); - //buf = nullptr; - MAC to(b, 6),from(b + 6, 6); - unsigned int etherType = Utils::ntoh(((const uint16_t *)b)[6]); - _handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void *)(b + 14),(unsigned int)(r - 14)); + if (FD_ISSET(_fd,&readfds)) { + for(;;) { // read until there are no more packets, then return to outer select() loop + if (!buf) { + // To reduce use of the mutex, we keep a local buffer vector and + // swap (which is a pointer swap) with the global one when it's + // empty. This retrieves a batch of buffers to use. + if (buffers.empty()) { + std::lock_guard l(_buffers_l); + buffers.swap(_buffers); + } + if (buffers.empty()) { + buf = malloc(ZT_TAP_BUF_SIZE); + if (!buf) + break; + } else { + buf = buffers.back(); + buffers.pop_back(); } - - r = 0; } - } else { - r = 0; - break; + + n = (int)::read(_fd,reinterpret_cast(buf) + r,ZT_TAP_BUF_SIZE - r); + + if (n > 0) { + // Some tap drivers like to send the ethernet frame and the + // payload in two chunks, so handle that by accumulating + // data until we have at least a frame. + r += n; + if (r > 14) { + if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms + r = _mtu + 14; + + if (_enabled) { + _tapq.post(std::pair(buf,r)); + buf = nullptr; + } + + r = 0; + } + } else { + r = 0; + break; + } } } } + }); + } + + _tapProcessorThread = std::thread([this] { + MAC to,from; + std::pair qi; + while (_tapq.get(qi)) { + uint8_t *const b = reinterpret_cast(qi.first); + if (b) { + to.setTo(b, 6); + from.setTo(b + 6, 6); + unsigned int etherType = Utils::ntoh(((const uint16_t *)b)[6]); + _handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void *)(b + 14),(unsigned int)(qi.second - 14)); + { + std::lock_guard l(_buffers_l); + if (_buffers.size() < 128) + _buffers.push_back(qi.first); + else free(qi.first); + } + } else break; } }); } @@ -288,11 +334,25 @@ LinuxEthernetTap::LinuxEthernetTap( LinuxEthernetTap::~LinuxEthernetTap() { _run = false; - (void)::write(_shutdownSignalPipe[1],"\0",1); - _tapReaderThread.join(); + + (void)::write(_shutdownSignalPipe[1],"\0",1); // causes reader thread(s) to exit + _tapq.post(std::pair(nullptr,0)); // causes processor thread to exit + + _tapReaderThread[0].join(); + _tapReaderThread[1].join(); + _tapProcessorThread.join(); + ::close(_fd); ::close(_shutdownSignalPipe[0]); ::close(_shutdownSignalPipe[1]); + + for(std::vector::iterator i(_buffers.begin());i!=_buffers.end();++i) + free(*i); + std::vector< std::pair > dv(_tapq.drain()); + for(std::vector< std::pair >::iterator i(dv.begin());i!=dv.end();++i) { + if (i->first) + free(i->first); + } } void LinuxEthernetTap::setEnabled(bool en) diff --git a/osdep/LinuxEthernetTap.hpp b/osdep/LinuxEthernetTap.hpp index 3603740d4..e9696fa60 100644 --- a/osdep/LinuxEthernetTap.hpp +++ b/osdep/LinuxEthernetTap.hpp @@ -26,6 +26,7 @@ #include #include "../node/MulticastGroup.hpp" #include "EthernetTap.hpp" +#include "BlockingQueue.hpp" namespace ZeroTier { @@ -70,7 +71,12 @@ private: int _shutdownSignalPipe[2]; std::atomic_bool _enabled; std::atomic_bool _run; - std::thread _tapReaderThread; + std::thread _tapReaderThread[2]; + std::thread _tapProcessorThread; + std::mutex _buffers_l; + std::mutex _thread_init_l; + std::vector _buffers; + BlockingQueue< std::pair > _tapq; }; } // namespace ZeroTier