Add recvmmsg support to Phy for Linux.

This commit is contained in:
Adam Ierymenko 2023-07-07 18:16:35 -04:00
parent a872cc3418
commit 0381e0aa92

View file

@ -14,18 +14,17 @@
#ifndef ZT_PHY_HPP
#define ZT_PHY_HPP
#include <list>
#include <stdexcept>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <list>
#include <stdexcept>
#if defined(_WIN32) || defined(_WIN64)
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <windows.h>
#define ZT_PHY_SOCKFD_TYPE SOCKET
#define ZT_PHY_SOCKFD_NULL (INVALID_SOCKET)
@ -37,18 +36,18 @@
#else // not Windows
#include <errno.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <signal.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#if defined(__linux__) || defined(linux) || defined(__LINUX__) || defined(__linux)
#ifndef IPV6_DONTFRAG
@ -120,14 +119,11 @@ typedef void PhySocket;
* This isn't thread-safe with the exception of whack(), which is safe to
* call from another thread to abort poll().
*/
template <typename HANDLER_PTR_TYPE>
class Phy
{
template <typename HANDLER_PTR_TYPE> class Phy {
private:
HANDLER_PTR_TYPE _handler;
enum PhySocketType
{
enum PhySocketType {
ZT_PHY_SOCKET_CLOSED = 0x00, // socket is closed, will be removed on next poll()
ZT_PHY_SOCKET_TCP_OUT_PENDING = 0x01,
ZT_PHY_SOCKET_TCP_OUT_CONNECTED = 0x02,
@ -140,7 +136,9 @@ private:
};
struct PhySocketImpl {
PhySocketImpl() {}
PhySocketImpl()
{
}
PhySocketType type;
ZT_PHY_SOCKFD_TYPE sock;
void* uptr; // user-settable pointer
@ -167,8 +165,7 @@ public:
* @param noDelay If true, disable TCP NAGLE algorithm on TCP sockets
* @param noCheck If true, attempt to set UDP SO_NO_CHECK option to disable sending checksums
*/
Phy(HANDLER_PTR_TYPE handler,bool noDelay,bool noCheck) :
_handler(handler)
Phy(HANDLER_PTR_TYPE handler, bool noDelay, bool noCheck) : _handler(handler)
{
FD_ZERO(&_readfds);
FD_ZERO(&_writefds);
@ -300,7 +297,8 @@ public:
return (PhySocket*)0;
try {
_socks.push_back(PhySocketImpl());
} catch ( ... ) {
}
catch (...) {
return (PhySocket*)0;
}
PhySocketImpl& sws = _socks.back();
@ -353,38 +351,50 @@ public:
{
BOOL f;
if (localAddress->sa_family == AF_INET6) {
f = TRUE; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(const char *)&f,sizeof(f));
f = FALSE; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,(const char *)&f,sizeof(f));
f = TRUE;
setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&f, sizeof(f));
f = FALSE;
setsockopt(s, IPPROTO_IPV6, IPV6_DONTFRAG, (const char*)&f, sizeof(f));
}
f = FALSE; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(const char *)&f,sizeof(f));
f = TRUE; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(const char *)&f,sizeof(f));
f = FALSE;
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char*)&f, sizeof(f));
f = TRUE;
setsockopt(s, SOL_SOCKET, SO_BROADCAST, (const char*)&f, sizeof(f));
}
#else // not Windows
{
int f;
if (localAddress->sa_family == AF_INET6) {
f = 1; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f));
f = 1;
setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&f, sizeof(f));
#ifdef IPV6_MTU_DISCOVER
f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_MTU_DISCOVER,&f,sizeof(f));
f = 0;
setsockopt(s, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &f, sizeof(f));
#endif
#ifdef IPV6_DONTFRAG
f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,&f,sizeof(f));
f = 0;
setsockopt(s, IPPROTO_IPV6, IPV6_DONTFRAG, &f, sizeof(f));
#endif
}
f = 0; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));
f = 1; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(void *)&f,sizeof(f));
f = 0;
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*)&f, sizeof(f));
f = 1;
setsockopt(s, SOL_SOCKET, SO_BROADCAST, (void*)&f, sizeof(f));
#ifdef IP_DONTFRAG
f = 0; setsockopt(s,IPPROTO_IP,IP_DONTFRAG,&f,sizeof(f));
f = 0;
setsockopt(s, IPPROTO_IP, IP_DONTFRAG, &f, sizeof(f));
#endif
#ifdef IP_MTU_DISCOVER
f = 0; setsockopt(s,IPPROTO_IP,IP_MTU_DISCOVER,&f,sizeof(f));
f = 0;
setsockopt(s, IPPROTO_IP, IP_MTU_DISCOVER, &f, sizeof(f));
#endif
#ifdef SO_NO_CHECK
// For now at least we only set SO_NO_CHECK on IPv4 sockets since some
// IPv6 stacks incorrectly discard zero checksum packets. May remove
// this restriction later once broken stuff dies more.
if ((localAddress->sa_family == AF_INET) && (_noCheck)) {
f = 1; setsockopt(s,SOL_SOCKET,SO_NO_CHECK,(void *)&f,sizeof(f));
f = 1;
setsockopt(s, SOL_SOCKET, SO_NO_CHECK, (void*)&f, sizeof(f));
}
#endif
}
@ -396,14 +406,18 @@ public:
}
#if defined(_WIN32) || defined(_WIN64)
{ u_long iMode=1; ioctlsocket(s,FIONBIO,&iMode); }
{
u_long iMode = 1;
ioctlsocket(s, FIONBIO, &iMode);
}
#else
fcntl(s, F_SETFL, O_NONBLOCK);
#endif
try {
_socks.push_back(PhySocketImpl());
} catch ( ... ) {
}
catch (...) {
ZT_PHY_CLOSE_SOCKET(s);
return (PhySocket*)0;
}
@ -497,7 +511,8 @@ public:
try {
_socks.push_back(PhySocketImpl());
} catch ( ... ) {
}
catch (...) {
ZT_PHY_CLOSE_SOCKET(s);
return (PhySocket*)0;
}
@ -535,18 +550,24 @@ public:
#if defined(_WIN32) || defined(_WIN64)
{
BOOL f;
f = TRUE; ::setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(const char *)&f,sizeof(f));
f = TRUE; ::setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(const char *)&f,sizeof(f));
f = (_noDelay ? TRUE : FALSE); setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f));
f = TRUE;
::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&f, sizeof(f));
f = TRUE;
::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char*)&f, sizeof(f));
f = (_noDelay ? TRUE : FALSE);
setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
u_long iMode = 1;
ioctlsocket(s, FIONBIO, &iMode);
}
#else
{
int f;
f = 1; ::setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f));
f = 1; ::setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));
f = (_noDelay ? 1 : 0); setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f));
f = 1;
::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&f, sizeof(f));
f = 1;
::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*)&f, sizeof(f));
f = (_noDelay ? 1 : 0);
setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
fcntl(s, F_SETFL, O_NONBLOCK);
}
#endif
@ -563,7 +584,8 @@ public:
try {
_socks.push_back(PhySocketImpl());
} catch ( ... ) {
}
catch (...) {
ZT_PHY_CLOSE_SOCKET(s);
return (PhySocket*)0;
}
@ -617,18 +639,28 @@ public:
#if defined(_WIN32) || defined(_WIN64)
{
BOOL f;
if (remoteAddress->sa_family == AF_INET6) { f = TRUE; ::setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(const char *)&f,sizeof(f)); }
f = TRUE; ::setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(const char *)&f,sizeof(f));
f = (_noDelay ? TRUE : FALSE); setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f));
if (remoteAddress->sa_family == AF_INET6) {
f = TRUE;
::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&f, sizeof(f));
}
f = TRUE;
::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char*)&f, sizeof(f));
f = (_noDelay ? TRUE : FALSE);
setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
u_long iMode = 1;
ioctlsocket(s, FIONBIO, &iMode);
}
#else
{
int f;
if (remoteAddress->sa_family == AF_INET6) { f = 1; ::setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f)); }
f = 1; ::setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));
f = (_noDelay ? 1 : 0); setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f));
if (remoteAddress->sa_family == AF_INET6) {
f = 1;
::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&f, sizeof(f));
}
f = 1;
::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*)&f, sizeof(f));
f = (_noDelay ? 1 : 0);
setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
fcntl(s, F_SETFL, O_NONBLOCK);
}
#endif
@ -648,7 +680,8 @@ public:
try {
_socks.push_back(PhySocketImpl());
} catch ( ... ) {
}
catch (...) {
ZT_PHY_CLOSE_SOCKET(s);
return (PhySocket*)0;
}
@ -659,7 +692,8 @@ public:
if (connected) {
FD_SET(s, &_readfds);
sws.type = ZT_PHY_SOCKET_TCP_OUT_CONNECTED;
} else {
}
else {
FD_SET(s, &_writefds);
#if defined(_WIN32) || defined(_WIN64)
FD_SET(s, &_exceptfds);
@ -674,7 +708,9 @@ public:
if ((callConnectHandler) && (connected)) {
try {
_handler->phyOnTcpConnect((PhySocket*)&sws, &(sws.uptr), true);
} catch ( ... ) {}
}
catch (...) {
}
}
return (PhySocket*)&sws;
@ -820,7 +856,8 @@ public:
PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
if (notifyWritable) {
FD_SET(sws.sock, &_writefds);
} else {
}
else {
FD_CLR(sws.sock, &_writefds);
}
}
@ -840,7 +877,8 @@ public:
PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
if (notifyReadable) {
FD_SET(sws.sock, &_readfds);
} else {
}
else {
FD_CLR(sws.sock, &_readfds);
}
}
@ -885,18 +923,19 @@ public:
for (typename std::list<PhySocketImpl>::iterator s(_socks.begin()); s != _socks.end();) {
switch (s->type) {
case ZT_PHY_SOCKET_TCP_OUT_PENDING:
#if defined(_WIN32) || defined(_WIN64)
if (FD_ISSET(s->sock, &efds)) {
this->close((PhySocket*)&(*s), true);
} else // ... if
}
else // ... if
#endif
if (FD_ISSET(s->sock, &wfds)) {
socklen_t slen = sizeof(ss);
if (::getpeername(s->sock, (struct sockaddr*)&ss, &slen) != 0) {
this->close((PhySocket*)&(*s), true);
} else {
}
else {
s->type = ZT_PHY_SOCKET_TCP_OUT_CONNECTED;
FD_SET(s->sock, &_readfds);
FD_CLR(s->sock, &_writefds);
@ -905,7 +944,9 @@ public:
#endif
try {
_handler->phyOnTcpConnect((PhySocket*)&(*s), &(s->uptr), true);
} catch ( ... ) {}
}
catch (...) {
}
}
}
break;
@ -917,16 +958,21 @@ public:
long n = (long)::recv(sock, buf, sizeof(buf), 0);
if (n <= 0) {
this->close((PhySocket*)&(*s), true);
} else {
}
else {
try {
_handler->phyOnTcpData((PhySocket*)&(*s), &(s->uptr), (void*)buf, (unsigned long)n);
} catch ( ... ) {}
}
catch (...) {
}
}
}
if ((FD_ISSET(sock, &wfds)) && (FD_ISSET(sock, &_writefds))) {
try {
_handler->phyOnTcpWritable((PhySocket*)&(*s), &(s->uptr));
} catch ( ... ) {}
}
catch (...) {
}
}
} break;
@ -938,12 +984,22 @@ public:
if (ZT_PHY_SOCKFD_VALID(newSock)) {
if (_socks.size() >= ZT_PHY_MAX_SOCKETS) {
ZT_PHY_CLOSE_SOCKET(newSock);
} else {
}
else {
#if defined(_WIN32) || defined(_WIN64)
{ BOOL f = (_noDelay ? TRUE : FALSE); setsockopt(newSock,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); }
{ u_long iMode=1; ioctlsocket(newSock,FIONBIO,&iMode); }
{
BOOL f = (_noDelay ? TRUE : FALSE);
setsockopt(newSock, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
}
{
u_long iMode = 1;
ioctlsocket(newSock, FIONBIO, &iMode);
}
#else
{ int f = (_noDelay ? 1 : 0); setsockopt(newSock,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); }
{
int f = (_noDelay ? 1 : 0);
setsockopt(newSock, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
}
fcntl(newSock, F_SETFL, O_NONBLOCK);
#endif
_socks.push_back(PhySocketImpl());
@ -957,7 +1013,9 @@ public:
memcpy(&(sws.saddr), &ss, sizeof(struct sockaddr_storage));
try {
_handler->phyOnTcpAccept((PhySocket*)&(*s), (PhySocket*)&(_socks.back()), &(s->uptr), &(sws.uptr), (const struct sockaddr*)&(sws.saddr));
} catch ( ... ) {}
}
catch (...) {
}
}
}
}
@ -965,6 +1023,42 @@ public:
case ZT_PHY_SOCKET_UDP:
if (FD_ISSET(s->sock, &rfds)) {
#if (defined(__linux__) || defined(linux) || defined(__linux)) && defined(MSG_WAITFORONE)
#define RECVMMSG_WINDOW_SIZE 128
#define RECVMMSG_BUF_SIZE 1500
iovec iovs[RECVMMSG_WINDOW_SIZE];
uint8_t bufs[RECVMMSG_WINDOW_SIZE][RECVMMSG_BUF_SIZE];
sockaddr_storage addrs[RECVMMSG_WINDOW_SIZE];
memset(addrs, 0, sizeof(addrs));
mmsghdr mm[RECVMMSG_WINDOW_SIZE];
memset(mm, 0, sizeof(mm));
for (int k = 0; k < 1024; ++k) {
for (int i = 0; i < RECVMMSG_WINDOW_SIZE; ++i) {
iovs[i].iov_base = (void*)bufs[i];
iovs[i].iov_len = RECVMMSG_BUF_SIZE;
mm[i].msg_hdr.msg_name = (void*)&(addrs[i]);
mm[i].msg_hdr.msg_namelen = sizeof(sockaddr_storage);
mm[i].msg_hdr.msg_iov = &(iovs[i]);
mm[i].msg_hdr.msg_iovlen = 1;
}
int received_count = recvmmsg(s->sock, mm, RECVMMSG_WINDOW_SIZE, MSG_WAITFORONE, nullptr);
if (received_count > 0) {
for (int i = 0; i < received_count; ++i) {
long n = (long)mm[i].msg_hdr.msg_iov->iov_len;
if (n > 0) {
try {
_handler->phyOnDatagram((PhySocket*)&(*s), &(s->uptr), (const struct sockaddr*)&(s->saddr), (const struct sockaddr*)&(addrs[i]), bufs[i], (unsigned long)n);
}
catch (...) {
}
}
}
}
else {
break;
}
}
#else
for (int k = 0; k < 1024; ++k) {
memset(&ss, 0, sizeof(ss));
socklen_t slen = sizeof(ss);
@ -972,10 +1066,14 @@ public:
if (n > 0) {
try {
_handler->phyOnDatagram((PhySocket*)&(*s), &(s->uptr), (const struct sockaddr*)&(s->saddr), (const struct sockaddr*)&ss, (void*)buf, (unsigned long)n);
} catch ( ... ) {}
} else if (n < 0)
}
catch (...) {
}
}
else if (n < 0)
break;
}
#endif
}
break;
@ -985,16 +1083,21 @@ public:
if ((FD_ISSET(sock, &wfds)) && (FD_ISSET(sock, &_writefds))) {
try {
_handler->phyOnUnixWritable((PhySocket*)&(*s), &(s->uptr));
} catch ( ... ) {}
}
catch (...) {
}
}
if (FD_ISSET(sock, &rfds)) {
long n = (long)::read(sock, buf, sizeof(buf));
if (n <= 0) {
this->close((PhySocket*)&(*s), true);
} else {
}
else {
try {
_handler->phyOnUnixData((PhySocket*)&(*s), &(s->uptr), (void*)buf, (unsigned long)n);
} catch ( ... ) {}
}
catch (...) {
}
}
}
#endif // __UNIX_LIKE__
@ -1009,7 +1112,8 @@ public:
if (ZT_PHY_SOCKFD_VALID(newSock)) {
if (_socks.size() >= ZT_PHY_MAX_SOCKETS) {
ZT_PHY_CLOSE_SOCKET(newSock);
} else {
}
else {
fcntl(newSock, F_SETFL, O_NONBLOCK);
_socks.push_back(PhySocketImpl());
PhySocketImpl& sws = _socks.back();
@ -1022,7 +1126,9 @@ public:
memcpy(&(sws.saddr), &ss, sizeof(struct sockaddr_storage));
try {
//_handler->phyOnUnixAccept((PhySocket *)&(*s),(PhySocket *)&(_socks.back()),&(s->uptr),&(sws.uptr));
} catch ( ... ) {}
}
catch (...) {
}
}
}
}
@ -1036,18 +1142,20 @@ public:
if ((readable) || (writable)) {
try {
//_handler->phyOnFileDescriptorActivity((PhySocket *)&(*s),&(s->uptr),readable,writable);
} catch ( ... ) {}
}
catch (...) {
}
}
} break;
default:
break;
}
if (s->type == ZT_PHY_SOCKET_CLOSED)
_socks.erase(s++);
else ++s;
else
++s;
}
}
@ -1082,19 +1190,25 @@ public:
case ZT_PHY_SOCKET_TCP_OUT_PENDING:
try {
_handler->phyOnTcpConnect(sock, &(sws.uptr), false);
} catch ( ... ) {}
}
catch (...) {
}
break;
case ZT_PHY_SOCKET_TCP_OUT_CONNECTED:
case ZT_PHY_SOCKET_TCP_IN:
try {
_handler->phyOnTcpClose(sock, &(sws.uptr));
} catch ( ... ) {}
}
catch (...) {
}
break;
case ZT_PHY_SOCKET_UNIX_IN:
#ifdef __UNIX_LIKE__
try {
_handler->phyOnUnixClose(sock, &(sws.uptr));
} catch ( ... ) {}
}
catch (...) {
}
#endif // __UNIX_LIKE__
break;
default: