From 0381e0aa922048e554b78ddc6cdc1d3762c0698d Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 7 Jul 2023 18:16:35 -0400 Subject: [PATCH] Add recvmmsg support to Phy for Linux. --- osdep/Phy.hpp | 772 +++++++++++++++++++++++++++++--------------------- 1 file changed, 443 insertions(+), 329 deletions(-) diff --git a/osdep/Phy.hpp b/osdep/Phy.hpp index 7be6546fe..edad959f1 100644 --- a/osdep/Phy.hpp +++ b/osdep/Phy.hpp @@ -14,41 +14,40 @@ #ifndef ZT_PHY_HPP #define ZT_PHY_HPP +#include +#include #include #include #include -#include -#include - #if defined(_WIN32) || defined(_WIN64) +#include #include #include -#include -#define ZT_PHY_SOCKFD_TYPE SOCKET -#define ZT_PHY_SOCKFD_NULL (INVALID_SOCKET) -#define ZT_PHY_SOCKFD_VALID(s) ((s) != INVALID_SOCKET) -#define ZT_PHY_CLOSE_SOCKET(s) ::closesocket(s) -#define ZT_PHY_MAX_SOCKETS (FD_SETSIZE) -#define ZT_PHY_MAX_INTERCEPTS ZT_PHY_MAX_SOCKETS +#define ZT_PHY_SOCKFD_TYPE SOCKET +#define ZT_PHY_SOCKFD_NULL (INVALID_SOCKET) +#define ZT_PHY_SOCKFD_VALID(s) ((s) != INVALID_SOCKET) +#define ZT_PHY_CLOSE_SOCKET(s) ::closesocket(s) +#define ZT_PHY_MAX_SOCKETS (FD_SETSIZE) +#define ZT_PHY_MAX_INTERCEPTS ZT_PHY_MAX_SOCKETS #define ZT_PHY_SOCKADDR_STORAGE_TYPE struct sockaddr_storage -#else // not Windows +#else // not Windows -#include -#include -#include -#include -#include -#include -#include -#include -#include #include +#include +#include #include #include +#include +#include +#include +#include +#include +#include +#include #if defined(__linux__) || defined(linux) || defined(__LINUX__) || defined(__linux) #ifndef IPV6_DONTFRAG @@ -56,15 +55,15 @@ #endif #endif -#define ZT_PHY_SOCKFD_TYPE int -#define ZT_PHY_SOCKFD_NULL (-1) -#define ZT_PHY_SOCKFD_VALID(s) ((s) > -1) -#define ZT_PHY_CLOSE_SOCKET(s) ::close(s) -#define ZT_PHY_MAX_SOCKETS (FD_SETSIZE) -#define ZT_PHY_MAX_INTERCEPTS ZT_PHY_MAX_SOCKETS +#define ZT_PHY_SOCKFD_TYPE int +#define ZT_PHY_SOCKFD_NULL (-1) +#define ZT_PHY_SOCKFD_VALID(s) ((s) > -1) +#define ZT_PHY_CLOSE_SOCKET(s) ::close(s) +#define ZT_PHY_MAX_SOCKETS (FD_SETSIZE) +#define ZT_PHY_MAX_INTERCEPTS ZT_PHY_MAX_SOCKETS #define ZT_PHY_SOCKADDR_STORAGE_TYPE struct sockaddr_storage -#endif // Windows or not +#endif // Windows or not namespace ZeroTier { @@ -120,15 +119,12 @@ typedef void PhySocket; * This isn't thread-safe with the exception of whack(), which is safe to * call from another thread to abort poll(). */ -template -class Phy -{ -private: +template class Phy { + private: HANDLER_PTR_TYPE _handler; - enum PhySocketType - { - ZT_PHY_SOCKET_CLOSED = 0x00, // socket is closed, will be removed on next poll() + enum PhySocketType { + ZT_PHY_SOCKET_CLOSED = 0x00, // socket is closed, will be removed on next poll() ZT_PHY_SOCKET_TCP_OUT_PENDING = 0x01, ZT_PHY_SOCKET_TCP_OUT_CONNECTED = 0x02, ZT_PHY_SOCKET_TCP_IN = 0x03, @@ -140,11 +136,13 @@ private: }; struct PhySocketImpl { - PhySocketImpl() {} + PhySocketImpl() + { + } PhySocketType type; ZT_PHY_SOCKFD_TYPE sock; - void *uptr; // user-settable pointer - ZT_PHY_SOCKADDR_STORAGE_TYPE saddr; // remote for TCP_OUT and TCP_IN, local for TCP_LISTEN, RAW, and UDP + void* uptr; // user-settable pointer + ZT_PHY_SOCKADDR_STORAGE_TYPE saddr; // remote for TCP_OUT and TCP_IN, local for TCP_LISTEN, RAW, and UDP }; std::list _socks; @@ -161,14 +159,13 @@ private: bool _noDelay; bool _noCheck; -public: + public: /** * @param handler Pointer of type HANDLER_PTR_TYPE to handler * @param noDelay If true, disable TCP NAGLE algorithm on TCP sockets * @param noCheck If true, attempt to set UDP SO_NO_CHECK option to disable sending checksums */ - Phy(HANDLER_PTR_TYPE handler,bool noDelay,bool noCheck) : - _handler(handler) + Phy(HANDLER_PTR_TYPE handler, bool noDelay, bool noCheck) : _handler(handler) { FD_ZERO(&_readfds); FD_ZERO(&_writefds); @@ -180,7 +177,7 @@ public: { // hack copied from StackOverflow, behaves a bit like pipe() on *nix systems struct sockaddr_in inaddr; struct sockaddr addr; - SOCKET lst=::socket(AF_INET, SOCK_STREAM,IPPROTO_TCP); + SOCKET lst = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (lst == INVALID_SOCKET) throw std::runtime_error("unable to create pipes for select() abort"); memset(&inaddr, 0, sizeof(inaddr)); @@ -188,24 +185,24 @@ public: inaddr.sin_family = AF_INET; inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); inaddr.sin_port = 0; - int yes=1; - setsockopt(lst,SOL_SOCKET,SO_REUSEADDR,(char*)&yes,sizeof(yes)); - bind(lst,(struct sockaddr *)&inaddr,sizeof(inaddr)); - listen(lst,1); - int len=sizeof(inaddr); - getsockname(lst, &addr,&len); - pipes[0]=::socket(AF_INET, SOCK_STREAM,0); + int yes = 1; + setsockopt(lst, SOL_SOCKET, SO_REUSEADDR, (char*)&yes, sizeof(yes)); + bind(lst, (struct sockaddr*)&inaddr, sizeof(inaddr)); + listen(lst, 1); + int len = sizeof(inaddr); + getsockname(lst, &addr, &len); + pipes[0] = ::socket(AF_INET, SOCK_STREAM, 0); if (pipes[0] == INVALID_SOCKET) throw std::runtime_error("unable to create pipes for select() abort"); - connect(pipes[0],&addr,len); - pipes[1]=accept(lst,0,0); + connect(pipes[0], &addr, len); + pipes[1] = accept(lst, 0, 0); closesocket(lst); } -#else // not Windows +#else // not Windows int pipes[2]; if (::pipe(pipes)) throw std::runtime_error("unable to create pipes for select() abort"); -#endif // Windows or not +#endif // Windows or not _nfds = (pipes[0] > pipes[1]) ? (long)pipes[0] : (long)pipes[1]; _whackReceiveSocket = pipes[0]; @@ -216,9 +213,9 @@ public: ~Phy() { - for(typename std::list::const_iterator s(_socks.begin());s!=_socks.end();++s) { + for (typename std::list::const_iterator s(_socks.begin()); s != _socks.end(); ++s) { if (s->type != ZT_PHY_SOCKET_CLOSED) - this->close((PhySocket *)&(*s),true); + this->close((PhySocket*)&(*s), true); } ZT_PHY_CLOSE_SOCKET(_whackReceiveSocket); ZT_PHY_CLOSE_SOCKET(_whackSendSocket); @@ -294,25 +291,26 @@ public: * @param uptr User pointer to supply to callbacks * @return PhySocket wrapping fd or NULL on failure (out of memory or too many sockets) */ - inline PhySocket *wrapSocket(ZT_PHY_SOCKFD_TYPE fd,void *uptr = (void *)0) + inline PhySocket* wrapSocket(ZT_PHY_SOCKFD_TYPE fd, void* uptr = (void*)0) { if (_socks.size() >= ZT_PHY_MAX_SOCKETS) - return (PhySocket *)0; + return (PhySocket*)0; try { _socks.push_back(PhySocketImpl()); - } catch ( ... ) { - return (PhySocket *)0; } - PhySocketImpl &sws = _socks.back(); + catch (...) { + return (PhySocket*)0; + } + PhySocketImpl& sws = _socks.back(); if ((long)fd > _nfds) _nfds = (long)fd; - FD_SET(fd,&_readfds); + FD_SET(fd, &_readfds); sws.type = ZT_PHY_SOCKET_UNIX_IN; /* TODO: Type was changed to allow for CBs with new RPC model */ sws.sock = fd; sws.uptr = uptr; - memset(&(sws.saddr),0,sizeof(struct sockaddr_storage)); + memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage)); // no sockaddr for this socket type, leave saddr null - return (PhySocket *)&sws; + return (PhySocket*)&sws; } /** @@ -323,27 +321,27 @@ public: * @param bufferSize Desired socket receive/send buffer size -- will set as close to this as possible (default: 0, leave alone) * @return Socket or NULL on failure to bind */ - inline PhySocket *udpBind(const struct sockaddr *localAddress,void *uptr = (void *)0,int bufferSize = 0) + inline PhySocket* udpBind(const struct sockaddr* localAddress, void* uptr = (void*)0, int bufferSize = 0) { if (_socks.size() >= ZT_PHY_MAX_SOCKETS) - return (PhySocket *)0; + return (PhySocket*)0; - ZT_PHY_SOCKFD_TYPE s = ::socket(localAddress->sa_family,SOCK_DGRAM,0); - if (!ZT_PHY_SOCKFD_VALID(s)) - return (PhySocket *)0; + ZT_PHY_SOCKFD_TYPE s = ::socket(localAddress->sa_family, SOCK_DGRAM, 0); + if (! ZT_PHY_SOCKFD_VALID(s)) + return (PhySocket*)0; if (bufferSize > 0) { int bs = bufferSize; while (bs >= 65536) { int tmpbs = bs; - if (setsockopt(s,SOL_SOCKET,SO_RCVBUF,(const char *)&tmpbs,sizeof(tmpbs)) == 0) + if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const char*)&tmpbs, sizeof(tmpbs)) == 0) break; bs -= 4096; } bs = bufferSize; while (bs >= 65536) { int tmpbs = bs; - if (setsockopt(s,SOL_SOCKET,SO_SNDBUF,(const char *)&tmpbs,sizeof(tmpbs)) == 0) + if (setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char*)&tmpbs, sizeof(tmpbs)) == 0) break; bs -= 4096; } @@ -353,72 +351,88 @@ public: { BOOL f; if (localAddress->sa_family == AF_INET6) { - f = TRUE; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(const char *)&f,sizeof(f)); - f = FALSE; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,(const char *)&f,sizeof(f)); + f = TRUE; + setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&f, sizeof(f)); + f = FALSE; + setsockopt(s, IPPROTO_IPV6, IPV6_DONTFRAG, (const char*)&f, sizeof(f)); } - f = FALSE; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(const char *)&f,sizeof(f)); - f = TRUE; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(const char *)&f,sizeof(f)); + f = FALSE; + setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char*)&f, sizeof(f)); + f = TRUE; + setsockopt(s, SOL_SOCKET, SO_BROADCAST, (const char*)&f, sizeof(f)); } -#else // not Windows +#else // not Windows { int f; if (localAddress->sa_family == AF_INET6) { - f = 1; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f)); + f = 1; + setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&f, sizeof(f)); #ifdef IPV6_MTU_DISCOVER - f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_MTU_DISCOVER,&f,sizeof(f)); + f = 0; + setsockopt(s, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &f, sizeof(f)); #endif #ifdef IPV6_DONTFRAG - f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,&f,sizeof(f)); + f = 0; + setsockopt(s, IPPROTO_IPV6, IPV6_DONTFRAG, &f, sizeof(f)); #endif } - f = 0; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f)); - f = 1; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(void *)&f,sizeof(f)); + f = 0; + setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*)&f, sizeof(f)); + f = 1; + setsockopt(s, SOL_SOCKET, SO_BROADCAST, (void*)&f, sizeof(f)); #ifdef IP_DONTFRAG - f = 0; setsockopt(s,IPPROTO_IP,IP_DONTFRAG,&f,sizeof(f)); + f = 0; + setsockopt(s, IPPROTO_IP, IP_DONTFRAG, &f, sizeof(f)); #endif #ifdef IP_MTU_DISCOVER - f = 0; setsockopt(s,IPPROTO_IP,IP_MTU_DISCOVER,&f,sizeof(f)); + f = 0; + setsockopt(s, IPPROTO_IP, IP_MTU_DISCOVER, &f, sizeof(f)); #endif #ifdef SO_NO_CHECK // For now at least we only set SO_NO_CHECK on IPv4 sockets since some // IPv6 stacks incorrectly discard zero checksum packets. May remove // this restriction later once broken stuff dies more. - if ((localAddress->sa_family == AF_INET)&&(_noCheck)) { - f = 1; setsockopt(s,SOL_SOCKET,SO_NO_CHECK,(void *)&f,sizeof(f)); + if ((localAddress->sa_family == AF_INET) && (_noCheck)) { + f = 1; + setsockopt(s, SOL_SOCKET, SO_NO_CHECK, (void*)&f, sizeof(f)); } #endif } -#endif // Windows or not +#endif // Windows or not - if (::bind(s,localAddress,(localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) { + if (::bind(s, localAddress, (localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) { ZT_PHY_CLOSE_SOCKET(s); - return (PhySocket *)0; + return (PhySocket*)0; } #if defined(_WIN32) || defined(_WIN64) - { u_long iMode=1; ioctlsocket(s,FIONBIO,&iMode); } + { + u_long iMode = 1; + ioctlsocket(s, FIONBIO, &iMode); + } #else - fcntl(s,F_SETFL,O_NONBLOCK); + fcntl(s, F_SETFL, O_NONBLOCK); #endif try { _socks.push_back(PhySocketImpl()); - } catch ( ... ) { - ZT_PHY_CLOSE_SOCKET(s); - return (PhySocket *)0; } - PhySocketImpl &sws = _socks.back(); + catch (...) { + ZT_PHY_CLOSE_SOCKET(s); + return (PhySocket*)0; + } + PhySocketImpl& sws = _socks.back(); if ((long)s > _nfds) _nfds = (long)s; - FD_SET(s,&_readfds); + FD_SET(s, &_readfds); sws.type = ZT_PHY_SOCKET_UDP; sws.sock = s; sws.uptr = uptr; - memset(&(sws.saddr),0,sizeof(struct sockaddr_storage)); - memcpy(&(sws.saddr),localAddress,(localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); + memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage)); + memcpy(&(sws.saddr), localAddress, (localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); - return (PhySocket *)&sws; + return (PhySocket*)&sws; } /** @@ -427,15 +441,15 @@ public: * @param ttl New TTL (0 or >255 will set it to 255) * @return True on success */ - inline bool setIp4UdpTtl(PhySocket *sock,unsigned int ttl) + inline bool setIp4UdpTtl(PhySocket* sock, unsigned int ttl) { - PhySocketImpl &sws = *(reinterpret_cast(sock)); + PhySocketImpl& sws = *(reinterpret_cast(sock)); #if defined(_WIN32) || defined(_WIN64) - DWORD tmp = ((ttl == 0)||(ttl > 255)) ? 255 : (DWORD)ttl; - return (::setsockopt(sws.sock,IPPROTO_IP,IP_TTL,(const char *)&tmp,sizeof(tmp)) == 0); + DWORD tmp = ((ttl == 0) || (ttl > 255)) ? 255 : (DWORD)ttl; + return (::setsockopt(sws.sock, IPPROTO_IP, IP_TTL, (const char*)&tmp, sizeof(tmp)) == 0); #else - int tmp = ((ttl == 0)||(ttl > 255)) ? 255 : (int)ttl; - return (::setsockopt(sws.sock,IPPROTO_IP,IP_TTL,(void *)&tmp,sizeof(tmp)) == 0); + int tmp = ((ttl == 0) || (ttl > 255)) ? 255 : (int)ttl; + return (::setsockopt(sws.sock, IPPROTO_IP, IP_TTL, (void*)&tmp, sizeof(tmp)) == 0); #endif } @@ -448,13 +462,13 @@ public: * @param len Length of packet * @return True if packet appears to have been sent successfully */ - inline bool udpSend(PhySocket *sock,const struct sockaddr *remoteAddress,const void *data,unsigned long len) + inline bool udpSend(PhySocket* sock, const struct sockaddr* remoteAddress, const void* data, unsigned long len) { - PhySocketImpl &sws = *(reinterpret_cast(sock)); + PhySocketImpl& sws = *(reinterpret_cast(sock)); #if defined(_WIN32) || defined(_WIN64) - return ((long)::sendto(sws.sock,reinterpret_cast(data),len,0,remoteAddress,(remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)) == (long)len); + return ((long)::sendto(sws.sock, reinterpret_cast(data), len, 0, remoteAddress, (remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)) == (long)len); #else - return ((long)::sendto(sws.sock,data,len,0,remoteAddress,(remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)) == (long)len); + return ((long)::sendto(sws.sock, data, len, 0, remoteAddress, (remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)) == (long)len); #endif } @@ -466,55 +480,56 @@ public: * @param uptr Arbitrary pointer to associate * @return PhySocket or NULL if cannot bind */ - inline PhySocket *unixListen(const char *path,void *uptr = (void *)0) + inline PhySocket* unixListen(const char* path, void* uptr = (void*)0) { struct sockaddr_un sun; if (_socks.size() >= ZT_PHY_MAX_SOCKETS) - return (PhySocket *)0; + return (PhySocket*)0; - memset(&sun,0,sizeof(sun)); + memset(&sun, 0, sizeof(sun)); sun.sun_family = AF_UNIX; if (strlen(path) >= sizeof(sun.sun_path)) - return (PhySocket *)0; - strcpy(sun.sun_path,path); + return (PhySocket*)0; + strcpy(sun.sun_path, path); - ZT_PHY_SOCKFD_TYPE s = ::socket(PF_UNIX,SOCK_STREAM,0); - if (!ZT_PHY_SOCKFD_VALID(s)) - return (PhySocket *)0; + ZT_PHY_SOCKFD_TYPE s = ::socket(PF_UNIX, SOCK_STREAM, 0); + if (! ZT_PHY_SOCKFD_VALID(s)) + return (PhySocket*)0; - ::fcntl(s,F_SETFL,O_NONBLOCK); + ::fcntl(s, F_SETFL, O_NONBLOCK); ::unlink(path); - if (::bind(s,(struct sockaddr *)&sun,sizeof(struct sockaddr_un)) != 0) { + if (::bind(s, (struct sockaddr*)&sun, sizeof(struct sockaddr_un)) != 0) { ZT_PHY_CLOSE_SOCKET(s); - return (PhySocket *)0; + return (PhySocket*)0; } - if (::listen(s,128) != 0) { + if (::listen(s, 128) != 0) { ZT_PHY_CLOSE_SOCKET(s); - return (PhySocket *)0; + return (PhySocket*)0; } try { _socks.push_back(PhySocketImpl()); - } catch ( ... ) { - ZT_PHY_CLOSE_SOCKET(s); - return (PhySocket *)0; } - PhySocketImpl &sws = _socks.back(); + catch (...) { + ZT_PHY_CLOSE_SOCKET(s); + return (PhySocket*)0; + } + PhySocketImpl& sws = _socks.back(); if ((long)s > _nfds) _nfds = (long)s; - FD_SET(s,&_readfds); + FD_SET(s, &_readfds); sws.type = ZT_PHY_SOCKET_UNIX_LISTEN; sws.sock = s; sws.uptr = uptr; - memset(&(sws.saddr),0,sizeof(struct sockaddr_storage)); - memcpy(&(sws.saddr),&sun,sizeof(struct sockaddr_un)); + memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage)); + memcpy(&(sws.saddr), &sun, sizeof(struct sockaddr_un)); - return (PhySocket *)&sws; + return (PhySocket*)&sws; } -#endif // __UNIX_LIKE__ +#endif // __UNIX_LIKE__ /** * Bind a local listen socket to listen for new TCP connections @@ -523,62 +538,69 @@ public: * @param uptr Initial value of uptr for new socket (default: NULL) * @return Socket or NULL on failure to bind */ - inline PhySocket *tcpListen(const struct sockaddr *localAddress,void *uptr = (void *)0) + inline PhySocket* tcpListen(const struct sockaddr* localAddress, void* uptr = (void*)0) { if (_socks.size() >= ZT_PHY_MAX_SOCKETS) - return (PhySocket *)0; + return (PhySocket*)0; - ZT_PHY_SOCKFD_TYPE s = ::socket(localAddress->sa_family,SOCK_STREAM,0); - if (!ZT_PHY_SOCKFD_VALID(s)) - return (PhySocket *)0; + ZT_PHY_SOCKFD_TYPE s = ::socket(localAddress->sa_family, SOCK_STREAM, 0); + if (! ZT_PHY_SOCKFD_VALID(s)) + return (PhySocket*)0; #if defined(_WIN32) || defined(_WIN64) { BOOL f; - f = TRUE; ::setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(const char *)&f,sizeof(f)); - f = TRUE; ::setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(const char *)&f,sizeof(f)); - f = (_noDelay ? TRUE : FALSE); setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); - u_long iMode=1; - ioctlsocket(s,FIONBIO,&iMode); + f = TRUE; + ::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&f, sizeof(f)); + f = TRUE; + ::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char*)&f, sizeof(f)); + f = (_noDelay ? TRUE : FALSE); + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f)); + u_long iMode = 1; + ioctlsocket(s, FIONBIO, &iMode); } #else { int f; - f = 1; ::setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f)); - f = 1; ::setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f)); - f = (_noDelay ? 1 : 0); setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); - fcntl(s,F_SETFL,O_NONBLOCK); + f = 1; + ::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&f, sizeof(f)); + f = 1; + ::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*)&f, sizeof(f)); + f = (_noDelay ? 1 : 0); + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f)); + fcntl(s, F_SETFL, O_NONBLOCK); } #endif - if (::bind(s,localAddress,(localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) { + if (::bind(s, localAddress, (localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) { ZT_PHY_CLOSE_SOCKET(s); - return (PhySocket *)0; + return (PhySocket*)0; } - if (::listen(s,1024)) { + if (::listen(s, 1024)) { ZT_PHY_CLOSE_SOCKET(s); - return (PhySocket *)0; + return (PhySocket*)0; } try { _socks.push_back(PhySocketImpl()); - } catch ( ... ) { - ZT_PHY_CLOSE_SOCKET(s); - return (PhySocket *)0; } - PhySocketImpl &sws = _socks.back(); + catch (...) { + ZT_PHY_CLOSE_SOCKET(s); + return (PhySocket*)0; + } + PhySocketImpl& sws = _socks.back(); if ((long)s > _nfds) _nfds = (long)s; - FD_SET(s,&_readfds); + FD_SET(s, &_readfds); sws.type = ZT_PHY_SOCKET_TCP_LISTEN; sws.sock = s; sws.uptr = uptr; - memset(&(sws.saddr),0,sizeof(struct sockaddr_storage)); - memcpy(&(sws.saddr),localAddress,(localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); + memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage)); + memcpy(&(sws.saddr), localAddress, (localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); - return (PhySocket *)&sws; + return (PhySocket*)&sws; } /** @@ -603,38 +625,48 @@ public: * @param callConnectHandler If true, call TCP connect handler even if result is known before function exit (default: true) * @return New socket or NULL on failure */ - inline PhySocket *tcpConnect(const struct sockaddr *remoteAddress,bool &connected,void *uptr = (void *)0,bool callConnectHandler = true) + inline PhySocket* tcpConnect(const struct sockaddr* remoteAddress, bool& connected, void* uptr = (void*)0, bool callConnectHandler = true) { if (_socks.size() >= ZT_PHY_MAX_SOCKETS) - return (PhySocket *)0; + return (PhySocket*)0; - ZT_PHY_SOCKFD_TYPE s = ::socket(remoteAddress->sa_family,SOCK_STREAM,0); - if (!ZT_PHY_SOCKFD_VALID(s)) { + ZT_PHY_SOCKFD_TYPE s = ::socket(remoteAddress->sa_family, SOCK_STREAM, 0); + if (! ZT_PHY_SOCKFD_VALID(s)) { connected = false; - return (PhySocket *)0; + return (PhySocket*)0; } #if defined(_WIN32) || defined(_WIN64) { BOOL f; - if (remoteAddress->sa_family == AF_INET6) { f = TRUE; ::setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(const char *)&f,sizeof(f)); } - f = TRUE; ::setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(const char *)&f,sizeof(f)); - f = (_noDelay ? TRUE : FALSE); setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); - u_long iMode=1; - ioctlsocket(s,FIONBIO,&iMode); + if (remoteAddress->sa_family == AF_INET6) { + f = TRUE; + ::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&f, sizeof(f)); + } + f = TRUE; + ::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char*)&f, sizeof(f)); + f = (_noDelay ? TRUE : FALSE); + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f)); + u_long iMode = 1; + ioctlsocket(s, FIONBIO, &iMode); } #else { int f; - if (remoteAddress->sa_family == AF_INET6) { f = 1; ::setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f)); } - f = 1; ::setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f)); - f = (_noDelay ? 1 : 0); setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); - fcntl(s,F_SETFL,O_NONBLOCK); + if (remoteAddress->sa_family == AF_INET6) { + f = 1; + ::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&f, sizeof(f)); + } + f = 1; + ::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*)&f, sizeof(f)); + f = (_noDelay ? 1 : 0); + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f)); + fcntl(s, F_SETFL, O_NONBLOCK); } #endif connected = true; - if (::connect(s,remoteAddress,(remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) { + if (::connect(s, remoteAddress, (remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) { connected = false; #if defined(_WIN32) || defined(_WIN64) if (WSAGetLastError() != WSAEWOULDBLOCK) { @@ -642,42 +674,46 @@ public: if (errno != EINPROGRESS) { #endif ZT_PHY_CLOSE_SOCKET(s); - return (PhySocket *)0; - } // else connection is proceeding asynchronously... + return (PhySocket*)0; + } // else connection is proceeding asynchronously... } try { _socks.push_back(PhySocketImpl()); - } catch ( ... ) { - ZT_PHY_CLOSE_SOCKET(s); - return (PhySocket *)0; } - PhySocketImpl &sws = _socks.back(); + catch (...) { + ZT_PHY_CLOSE_SOCKET(s); + return (PhySocket*)0; + } + PhySocketImpl& sws = _socks.back(); if ((long)s > _nfds) _nfds = (long)s; if (connected) { - FD_SET(s,&_readfds); + FD_SET(s, &_readfds); sws.type = ZT_PHY_SOCKET_TCP_OUT_CONNECTED; - } else { - FD_SET(s,&_writefds); + } + else { + FD_SET(s, &_writefds); #if defined(_WIN32) || defined(_WIN64) - FD_SET(s,&_exceptfds); + FD_SET(s, &_exceptfds); #endif sws.type = ZT_PHY_SOCKET_TCP_OUT_PENDING; } sws.sock = s; sws.uptr = uptr; - memset(&(sws.saddr),0,sizeof(struct sockaddr_storage)); - memcpy(&(sws.saddr),remoteAddress,(remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); + memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage)); + memcpy(&(sws.saddr), remoteAddress, (remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); - if ((callConnectHandler)&&(connected)) { + if ((callConnectHandler) && (connected)) { try { - _handler->phyOnTcpConnect((PhySocket *)&sws,&(sws.uptr),true); - } catch ( ... ) {} + _handler->phyOnTcpConnect((PhySocket*)&sws, &(sws.uptr), true); + } + catch (...) { + } } - return (PhySocket *)&sws; + return (PhySocket*)&sws; } /** @@ -690,13 +726,13 @@ public: * @param receiveBufferSize Desired size of receive buffer * @param sendBufferSize Desired size of send buffer */ - inline void setBufferSizes(const PhySocket *sock,int receiveBufferSize,int sendBufferSize) + inline void setBufferSizes(const PhySocket* sock, int receiveBufferSize, int sendBufferSize) { - PhySocketImpl &sws = *(reinterpret_cast(sock)); + PhySocketImpl& sws = *(reinterpret_cast(sock)); if (receiveBufferSize > 0) { while (receiveBufferSize > 0) { int tmpbs = receiveBufferSize; - if (::setsockopt(sws.sock,SOL_SOCKET,SO_RCVBUF,(const char *)&tmpbs,sizeof(tmpbs)) == 0) + if (::setsockopt(sws.sock, SOL_SOCKET, SO_RCVBUF, (const char*)&tmpbs, sizeof(tmpbs)) == 0) break; receiveBufferSize -= 16384; } @@ -704,7 +740,7 @@ public: if (sendBufferSize > 0) { while (sendBufferSize > 0) { int tmpbs = sendBufferSize; - if (::setsockopt(sws.sock,SOL_SOCKET,SO_SNDBUF,(const char *)&tmpbs,sizeof(tmpbs)) == 0) + if (::setsockopt(sws.sock, SOL_SOCKET, SO_SNDBUF, (const char*)&tmpbs, sizeof(tmpbs)) == 0) break; sendBufferSize -= 16384; } @@ -726,29 +762,29 @@ public: * @param callCloseHandler If true, call close handler on socket closing failure condition (default: true) * @return Number of bytes actually sent or -1 on fatal error (socket closure) */ - inline long streamSend(PhySocket *sock,const void *data,unsigned long len,bool callCloseHandler = true) + inline long streamSend(PhySocket* sock, const void* data, unsigned long len, bool callCloseHandler = true) { - PhySocketImpl &sws = *(reinterpret_cast(sock)); + PhySocketImpl& sws = *(reinterpret_cast(sock)); #if defined(_WIN32) || defined(_WIN64) - long n = (long)::send(sws.sock,reinterpret_cast(data),len,0); + long n = (long)::send(sws.sock, reinterpret_cast(data), len, 0); if (n == SOCKET_ERROR) { - switch(WSAGetLastError()) { - case WSAEINTR: - case WSAEWOULDBLOCK: - return 0; - default: - this->close(sock,callCloseHandler); - return -1; - } + switch (WSAGetLastError()) { + case WSAEINTR: + case WSAEWOULDBLOCK: + return 0; + default: + this->close(sock, callCloseHandler); + return -1; + } } -#else // not Windows - long n = (long)::send(sws.sock,data,len,0); +#else // not Windows + long n = (long)::send(sws.sock, data, len, 0); if (n < 0) { - switch(errno) { + switch (errno) { #ifdef EAGAIN case EAGAIN: #endif -#if defined(EWOULDBLOCK) && ( !defined(EAGAIN) || (EWOULDBLOCK != EAGAIN) ) +#if defined(EWOULDBLOCK) && (! defined(EAGAIN) || (EWOULDBLOCK != EAGAIN)) case EWOULDBLOCK: #endif #ifdef EINTR @@ -756,11 +792,11 @@ public: #endif return 0; default: - this->close(sock,callCloseHandler); + this->close(sock, callCloseHandler); return -1; } } -#endif // Windows or not +#endif // Windows or not return n; } @@ -778,16 +814,16 @@ public: * @param callCloseHandler If true, call close handler on socket closing failure condition (default: true) * @return Number of bytes actually sent or -1 on fatal error (socket closure) */ - inline long unixSend(PhySocket *sock,const void *data,unsigned long len,bool callCloseHandler = true) + inline long unixSend(PhySocket* sock, const void* data, unsigned long len, bool callCloseHandler = true) { - PhySocketImpl &sws = *(reinterpret_cast(sock)); - long n = (long)::write(sws.sock,data,len); + PhySocketImpl& sws = *(reinterpret_cast(sock)); + long n = (long)::write(sws.sock, data, len); if (n < 0) { - switch(errno) { + switch (errno) { #ifdef EAGAIN case EAGAIN: #endif -#if defined(EWOULDBLOCK) && ( !defined(EAGAIN) || (EWOULDBLOCK != EAGAIN) ) +#if defined(EWOULDBLOCK) && (! defined(EAGAIN) || (EWOULDBLOCK != EAGAIN)) case EWOULDBLOCK: #endif #ifdef EINTR @@ -795,13 +831,13 @@ public: #endif return 0; default: - this->close(sock,callCloseHandler); + this->close(sock, callCloseHandler); return -1; } } return n; } -#endif // __UNIX_LIKE__ +#endif // __UNIX_LIKE__ /** * For streams, sets whether we want to be notified that the socket is writable @@ -815,13 +851,14 @@ public: * @param sock Stream connection socket * @param notifyWritable Want writable notifications? */ - inline void setNotifyWritable(PhySocket *sock,bool notifyWritable) + inline void setNotifyWritable(PhySocket* sock, bool notifyWritable) { - PhySocketImpl &sws = *(reinterpret_cast(sock)); + PhySocketImpl& sws = *(reinterpret_cast(sock)); if (notifyWritable) { - FD_SET(sws.sock,&_writefds); - } else { - FD_CLR(sws.sock,&_writefds); + FD_SET(sws.sock, &_writefds); + } + else { + FD_CLR(sws.sock, &_writefds); } } @@ -835,13 +872,14 @@ public: * @param sock Socket to modify * @param notifyReadable True if socket should be monitored for readability */ - inline void setNotifyReadable(PhySocket *sock,bool notifyReadable) + inline void setNotifyReadable(PhySocket* sock, bool notifyReadable) { - PhySocketImpl &sws = *(reinterpret_cast(sock)); + PhySocketImpl& sws = *(reinterpret_cast(sock)); if (notifyReadable) { - FD_SET(sws.sock,&_readfds); - } else { - FD_CLR(sws.sock,&_readfds); + FD_SET(sws.sock, &_readfds); + } + else { + FD_CLR(sws.sock, &_readfds); } } @@ -859,195 +897,265 @@ public: char buf[131072]; struct sockaddr_storage ss; struct timeval tv; - fd_set rfds,wfds,efds; + fd_set rfds, wfds, efds; - memcpy(&rfds,&_readfds,sizeof(rfds)); - memcpy(&wfds,&_writefds,sizeof(wfds)); + memcpy(&rfds, &_readfds, sizeof(rfds)); + memcpy(&wfds, &_writefds, sizeof(wfds)); #if defined(_WIN32) || defined(_WIN64) - memcpy(&efds,&_exceptfds,sizeof(efds)); + memcpy(&efds, &_exceptfds, sizeof(efds)); #else FD_ZERO(&efds); #endif tv.tv_sec = (long)(timeout / 1000); tv.tv_usec = (long)((timeout % 1000) * 1000); - if (::select((int)_nfds + 1,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0) <= 0) + if (::select((int)_nfds + 1, &rfds, &wfds, &efds, (timeout > 0) ? &tv : (struct timeval*)0) <= 0) return; - if (FD_ISSET(_whackReceiveSocket,&rfds)) { + if (FD_ISSET(_whackReceiveSocket, &rfds)) { char tmp[16]; #if defined(_WIN32) || defined(_WIN64) - ::recv(_whackReceiveSocket,tmp,16,0); + ::recv(_whackReceiveSocket, tmp, 16, 0); #else - ::read(_whackReceiveSocket,tmp,16); + ::read(_whackReceiveSocket, tmp, 16); #endif } - for(typename std::list::iterator s(_socks.begin());s!=_socks.end();) { + for (typename std::list::iterator s(_socks.begin()); s != _socks.end();) { switch (s->type) { - case ZT_PHY_SOCKET_TCP_OUT_PENDING: #if defined(_WIN32) || defined(_WIN64) - if (FD_ISSET(s->sock,&efds)) { - this->close((PhySocket *)&(*s),true); - } else // ... if + if (FD_ISSET(s->sock, &efds)) { + this->close((PhySocket*)&(*s), true); + } + else // ... if #endif - if (FD_ISSET(s->sock,&wfds)) { + if (FD_ISSET(s->sock, &wfds)) { socklen_t slen = sizeof(ss); - if (::getpeername(s->sock,(struct sockaddr *)&ss,&slen) != 0) { - this->close((PhySocket *)&(*s),true); - } else { + if (::getpeername(s->sock, (struct sockaddr*)&ss, &slen) != 0) { + this->close((PhySocket*)&(*s), true); + } + else { s->type = ZT_PHY_SOCKET_TCP_OUT_CONNECTED; - FD_SET(s->sock,&_readfds); - FD_CLR(s->sock,&_writefds); + FD_SET(s->sock, &_readfds); + FD_CLR(s->sock, &_writefds); #if defined(_WIN32) || defined(_WIN64) - FD_CLR(s->sock,&_exceptfds); + FD_CLR(s->sock, &_exceptfds); #endif try { - _handler->phyOnTcpConnect((PhySocket *)&(*s),&(s->uptr),true); - } catch ( ... ) {} + _handler->phyOnTcpConnect((PhySocket*)&(*s), &(s->uptr), true); + } + catch (...) { + } } } break; case ZT_PHY_SOCKET_TCP_OUT_CONNECTED: case ZT_PHY_SOCKET_TCP_IN: { - ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable - if (FD_ISSET(sock,&rfds)) { - long n = (long)::recv(sock,buf,sizeof(buf),0); + ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable + if (FD_ISSET(sock, &rfds)) { + long n = (long)::recv(sock, buf, sizeof(buf), 0); if (n <= 0) { - this->close((PhySocket *)&(*s),true); - } else { + this->close((PhySocket*)&(*s), true); + } + else { try { - _handler->phyOnTcpData((PhySocket *)&(*s),&(s->uptr),(void *)buf,(unsigned long)n); - } catch ( ... ) {} + _handler->phyOnTcpData((PhySocket*)&(*s), &(s->uptr), (void*)buf, (unsigned long)n); + } + catch (...) { + } } } - if ((FD_ISSET(sock,&wfds))&&(FD_ISSET(sock,&_writefds))) { + if ((FD_ISSET(sock, &wfds)) && (FD_ISSET(sock, &_writefds))) { try { - _handler->phyOnTcpWritable((PhySocket *)&(*s),&(s->uptr)); - } catch ( ... ) {} + _handler->phyOnTcpWritable((PhySocket*)&(*s), &(s->uptr)); + } + catch (...) { + } } - } break; + } break; case ZT_PHY_SOCKET_TCP_LISTEN: - if (FD_ISSET(s->sock,&rfds)) { - memset(&ss,0,sizeof(ss)); + if (FD_ISSET(s->sock, &rfds)) { + memset(&ss, 0, sizeof(ss)); socklen_t slen = sizeof(ss); - ZT_PHY_SOCKFD_TYPE newSock = ::accept(s->sock,(struct sockaddr *)&ss,&slen); + ZT_PHY_SOCKFD_TYPE newSock = ::accept(s->sock, (struct sockaddr*)&ss, &slen); if (ZT_PHY_SOCKFD_VALID(newSock)) { if (_socks.size() >= ZT_PHY_MAX_SOCKETS) { ZT_PHY_CLOSE_SOCKET(newSock); - } else { + } + else { #if defined(_WIN32) || defined(_WIN64) - { BOOL f = (_noDelay ? TRUE : FALSE); setsockopt(newSock,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } - { u_long iMode=1; ioctlsocket(newSock,FIONBIO,&iMode); } + { + BOOL f = (_noDelay ? TRUE : FALSE); + setsockopt(newSock, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f)); + } + { + u_long iMode = 1; + ioctlsocket(newSock, FIONBIO, &iMode); + } #else - { int f = (_noDelay ? 1 : 0); setsockopt(newSock,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } - fcntl(newSock,F_SETFL,O_NONBLOCK); + { + int f = (_noDelay ? 1 : 0); + setsockopt(newSock, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f)); + } + fcntl(newSock, F_SETFL, O_NONBLOCK); #endif _socks.push_back(PhySocketImpl()); - PhySocketImpl &sws = _socks.back(); - FD_SET(newSock,&_readfds); + PhySocketImpl& sws = _socks.back(); + FD_SET(newSock, &_readfds); if ((long)newSock > _nfds) _nfds = (long)newSock; sws.type = ZT_PHY_SOCKET_TCP_IN; sws.sock = newSock; - sws.uptr = (void *)0; - memcpy(&(sws.saddr),&ss,sizeof(struct sockaddr_storage)); + sws.uptr = (void*)0; + memcpy(&(sws.saddr), &ss, sizeof(struct sockaddr_storage)); try { - _handler->phyOnTcpAccept((PhySocket *)&(*s),(PhySocket *)&(_socks.back()),&(s->uptr),&(sws.uptr),(const struct sockaddr *)&(sws.saddr)); - } catch ( ... ) {} + _handler->phyOnTcpAccept((PhySocket*)&(*s), (PhySocket*)&(_socks.back()), &(s->uptr), &(sws.uptr), (const struct sockaddr*)&(sws.saddr)); + } + catch (...) { + } } } } break; case ZT_PHY_SOCKET_UDP: - if (FD_ISSET(s->sock,&rfds)) { - for(int k=0;k<1024;++k) { - memset(&ss,0,sizeof(ss)); + if (FD_ISSET(s->sock, &rfds)) { +#if (defined(__linux__) || defined(linux) || defined(__linux)) && defined(MSG_WAITFORONE) +#define RECVMMSG_WINDOW_SIZE 128 +#define RECVMMSG_BUF_SIZE 1500 + iovec iovs[RECVMMSG_WINDOW_SIZE]; + uint8_t bufs[RECVMMSG_WINDOW_SIZE][RECVMMSG_BUF_SIZE]; + sockaddr_storage addrs[RECVMMSG_WINDOW_SIZE]; + memset(addrs, 0, sizeof(addrs)); + mmsghdr mm[RECVMMSG_WINDOW_SIZE]; + memset(mm, 0, sizeof(mm)); + for (int k = 0; k < 1024; ++k) { + for (int i = 0; i < RECVMMSG_WINDOW_SIZE; ++i) { + iovs[i].iov_base = (void*)bufs[i]; + iovs[i].iov_len = RECVMMSG_BUF_SIZE; + mm[i].msg_hdr.msg_name = (void*)&(addrs[i]); + mm[i].msg_hdr.msg_namelen = sizeof(sockaddr_storage); + mm[i].msg_hdr.msg_iov = &(iovs[i]); + mm[i].msg_hdr.msg_iovlen = 1; + } + int received_count = recvmmsg(s->sock, mm, RECVMMSG_WINDOW_SIZE, MSG_WAITFORONE, nullptr); + if (received_count > 0) { + for (int i = 0; i < received_count; ++i) { + long n = (long)mm[i].msg_hdr.msg_iov->iov_len; + if (n > 0) { + try { + _handler->phyOnDatagram((PhySocket*)&(*s), &(s->uptr), (const struct sockaddr*)&(s->saddr), (const struct sockaddr*)&(addrs[i]), bufs[i], (unsigned long)n); + } + catch (...) { + } + } + } + } + else { + break; + } + } +#else + for (int k = 0; k < 1024; ++k) { + memset(&ss, 0, sizeof(ss)); socklen_t slen = sizeof(ss); - long n = (long)::recvfrom(s->sock,buf,sizeof(buf),0,(struct sockaddr *)&ss,&slen); + long n = (long)::recvfrom(s->sock, buf, sizeof(buf), 0, (struct sockaddr*)&ss, &slen); if (n > 0) { try { - _handler->phyOnDatagram((PhySocket *)&(*s),&(s->uptr),(const struct sockaddr *)&(s->saddr),(const struct sockaddr *)&ss,(void *)buf,(unsigned long)n); - } catch ( ... ) {} - } else if (n < 0) + _handler->phyOnDatagram((PhySocket*)&(*s), &(s->uptr), (const struct sockaddr*)&(s->saddr), (const struct sockaddr*)&ss, (void*)buf, (unsigned long)n); + } + catch (...) { + } + } + else if (n < 0) break; } +#endif } break; case ZT_PHY_SOCKET_UNIX_IN: { #ifdef __UNIX_LIKE__ - ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable - if ((FD_ISSET(sock,&wfds))&&(FD_ISSET(sock,&_writefds))) { + ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable + if ((FD_ISSET(sock, &wfds)) && (FD_ISSET(sock, &_writefds))) { try { - _handler->phyOnUnixWritable((PhySocket *)&(*s),&(s->uptr)); - } catch ( ... ) {} - } - if (FD_ISSET(sock,&rfds)) { - long n = (long)::read(sock,buf,sizeof(buf)); - if (n <= 0) { - this->close((PhySocket *)&(*s),true); - } else { - try { - _handler->phyOnUnixData((PhySocket *)&(*s),&(s->uptr),(void *)buf,(unsigned long)n); - } catch ( ... ) {} + _handler->phyOnUnixWritable((PhySocket*)&(*s), &(s->uptr)); + } + catch (...) { } } -#endif // __UNIX_LIKE__ - } break; + if (FD_ISSET(sock, &rfds)) { + long n = (long)::read(sock, buf, sizeof(buf)); + if (n <= 0) { + this->close((PhySocket*)&(*s), true); + } + else { + try { + _handler->phyOnUnixData((PhySocket*)&(*s), &(s->uptr), (void*)buf, (unsigned long)n); + } + catch (...) { + } + } + } +#endif // __UNIX_LIKE__ + } break; case ZT_PHY_SOCKET_UNIX_LISTEN: #ifdef __UNIX_LIKE__ - if (FD_ISSET(s->sock,&rfds)) { - memset(&ss,0,sizeof(ss)); + if (FD_ISSET(s->sock, &rfds)) { + memset(&ss, 0, sizeof(ss)); socklen_t slen = sizeof(ss); - ZT_PHY_SOCKFD_TYPE newSock = ::accept(s->sock,(struct sockaddr *)&ss,&slen); + ZT_PHY_SOCKFD_TYPE newSock = ::accept(s->sock, (struct sockaddr*)&ss, &slen); if (ZT_PHY_SOCKFD_VALID(newSock)) { if (_socks.size() >= ZT_PHY_MAX_SOCKETS) { ZT_PHY_CLOSE_SOCKET(newSock); - } else { - fcntl(newSock,F_SETFL,O_NONBLOCK); + } + else { + fcntl(newSock, F_SETFL, O_NONBLOCK); _socks.push_back(PhySocketImpl()); - PhySocketImpl &sws = _socks.back(); - FD_SET(newSock,&_readfds); + PhySocketImpl& sws = _socks.back(); + FD_SET(newSock, &_readfds); if ((long)newSock > _nfds) _nfds = (long)newSock; sws.type = ZT_PHY_SOCKET_UNIX_IN; sws.sock = newSock; - sws.uptr = (void *)0; - memcpy(&(sws.saddr),&ss,sizeof(struct sockaddr_storage)); + sws.uptr = (void*)0; + memcpy(&(sws.saddr), &ss, sizeof(struct sockaddr_storage)); try { //_handler->phyOnUnixAccept((PhySocket *)&(*s),(PhySocket *)&(_socks.back()),&(s->uptr),&(sws.uptr)); - } catch ( ... ) {} + } + catch (...) { + } } } } -#endif // __UNIX_LIKE__ +#endif // __UNIX_LIKE__ break; case ZT_PHY_SOCKET_FD: { ZT_PHY_SOCKFD_TYPE sock = s->sock; - const bool readable = ((FD_ISSET(sock,&rfds))&&(FD_ISSET(sock,&_readfds))); - const bool writable = ((FD_ISSET(sock,&wfds))&&(FD_ISSET(sock,&_writefds))); - if ((readable)||(writable)) { + const bool readable = ((FD_ISSET(sock, &rfds)) && (FD_ISSET(sock, &_readfds))); + const bool writable = ((FD_ISSET(sock, &wfds)) && (FD_ISSET(sock, &_writefds))); + if ((readable) || (writable)) { try { //_handler->phyOnFileDescriptorActivity((PhySocket *)&(*s),&(s->uptr),readable,writable); - } catch ( ... ) {} + } + catch (...) { + } } - } break; + } break; default: break; - } if (s->type == ZT_PHY_SOCKET_CLOSED) _socks.erase(s++); - else ++s; + else + ++s; } } @@ -1055,18 +1163,18 @@ public: * @param sock Socket to close * @param callHandlers If true, call handlers for TCP connect (success: false) or close (default: true) */ - inline void close(PhySocket *sock,bool callHandlers = true) + inline void close(PhySocket* sock, bool callHandlers = true) { - if (!sock) + if (! sock) return; - PhySocketImpl &sws = *(reinterpret_cast(sock)); + PhySocketImpl& sws = *(reinterpret_cast(sock)); if (sws.type == ZT_PHY_SOCKET_CLOSED) return; - FD_CLR(sws.sock,&_readfds); - FD_CLR(sws.sock,&_writefds); + FD_CLR(sws.sock, &_readfds); + FD_CLR(sws.sock, &_writefds); #if defined(_WIN32) || defined(_WIN64) - FD_CLR(sws.sock,&_exceptfds); + FD_CLR(sws.sock, &_exceptfds); #endif if (sws.type != ZT_PHY_SOCKET_FD) @@ -1074,28 +1182,34 @@ public: #ifdef __UNIX_LIKE__ if (sws.type == ZT_PHY_SOCKET_UNIX_LISTEN) - ::unlink(((struct sockaddr_un *)(&(sws.saddr)))->sun_path); -#endif // __UNIX_LIKE__ + ::unlink(((struct sockaddr_un*)(&(sws.saddr)))->sun_path); +#endif // __UNIX_LIKE__ if (callHandlers) { - switch(sws.type) { + switch (sws.type) { case ZT_PHY_SOCKET_TCP_OUT_PENDING: try { - _handler->phyOnTcpConnect(sock,&(sws.uptr),false); - } catch ( ... ) {} + _handler->phyOnTcpConnect(sock, &(sws.uptr), false); + } + catch (...) { + } break; case ZT_PHY_SOCKET_TCP_OUT_CONNECTED: case ZT_PHY_SOCKET_TCP_IN: try { - _handler->phyOnTcpClose(sock,&(sws.uptr)); - } catch ( ... ) {} + _handler->phyOnTcpClose(sock, &(sws.uptr)); + } + catch (...) { + } break; case ZT_PHY_SOCKET_UNIX_IN: #ifdef __UNIX_LIKE__ try { - _handler->phyOnUnixClose(sock,&(sws.uptr)); - } catch ( ... ) {} -#endif // __UNIX_LIKE__ + _handler->phyOnUnixClose(sock, &(sws.uptr)); + } + catch (...) { + } +#endif // __UNIX_LIKE__ break; default: break; @@ -1109,8 +1223,8 @@ public: long nfds = (long)_whackSendSocket; if ((long)_whackReceiveSocket > nfds) nfds = (long)_whackReceiveSocket; - for(typename std::list::iterator s(_socks.begin());s!=_socks.end();++s) { - if ((s->type != ZT_PHY_SOCKET_CLOSED)&&((long)s->sock > nfds)) + for (typename std::list::iterator s(_socks.begin()); s != _socks.end(); ++s) { + if ((s->type != ZT_PHY_SOCKET_CLOSED) && ((long)s->sock > nfds)) nfds = (long)s->sock; } _nfds = nfds; @@ -1118,6 +1232,6 @@ public: } }; -} // namespace ZeroTier +} // namespace ZeroTier #endif