Fix build for macOS, tune to prevent packet re-ordering

This commit is contained in:
Joseph Henry 2024-07-03 08:49:07 -07:00
parent 683d332abc
commit 64634c916c
No known key found for this signature in database
GPG key ID: 3C2C8A1EB4269827
7 changed files with 102 additions and 96 deletions

View file

@ -84,9 +84,9 @@ std::shared_ptr<EthernetTap> EthernetTap::newInstance(
// The "feth" virtual Ethernet device type appeared in Darwin 17.x.x. Older versions // The "feth" virtual Ethernet device type appeared in Darwin 17.x.x. Older versions
// (Sierra and earlier) must use the a kernel extension. // (Sierra and earlier) must use the a kernel extension.
if (strtol(osrelease,(char **)0,10) < 17) { if (strtol(osrelease,(char **)0,10) < 17) {
return std::shared_ptr<EthernetTap>(new MacKextEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); return std::shared_ptr<EthernetTap>(new MacKextEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
} else { } else {
return std::shared_ptr<EthernetTap>(new MacEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); return std::shared_ptr<EthernetTap>(new MacEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
} }
} }
} }

View file

@ -223,7 +223,7 @@ LinuxEthernetTap::LinuxEthernetTap(
(void)::pipe(_shutdownSignalPipe); (void)::pipe(_shutdownSignalPipe);
bool _enablePinning = false; bool _enablePinning = false;
char* envvar = std::getenv("ZT_CPU_PINNING"); char* envvar = std::getenv("ZT_CORE_PINNING");
if (envvar) { if (envvar) {
int tmp = atoi(envvar); int tmp = atoi(envvar);
if (tmp > 0) { if (tmp > 0) {

View file

@ -69,7 +69,6 @@ static bool fethMaxMtuAdjusted = false;
MacEthernetTap::MacEthernetTap( MacEthernetTap::MacEthernetTap(
const char *homePath, const char *homePath,
unsigned int concurrency,
const MAC &mac, const MAC &mac,
unsigned int mtu, unsigned int mtu,
unsigned int metric, unsigned int metric,
@ -78,7 +77,6 @@ MacEthernetTap::MacEthernetTap(
void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *data,unsigned int len), void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *data,unsigned int len),
void *arg) : void *arg) :
_handler(handler), _handler(handler),
_concurrency(concurrency),
_arg(arg), _arg(arg),
_nwid(nwid), _nwid(nwid),
_homePath(homePath), _homePath(homePath),
@ -288,9 +286,6 @@ MacEthernetTap::~MacEthernetTap()
} }
Thread::join(_thread); Thread::join(_thread);
for (std::thread &t : _rxThreads) {
t.join();
}
} }
void MacEthernetTap::setEnabled(bool en) { _enabled = en; } void MacEthernetTap::setEnabled(bool en) { _enabled = en; }
@ -479,25 +474,17 @@ void MacEthernetTap::setMtu(unsigned int mtu)
void MacEthernetTap::threadMain() void MacEthernetTap::threadMain()
throw() throw()
{ {
Thread::sleep(250);
for (unsigned int i = 0; i < _concurrency; ++i) {
_rxThreads.push_back(std::thread([this, i] {
fprintf(stderr, "starting thread %d\n", i);
char agentReadBuf[ZT_MACETHERNETTAP_AGENT_READ_BUF_SIZE]; char agentReadBuf[ZT_MACETHERNETTAP_AGENT_READ_BUF_SIZE];
char agentStderrBuf[256]; char agentStderrBuf[256];
fd_set readfds,nullfds; fd_set readfds,nullfds;
MAC to,from; MAC to,from;
Thread::sleep(250);
const int nfds = std::max(std::max(_shutdownSignalPipe[0],_agentStdout),_agentStderr) + 1; const int nfds = std::max(std::max(_shutdownSignalPipe[0],_agentStdout),_agentStderr) + 1;
long agentReadPtr = 0; long agentReadPtr = 0;
fcntl(_agentStdout,F_SETFL,fcntl(_agentStdout,F_GETFL)|O_NONBLOCK);
if (i == 0) { fcntl(_agentStderr,F_SETFL,fcntl(_agentStderr,F_GETFL)|O_NONBLOCK);
fcntl(_agentStdout,F_SETFL,fcntl(_agentStdout,F_GETFL)|O_NONBLOCK);
fcntl(_agentStderr,F_SETFL,fcntl(_agentStderr,F_GETFL)|O_NONBLOCK);
}
FD_ZERO(&readfds); FD_ZERO(&readfds);
FD_ZERO(&nullfds); FD_ZERO(&nullfds);
@ -546,7 +533,6 @@ void MacEthernetTap::threadMain()
*/ */
} }
} }
}));}
::close(_agentStdin); ::close(_agentStdin);
::close(_agentStdout); ::close(_agentStdout);

