Revert "Revert a change to LinuxEthernetTap threading to eliminate out of order packet issues on some systems."

This reverts commit 0461b24db3.
This commit is contained in:
Adam Ierymenko 2021-06-28 17:59:28 -04:00
parent 547b0de8a5
commit 89ddf2991b
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
2 changed files with 145 additions and 79 deletions

View file

@ -185,102 +185,148 @@ LinuxEthernetTap::LinuxEthernetTap(
(void)::pipe(_shutdownSignalPipe); (void)::pipe(_shutdownSignalPipe);
_tapReaderThread = std::thread([this]{ _thread_init_l.lock();
uint8_t b[ZT_TAP_BUF_SIZE]; for(unsigned int t=0;t<2;++t) {
fd_set readfds,nullfds; _tapReaderThread[t] = std::thread([this, t]{
int n,nfds,r; fd_set readfds,nullfds;
std::vector<void *> buffers; int n,nfds,r;
struct ifreq ifr; void *buf = nullptr;
std::vector<void *> buffers;
memset(&ifr,0,sizeof(ifr)); if (t == 0) {
strcpy(ifr.ifr_name,_dev.c_str()); struct ifreq ifr;
memset(&ifr,0,sizeof(ifr));
strcpy(ifr.ifr_name,_dev.c_str());
const int sock = socket(AF_INET,SOCK_DGRAM,0); const int sock = socket(AF_INET,SOCK_DGRAM,0);
if (sock <= 0) if (sock <= 0)
return; return;
if (ioctl(sock,SIOCGIFFLAGS,(void *)&ifr) < 0) { if (ioctl(sock,SIOCGIFFLAGS,(void *)&ifr) < 0) {
::close(sock); ::close(sock);
printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n");
return; return;
} }
ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER;
_mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6); _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6);
if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) { if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) {
::close(sock); ::close(sock);
printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n");
return; return;
} }
ifr.ifr_flags |= IFF_UP; ifr.ifr_flags |= IFF_UP;
if (ioctl(sock,SIOCSIFFLAGS,(void *)&ifr) < 0) { if (ioctl(sock,SIOCSIFFLAGS,(void *)&ifr) < 0) {
::close(sock); ::close(sock);
printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n");
return; return;
} }
// Some kernel versions seem to require you to yield while the device comes up // Some kernel versions seem to require you to yield while the device comes up
// before they will accept MTU and MAC. For others it doesn't matter, but is // before they will accept MTU and MAC. For others it doesn't matter, but is
// harmless. This was moved to the worker thread though so as not to block the // harmless. This was moved to the worker thread though so as not to block the
// main ZeroTier loop. // main ZeroTier loop.
usleep(500000); usleep(500000);
ifr.ifr_ifru.ifru_mtu = (int)_mtu; ifr.ifr_ifru.ifru_mtu = (int)_mtu;
if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) { if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) {
::close(sock); ::close(sock);
printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n");
return; return;
} }
fcntl(_fd,F_SETFL,O_NONBLOCK); fcntl(_fd,F_SETFL,O_NONBLOCK);
::close(sock); ::close(sock);
if (!_run) _thread_init_l.unlock();
return; } else {
_thread_init_l.lock();
_thread_init_l.unlock();
}
FD_ZERO(&readfds); if (!_run)
FD_ZERO(&nullfds); return;
nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1;
r = 0; FD_ZERO(&readfds);
for(;;) { FD_ZERO(&nullfds);
FD_SET(_shutdownSignalPipe[0],&readfds); nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1;
FD_SET(_fd,&readfds);
select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);
if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) r = 0;
break; for(;;) {
FD_SET(_shutdownSignalPipe[0],&readfds);
FD_SET(_fd,&readfds);
select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);
if (FD_ISSET(_fd,&readfds)) { if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread
for(;;) { // read until there are no more packets, then return to outer select() loop break;
n = (int)::read(_fd,b + r,ZT_TAP_BUF_SIZE - r);
if (n > 0) {
// Some tap drivers like to send the ethernet frame and the
// payload in two chunks, so handle that by accumulating
// data until we have at least a frame.
r += n;
if (r > 14) {
if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms
r = _mtu + 14;
if (_enabled) { if (FD_ISSET(_fd,&readfds)) {
//_tapq.post(std::pair<void *,int>(buf,r)); for(;;) { // read until there are no more packets, then return to outer select() loop
//buf = nullptr; if (!buf) {
MAC to(b, 6),from(b + 6, 6); // To reduce use of the mutex, we keep a local buffer vector and
unsigned int etherType = Utils::ntoh(((const uint16_t *)b)[6]); // swap (which is a pointer swap) with the global one when it's
_handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void *)(b + 14),(unsigned int)(r - 14)); // empty. This retrieves a batch of buffers to use.
if (buffers.empty()) {
std::lock_guard<std::mutex> l(_buffers_l);
buffers.swap(_buffers);
}
if (buffers.empty()) {
buf = malloc(ZT_TAP_BUF_SIZE);
if (!buf)
break;
} else {
buf = buffers.back();
buffers.pop_back();
} }
r = 0;
} }
} else {
r = 0; n = (int)::read(_fd,reinterpret_cast<uint8_t *>(buf) + r,ZT_TAP_BUF_SIZE - r);
break;
if (n > 0) {
// Some tap drivers like to send the ethernet frame and the
// payload in two chunks, so handle that by accumulating
// data until we have at least a frame.
r += n;
if (r > 14) {
if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms
r = _mtu + 14;
if (_enabled) {
_tapq.post(std::pair<void *,int>(buf,r));
buf = nullptr;
}
r = 0;
}
} else {
r = 0;
break;
}
} }
} }
} }
});
}
_tapProcessorThread = std::thread([this] {
MAC to,from;
std::pair<void *,int> qi;
while (_tapq.get(qi)) {
uint8_t *const b = reinterpret_cast<uint8_t *>(qi.first);
if (b) {
to.setTo(b, 6);
from.setTo(b + 6, 6);
unsigned int etherType = Utils::ntoh(((const uint16_t *)b)[6]);
_handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void *)(b + 14),(unsigned int)(qi.second - 14));
{
std::lock_guard<std::mutex> l(_buffers_l);
if (_buffers.size() < 128)
_buffers.push_back(qi.first);
else free(qi.first);
}
} else break;
} }
}); });
} }
@ -288,11 +334,25 @@ LinuxEthernetTap::LinuxEthernetTap(
LinuxEthernetTap::~LinuxEthernetTap() LinuxEthernetTap::~LinuxEthernetTap()
{ {
_run = false; _run = false;
(void)::write(_shutdownSignalPipe[1],"\0",1);
_tapReaderThread.join(); (void)::write(_shutdownSignalPipe[1],"\0",1); // causes reader thread(s) to exit
_tapq.post(std::pair<void *,int>(nullptr,0)); // causes processor thread to exit
_tapReaderThread[0].join();
_tapReaderThread[1].join();
_tapProcessorThread.join();
::close(_fd); ::close(_fd);
::close(_shutdownSignalPipe[0]); ::close(_shutdownSignalPipe[0]);
::close(_shutdownSignalPipe[1]); ::close(_shutdownSignalPipe[1]);
for(std::vector<void *>::iterator i(_buffers.begin());i!=_buffers.end();++i)
free(*i);
std::vector< std::pair<void *,int> > dv(_tapq.drain());
for(std::vector< std::pair<void *,int> >::iterator i(dv.begin());i!=dv.end();++i) {
if (i->first)
free(i->first);
}
} }
void LinuxEthernetTap::setEnabled(bool en) void LinuxEthernetTap::setEnabled(bool en)

View file

@ -26,6 +26,7 @@
#include <mutex> #include <mutex>
#include "../node/MulticastGroup.hpp" #include "../node/MulticastGroup.hpp"
#include "EthernetTap.hpp" #include "EthernetTap.hpp"
#include "BlockingQueue.hpp"
namespace ZeroTier { namespace ZeroTier {
@ -70,7 +71,12 @@ private:
int _shutdownSignalPipe[2]; int _shutdownSignalPipe[2];
std::atomic_bool _enabled; std::atomic_bool _enabled;
std::atomic_bool _run; std::atomic_bool _run;
std::thread _tapReaderThread; std::thread _tapReaderThread[2];
std::thread _tapProcessorThread;
std::mutex _buffers_l;
std::mutex _thread_init_l;
std::vector<void *> _buffers;
BlockingQueue< std::pair<void *,int> > _tapq;
}; };
} // namespace ZeroTier } // namespace ZeroTier