View file

@ -28,7 +28,6 @@
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
#include <vector> #include <vector>
#include <thread>
namespace ZeroTier { namespace ZeroTier {
@ -37,7 +36,6 @@ class MacEthernetTap : public EthernetTap
public: public:
MacEthernetTap( MacEthernetTap(
const char *homePath, const char *homePath,
unsigned int concurrency,
const MAC &mac, const MAC &mac,
unsigned int mtu, unsigned int mtu,
unsigned int metric, unsigned int metric,
@ -69,7 +67,6 @@ private:
uint64_t _nwid; uint64_t _nwid;
Thread _thread; Thread _thread;
std::string _homePath; std::string _homePath;
unsigned int _concurrency;
std::string _dev; std::string _dev;
std::vector<MulticastGroup> _multicastGroups; std::vector<MulticastGroup> _multicastGroups;
Mutex _putLock; Mutex _putLock;
@ -82,7 +79,6 @@ private:
volatile bool _enabled; volatile bool _enabled;
mutable std::vector<InetAddress> _ifaddrs; mutable std::vector<InetAddress> _ifaddrs;
mutable uint64_t _lastIfAddrsUpdate; mutable uint64_t _lastIfAddrsUpdate;
std::vector<std::thread> _rxThreads;
}; };

View file

@ -306,7 +306,6 @@ static Mutex globalTapCreateLock;
MacKextEthernetTap::MacKextEthernetTap( MacKextEthernetTap::MacKextEthernetTap(
const char *homePath, const char *homePath,
unsigned int concurrency,
const MAC &mac, const MAC &mac,
unsigned int mtu, unsigned int mtu,
unsigned int metric, unsigned int metric,
@ -318,7 +317,6 @@ MacKextEthernetTap::MacKextEthernetTap(
_arg(arg), _arg(arg),
_nwid(nwid), _nwid(nwid),
_homePath(homePath), _homePath(homePath),
_concurrency(concurrency),
_mtu(mtu), _mtu(mtu),
_metric(metric), _metric(metric),
_fd(0), _fd(0),

View file

@ -37,7 +37,6 @@ class MacKextEthernetTap : public EthernetTap
public: public:
MacKextEthernetTap( MacKextEthernetTap(
const char *homePath, const char *homePath,
unsigned int concurrency,
const MAC &mac, const MAC &mac,
unsigned int mtu, unsigned int mtu,
unsigned int metric, unsigned int metric,
@ -72,7 +71,6 @@ private:
std::string _homePath; std::string _homePath;
std::string _dev; std::string _dev;
std::vector<MulticastGroup> _multicastGroups; std::vector<MulticastGroup> _multicastGroups;
unsigned int _concurrency;
unsigned int _mtu; unsigned int _mtu;
unsigned int _metric; unsigned int _metric;
int _fd; int _fd;

View file

@ -801,6 +801,7 @@ public:
std::vector<PacketRecord *> _rxPacketVector; std::vector<PacketRecord *> _rxPacketVector;
std::vector<std::thread> _rxPacketThreads; std::vector<std::thread> _rxPacketThreads;
Mutex _rxPacketVector_m,_rxPacketThreads_m; Mutex _rxPacketVector_m,_rxPacketThreads_m;
bool _enableMulticore;
bool _allowTcpFallbackRelay; bool _allowTcpFallbackRelay;
bool _forceTcpRelay; bool _forceTcpRelay;
@ -934,74 +935,87 @@ public:
_ports[1] = 0; _ports[1] = 0;
_ports[2] = 0; _ports[2] = 0;
bool _enablePinning = false; _enableMulticore = false;
char* pinningVar = std::getenv("ZT_CPU_PINNING"); char* multicoreVar = std::getenv("ZT_ENABLE_MULTICORE");
if (pinningVar) { if (multicoreVar) {
int tmp = atoi(pinningVar); int tmp = atoi(multicoreVar);
if (tmp > 0) { if (tmp > 0) {
_enablePinning = true; _enableMulticore = true;
} }
} }
char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY"); if (_enableMulticore) {
if (concurrencyVar) { bool _enablePinning = false;
int tmp = atoi(concurrencyVar); char* pinningVar = std::getenv("ZT_CORE_PINNING");
if (tmp > 0) { if (pinningVar) {
_rxThreadCount = tmp; int tmp = atoi(pinningVar);
if (tmp > 0) {
_enablePinning = true;
}
}
char* concurrencyVar = std::getenv("ZT_CONCURRENCY");
if (concurrencyVar) {
int tmp = atoi(concurrencyVar);
if (tmp > 0) {
_rxThreadCount = tmp;
}
else {
_rxThreadCount = std::thread::hardware_concurrency() >= 4 ? 2 : 1;
}
} }
else { else {
_rxThreadCount = std::thread::hardware_concurrency(); _rxThreadCount = std::thread::hardware_concurrency() >= 4 ? 2 : 1;
} }
} fprintf(stderr, "using %d rx threads\n", _rxThreadCount);
else { for (unsigned int i = 0; i < _rxThreadCount; ++i) {
_rxThreadCount = std::thread::hardware_concurrency(); _rxPacketThreads.push_back(std::thread([this, i, _enablePinning]() {
}
for (unsigned int i = 0; i < _rxThreadCount; ++i) {
_rxPacketThreads.push_back(std::thread([this, i]() {
if (_enablePinning) {
#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ #if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */
int pinCore = i % _rxThreadCount; int pinCore = i % _rxThreadCount;
fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore);
pthread_t self = pthread_self(); pthread_t self = pthread_self();
cpu_set_t cpuset; cpu_set_t cpuset;
CPU_ZERO(&cpuset); CPU_ZERO(&cpuset);
CPU_SET(pinCore, &cpuset); CPU_SET(pinCore, &cpuset);
#endif #endif
#ifdef __LINUX__ #ifdef __LINUX__
int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
#elif __FreeBSD__ #elif __FreeBSD__
int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
#endif #endif
#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ #if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */
if (rc != 0) if (rc != 0)
{
fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno));
exit(1);
}
#endif
PacketRecord* packet = nullptr;
for (;;) {
if (! _rxPacketQueue.get(packet)) {
break;
}
if (! packet) {
break;
}
const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline);
{ {
Mutex::Lock l(_rxPacketVector_m); fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno));
_rxPacketVector.push_back(packet); exit(1);
}
if (ZT_ResultCode_isFatal(err)) {
char tmp[256];
OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err);
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = tmp;
this->terminate();
break;
} }
#endif
} }
})); PacketRecord* packet = nullptr;
for (;;) {
if (! _rxPacketQueue.get(packet)) {
break;
}
if (! packet) {
break;
}
const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline);
{
Mutex::Lock l(_rxPacketVector_m);
_rxPacketVector.push_back(packet);
}
if (ZT_ResultCode_isFatal(err)) {
char tmp[256];
OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err);
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = tmp;
this->terminate();
break;
}
}
}));
}
} }
prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr); prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr);
@ -2865,25 +2879,39 @@ public:
_lastDirectReceiveFromGlobal = now; _lastDirectReceiveFromGlobal = now;
} }
PacketRecord* packet; if (_enableMulticore) {
_rxPacketVector_m.lock(); PacketRecord* packet;
if (_rxPacketVector.empty()) { _rxPacketVector_m.lock();
packet = new PacketRecord; if (_rxPacketVector.empty()) {
packet = new PacketRecord;
}
else {
packet = _rxPacketVector.back();
_rxPacketVector.pop_back();
}
_rxPacketVector_m.unlock();
packet->sock = reinterpret_cast<int64_t>(sock);
packet->now = now;
memcpy(&(packet->from), from, sizeof(struct sockaddr_storage));
packet->size = (unsigned int)len;
memcpy(packet->data, data, len);
_rxPacketQueue.postLimit(packet, 256 * _rxThreadCount);
} }
else { else {
packet = _rxPacketVector.back(); const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast<int64_t>(sock),reinterpret_cast<const struct sockaddr_storage *>(from),data,len,&_nextBackgroundTaskDeadline);
_rxPacketVector.pop_back(); if (ZT_ResultCode_isFatal(rc)) {
char tmp[256];
OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = tmp;
this->terminate();
}
} }
_rxPacketVector_m.unlock();
packet->sock = reinterpret_cast<int64_t>(sock);
packet->now = now;
memcpy(&(packet->from), from, sizeof(struct sockaddr_storage));
packet->size = (unsigned int)len;
memcpy(packet->data, data, len);
_rxPacketQueue.postLimit(packet, 256 * _rxThreadCount);
} }
inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)
{ {
if (!success) { if (!success) {
@ -3103,7 +3131,7 @@ public:
n.setTap(EthernetTap::newInstance( n.setTap(EthernetTap::newInstance(
nullptr, nullptr,
_homePath.c_str(), _homePath.c_str(),
_rxThreadCount, _enableMulticore ? _rxThreadCount : 1,
MAC(nwc->mac), MAC(nwc->mac),
nwc->mtu, nwc->mtu,
(unsigned int)ZT_IF_METRIC, (unsigned int)ZT_IF_METRIC,