More Wire cleanup/docs and minor fixes.

This commit is contained in:
Adam Ierymenko 2015-03-24 13:45:31 -07:00
parent 91810c5f44
commit 8d409def74

View file

@ -48,39 +48,67 @@
#include <netinet/tcp.h> #include <netinet/tcp.h>
#endif #endif
#include <list>
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
#define ZT_SELECTWIRE_SOCKFD_TYPE SOCKET #define ZT_WIRE_SOCKFD_TYPE SOCKET
#define ZT_SELECTWIRE_SOCKFD_NULL (INVALID_SOCKET) #define ZT_WIRE_SOCKFD_NULL (INVALID_SOCKET)
#define ZT_SELECTWIRE_SOCKFD_VALID(s) ((s) != INVALID_SOCKET) #define ZT_WIRE_SOCKFD_VALID(s) ((s) != INVALID_SOCKET)
#define ZT_SELECTWIRE_CLOSE_SOCKET(s) ::closesocket(s) #define ZT_WIRE_CLOSE_SOCKET(s) ::closesocket(s)
#define ZT_SELECTWIRE_MAX_SOCKETS (FD_SETSIZE) #define ZT_WIRE_MAX_SOCKETS (FD_SETSIZE)
#define ZT_SELECTWIRE_SOCKADDR_STORAGE_TYPE struct sockaddr_storage #define ZT_WIRE_SOCKADDR_STORAGE_TYPE struct sockaddr_storage
#else #else
#define ZT_SELECTWIRE_SOCKFD_TYPE int #define ZT_WIRE_SOCKFD_TYPE int
#define ZT_SELECTWIRE_SOCKFD_NULL (-1) #define ZT_WIRE_SOCKFD_NULL (-1)
#define ZT_SELECTWIRE_SOCKFD_VALID(s) ((s) > -1) #define ZT_WIRE_SOCKFD_VALID(s) ((s) > -1)
#define ZT_SELECTWIRE_CLOSE_SOCKET(s) ::close(s) #define ZT_WIRE_CLOSE_SOCKET(s) ::close(s)
#define ZT_SELECTWIRE_MAX_SOCKETS (FD_SETSIZE) #define ZT_WIRE_MAX_SOCKETS (FD_SETSIZE)
#define ZT_SELECTWIRE_SOCKADDR_STORAGE_TYPE struct sockaddr_storage #define ZT_WIRE_SOCKADDR_STORAGE_TYPE struct sockaddr_storage
#endif #endif
namespace ZeroTier { namespace ZeroTier {
/** /**
* Wire implementation using select() for *nix or Windows * Opaque socket type
*/
typedef const void * WireSocket;
/**
* Simple templated non-blocking sockets implementation
*
* Yes there is boost::asio and libuv, but I like small binaries and I hate
* build dependencies.
* *
* This implementation takes four functions or function objects as template * This implementation takes four functions or function objects as template
* paramters: * paramters:
* *
* ON_DATAGRAM_FUNCTION(const void *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) * ON_DATAGRAM_FUNCTION(WireSocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len)
* ON_TCP_CONNECT_FUNCTION(const void *sock,void **uptr,bool success) * ON_TCP_CONNECT_FUNCTION(WireSocket *sock,void **uptr,bool success)
* ON_TCP_ACCEPT_FUNCTION(const void *sockL,const void *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) * ON_TCP_ACCEPT_FUNCTION(WireSocket *sockL,WireSocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from)
* ON_TCP_CLOSE_FUNCTION(const void *sock,void *uptr) * ON_TCP_CLOSE_FUNCTION(WireSocket *sock,void **uptr)
* ON_TCP_DATA_FUNCTION(const void *sock,void **uptr,void *data,unsigned long len) * ON_TCP_DATA_FUNCTION(WireSocket *sock,void **uptr,void *data,unsigned long len)
* ON_TCP_WRITABLE_FUNCTION(const void *sock,void **uptr) * ON_TCP_WRITABLE_FUNCTION(WireSocket *sock,void **uptr)
* *
* These templates typically refer to function objects. Templates are used to * These templates typically refer to function objects. Templates are used to
* avoid the call overhead of indirection. * avoid the call overhead of indirection, which is surprisingly high for high
* bandwidth applications pushing a lot of packets.
*
* The 'sock' pointer above is an opaque pointer to a socket. Each socket
* has a 'uptr' user-settable/modifiable pointer associated with it, which
* can be set on bind/connect calls and is passed as a void ** to permit
* resetting at any time. The ACCEPT handler takes two sets of sock and
* uptr: sockL and uptrL for the listen socket, and sockN and uptrN for
* the new TCP connection socket that has just been created.
*
* Handlers are always called. On outgoing TCP connection, CONNECT is always
* called on either success or failure followed by DATA and/or WRITABLE as
* indicated. On socket close, handlers are called unless close() is told
* explicitly not to call handlers. It is safe to close a socket within a
* handler, and in that case close() can be told not to call handlers to
* prevent recursion.
*
* This isn't thread-safe with the exception of whack(), which is safe to
* call from another thread to abort poll().
*/ */
template template
< <
@ -117,36 +145,31 @@ public:
_tcpDataHandler(tcpDataHandler), _tcpDataHandler(tcpDataHandler),
_tcpWritableHandler(tcpWritableHandler) _tcpWritableHandler(tcpWritableHandler)
{ {
for(unsigned lont i=0;i<ZT_SELECTWIRE_MAX_SOCKETS;++i)
_socks[i].type = ZT_WIRE_SOCKET_NULL;
FD_ZERO(&_readfds); FD_ZERO(&_readfds);
FD_ZERO(&_writefds); FD_ZERO(&_writefds);
FD_ZERO(&_exceptfds); FD_ZERO(&_exceptfds);
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
SOCKET pipes[2]; SOCKET pipes[2];
_winPipeHack(pipes); this->_winPipeHack(pipes);
#else #else
int pipes[2]; int pipes[2];
if (::pipe(pipes)) if (::pipe(pipes))
throw std::runtime_error("unable to create pipes for select() abort"); throw std::runtime_error("unable to create pipes for select() abort");
#endif #endif
_nfds = (pipes[0] > pipes[1]) ? (long)pipes[0] : (long)pipes[1]; _nfds = (pipes[0] > pipes[1]) ? (long)pipes[0] : (long)pipes[1];
_whackReceiveSocket = pipes[0]; _whackReceiveSocket = pipes[0];
_whackSendSocket = pipes[1]; _whackSendSocket = pipes[1];
_noDelay = noDelay; _noDelay = noDelay;
} }
~Wire() ~Wire()
{ {
for(unsigned long i=0;i<_nsocks;++i) { while (!_socks.empty())
if (_socks[i].type != ZT_WIRE_SOCKET_NULL) this->close((WireSocket *)&(_socks.front()),true);
this->close(_socks[i],true); ZT_WIRE_CLOSE_SOCKET(_whackReceiveSocket);
} ZT_WIRE_CLOSE_SOCKET(_whackSendSocket);
ZT_SELECTWIRE_CLOSE_SOCKET(_whackReceiveSocket);
ZT_SELECTWIRE_CLOSE_SOCKET(_whackSendSocket);
} }
/** /**
@ -157,90 +180,112 @@ public:
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
::send(_whackSendSocket,(const char *)this,1,0); ::send(_whackSendSocket,(const char *)this,1,0);
#else #else
::write(_whackSendSocket,(const void *)this,1); ::write(_whackSendSocket,(WireSocket *)this,1);
#endif #endif
} }
/**
* @return Number of open sockets
*/
inline unsigned long count() const throw() { return _socks.size(); }
/**
* @return Maximum number of sockets allowed
*/
inline unsigned long maxCount() const throw() { return ZT_WIRE_MAX_SOCKETS; }
/** /**
* Bind a UDP socket * Bind a UDP socket
* *
* @param localAddress Local endpoint address and port * @param localAddress Local endpoint address and port
* @param uptr Initial value of user pointer associated with this socket * @param uptr Initial value of user pointer associated with this socket
* @return Socket (as opaque pointer) or NULL on failure * @param bufferSize Desired socket receive/send buffer size -- will set as close to this as possible (0 to accept default)
* @return Socket or NULL on failure to bind
*/ */
inline const void *udpBind(const struct sockaddr *localAddress,void *uptr) inline WireSocket *udpBind(const struct sockaddr *localAddress,void *uptr,int bufferSize)
{ {
ZT_SELECTWIRE_SOCKFD_TYPE s = ::socket(AF_INET6,SOCK_DGRAM,0); if (_socks.size() >= ZT_WIRE_MAX_SOCKETS)
if (!ZT_SELECTWIRE_SOCKFD_VALID(s)) return (WireSocket *)0;
return (const void *)0;
int bs = 262144; ZT_WIRE_SOCKFD_TYPE s = ::socket(localAddress->sa_family,SOCK_DGRAM,0);
while (bs >= 65536) { if (!ZT_WIRE_SOCKFD_VALID(s))
int tmpbs = bs; return (WireSocket *)0;
if (setsockopt(s,SOL_SOCKET,SO_RCVBUF,(const char *)&tmpbs,sizeof(tmpbs)) == 0)
break; if (bufferSize > 0) {
bs -= 16384; int bs = bufferSize;
} while (bs >= 65536) {
bs = 262144; int tmpbs = bs;
while (bs >= 65536) { if (setsockopt(s,SOL_SOCKET,SO_RCVBUF,(const char *)&tmpbs,sizeof(tmpbs)) == 0)
int tmpbs = bs; break;
if (setsockopt(s,SOL_SOCKET,SO_SNDBUF,(const char *)&tmpbs,sizeof(tmpbs)) == 0) bs -= 16384;
break; }
bs -= 16384; bs = bufferSize;
while (bs >= 65536) {
int tmpbs = bs;
if (setsockopt(s,SOL_SOCKET,SO_SNDBUF,(const char *)&tmpbs,sizeof(tmpbs)) == 0)
break;
bs -= 16384;
}
} }
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
BOOL f; {
if (localAddress->ss_family == AF_INET6) { BOOL f;
f = TRUE; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(const char *)&f,sizeof(f)); if (localAddress->sa_family == AF_INET6) {
f = FALSE; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,(const char *)&f,sizeof(f)); f = TRUE; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(const char *)&f,sizeof(f));
f = FALSE; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,(const char *)&f,sizeof(f));
}
f = FALSE; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(const char *)&f,sizeof(f));
f = TRUE; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(const char *)&f,sizeof(f));
} }
f = FALSE; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(const char *)&f,sizeof(f));
f = TRUE; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(const char *)&f,sizeof(f));
#else // not Windows #else // not Windows
int f; {
if (localAddress->ss_family == AF_INET6) { int f;
f = 1; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f)); if (localAddress->sa_family == AF_INET6) {
f = 1; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f));
#ifdef IPV6_MTU_DISCOVER #ifdef IPV6_MTU_DISCOVER
f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_MTU_DISCOVER,&f,sizeof(f)); f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_MTU_DISCOVER,&f,sizeof(f));
#endif #endif
} }
f = 0; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f)); f = 0; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));
f = 1; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(void *)&f,sizeof(f)); f = 1; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(void *)&f,sizeof(f));
#ifdef IP_DONTFRAG #ifdef IP_DONTFRAG
f = 0; setsockopt(s,IPPROTO_IP,IP_DONTFRAG,&f,sizeof(f)); f = 0; setsockopt(s,IPPROTO_IP,IP_DONTFRAG,&f,sizeof(f));
#endif #endif
#ifdef IP_MTU_DISCOVER #ifdef IP_MTU_DISCOVER
f = 0; setsockopt(s,IPPROTO_IP,IP_MTU_DISCOVER,&f,sizeof(f)); f = 0; setsockopt(s,IPPROTO_IP,IP_MTU_DISCOVER,&f,sizeof(f));
#endif #endif
}
#endif // Windows or not #endif // Windows or not
if (::bind(s,localAddress,(localAddress->ss_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) { if (::bind(s,localAddress,(localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) {
ZT_SELECTWIRE_CLOSE_SOCKET(s); ZT_WIRE_CLOSE_SOCKET(s);
return (const void *)0; return (WireSocket *)0;
} }
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
u_long iMode=1; { u_long iMode=1; ioctlsocket(s,FIONBIO,&iMode); }
ioctlsocket(s,FIONBIO,&iMode);
#else #else
fcntl(s,F_SETFL,O_NONBLOCK); fcntl(s,F_SETFL,O_NONBLOCK);
#endif #endif
for(unsigned long i=0;i<ZT_SELECTWIRE_MAX_SOCKETS;++i) { try {
if (_socks[i].type == ZT_WIRE_SOCKET_NULL) { _socks.push_back(WireSocketImpl());
if ((long)s > _nfds) } catch ( ... ) {
_nfds = (long)s; ZT_WIRE_CLOSE_SOCKET(s);
FD_SET(s,&_readfds); return (WireSocket *)0;
_socks[i].type = ZT_WIRE_SOCKET_UDP;
_socks[i].sock = s;
_socks[i].uptr = uptr;
memcpy(&(_socks[i].saddr),localAddress,sizeof(struct sockaddr_storage));
return (const void *)&(_socks[i]);
}
} }
WireSocketImpl &sws = _socks.back();
ZT_SELECTWIRE_CLOSE_SOCKET(s); if ((long)s > _nfds)
return (const void *)0; _nfds = (long)s;
FD_SET(s,&_readfds);
sws.type = ZT_WIRE_SOCKET_UDP;
sws.sock = s;
sws.uptr = uptr;
memcpy(&(sws.saddr),localAddress,sizeof(struct sockaddr_storage));
return (WireSocket *)&sws;
} }
/** /**
@ -253,23 +298,53 @@ public:
* @param len Length of packet * @param len Length of packet
* @return True if packet appears to have been sent successfully * @return True if packet appears to have been sent successfully
*/ */
inline bool udpSend(const void *sock,const struct sockaddr *addr,unsigned int addrlen,const void *data,unsigned long len) inline bool udpSend(WireSocket *sock,const struct sockaddr *addr,unsigned int addrlen,WireSocket *data,unsigned long len)
{ {
WireSocket &sws = *(const_cast <WireSocket *>(reinterpret_cast<const WireSocket *>(sock))); WireSocketImpl &sws = *(const_cast <WireSocketImpl *>(reinterpret_cast<const WireSocketImpl *>(sock)));
return ((long)::sendto(sws.sock,data,len,0,addr,(socklen_t)addrlen) == (long)len); return ((long)::sendto(sws.sock,data,len,0,addr,(socklen_t)addrlen) == (long)len);
} }
inline const void *tcpListen(const struct sockaddr *localAddress,void *uptr) /**
* Bind a local listen socket to listen for new TCP connections
*
* @param localAddress Local address and port
* @param uptr Initial value of uptr for new socket
* @return Socket or NULL on failure to bind
*/
inline WireSocket *tcpListen(const struct sockaddr *localAddress,void *uptr)
{ {
if (_socks.size() >= ZT_WIRE_MAX_SOCKETS)
return (WireSocket *)0;
} }
inline const void *tcpConnect(const struct sockaddr *remoteAddress,void *uptr) /**
* Start a non-blocking connect; CONNECT handler is called on success or failure
*
* Note that if NULL is returned here, the handler is not called. Such
* a return would indicate failure to allocate the socket, too many
* open sockets, etc.
*
* @param remoteAddress Remote address
* @param uptr Initial value of uptr for new socket
* @return New socket or NULL on failure
*/
inline WireSocket *tcpConnect(const struct sockaddr *remoteAddress,void *uptr)
{ {
if (_socks.size() >= ZT_WIRE_MAX_SOCKETS)
return (WireSocket *)0;
} }
inline unsigned long tcpSend(const void *sock,const void *data,unsigned long len) /**
* Attempt to send data to a TCP connection (non-blocking)
*
* @param sock An open TCP socket (other socket types will fail)
* @param data Data to send
* @param len Length of data
* @return Number of bytes actually sent or 0 on failure
*/
inline unsigned long tcpSend(WireSocket *sock,WireSocket *data,unsigned long len)
{ {
WireSocket &sws = *(const_cast <WireSocket *>(reinterpret_cast<const WireSocket *>(sock))); WireSocketImpl &sws = *(const_cast <WireSocketImpl *>(reinterpret_cast<const WireSocketImpl *>(sock)));
long n = ::send(sws.sock,data,len,0); long n = ::send(sws.sock,data,len,0);
return ((n > 0) ? (unsigned long)n : 0); return ((n > 0) ? (unsigned long)n : 0);
} }
@ -277,12 +352,16 @@ public:
/** /**
* Set whether we want to be notified via the TCP writability handler when a socket is writable * Set whether we want to be notified via the TCP writability handler when a socket is writable
* *
* Call whack() if this is being done from another thread and you want
* it to take effect immediately. Otherwise it is only guaranteed to
* take effect on the next poll().
*
* @param sock TCP connection socket (other types are not valid) * @param sock TCP connection socket (other types are not valid)
* @param notifyWritable Want writable notifications? * @param notifyWritable Want writable notifications?
*/ */
inline const void tcpSetNotifyWritable(const void *sock,bool notifyWritable) inline const void tcpSetNotifyWritable(WireSocket *sock,bool notifyWritable)
{ {
WireSocket &sws = *(const_cast <WireSocket *>(reinterpret_cast<const WireSocket *>(sock))); WireSocketImpl &sws = *(const_cast <WireSocketImpl *>(reinterpret_cast<const WireSocketImpl *>(sock)));
if (notifyWritable) { if (notifyWritable) {
FD_SET(sws.sock,&_writefds); FD_SET(sws.sock,&_writefds);
} else { } else {
@ -290,6 +369,15 @@ public:
} }
} }
/**
* Wait for activity and handle one or more events
*
* Note that this is not guaranteed to wait up to 'timeout' even
* if nothing happens, as whack() or other events such as signals
* may cause premature termination.
*
* @param timeout Timeout in milliseconds or 0 for none (forever)
*/
inline void poll(unsigned long timeout) inline void poll(unsigned long timeout)
{ {
char buf[131072]; char buf[131072];
@ -314,89 +402,83 @@ public:
#endif #endif
} }
for(unsigned long i=0;i<ZT_SELECTWIRE_MAX_SOCKETS;++i) { for(std::list<WireSocketImpl>::iterator s(_socks.begin());s!=_socks.end();++s) {
switch (_socks[i].type) { switch (s->type) {
case ZT_WIRE_SOCKET_TCP_OUT_PENDING: case ZT_WIRE_SOCKET_TCP_OUT_PENDING:
if (FD_ISSET(_socks[i].sock,&efds)) if (FD_ISSET(s->sock,&efds))
this->close((const void *)&(_socks[i]),true); this->close((WireSocket *)&(_socks[i]),true);
else if (FD_ISSET(_socks[i].sock,&wfds)) { else if (FD_ISSET(s->sock,&wfds)) {
socklen_t slen = sizeof(ss); socklen_t slen = sizeof(ss);
if (::getpeername(_socks[i].sock,(strut sockaddr *)&ss,&slen) != 0) if (::getpeername(s->sock,(strut sockaddr *)&ss,&slen) != 0)
this->close((const void *)&(_socks[i]),true); this->close((WireSocket *)&(_socks[i]),true);
else { else {
_socks[i].type = ZT_WIRE_SOCKET_TCP_OUT_CONNECTED; s->type = ZT_WIRE_SOCKET_TCP_OUT_CONNECTED;
FD_SET(_socks[i].sock,&_readfds); FD_SET(s->sock,&_readfds);
FD_CLR(_socks[i].sock,&_writefds); FD_CLR(s->sock,&_writefds);
FD_CLR(_socks[i].sock,&_exceptfds); FD_CLR(s->sock,&_exceptfds);
try { try {
_tcpConnectHandler((const void *)&(_socks[i]),&(_socks[i].uptr),true); _tcpConnectHandler((WireSocket *)&(_socks[i]),&(s->uptr),true);
} catch ( ... ) {} } catch ( ... ) {}
} }
} }
break; break;
case ZT_WIRE_SOCKET_TCP_OUT_CONNECTED: case ZT_WIRE_SOCKET_TCP_OUT_CONNECTED:
case ZT_WIRE_SOCKET_TCP_IN: case ZT_WIRE_SOCKET_TCP_IN:
if (FD_ISSET(_socks[i].sock,&rfds)) { if (FD_ISSET(s->sock,&rfds)) {
long n = (long)::recv(_socks[i].sock,buf,sizeof(buf),0); long n = (long)::recv(s->sock,buf,sizeof(buf),0);
if (n <= 0) { if (n <= 0) {
this->close((const void *)&(_socks[i]),true); this->close((WireSocket *)&(_socks[i]),true);
} else { } else {
try { try {
_tcpDataHandler((const void *)&(_socks[i]),&(_socks[i].uptr),(void *)buf,(unsigned long)n); _tcpDataHandler((WireSocket *)&(_socks[i]),&(s->uptr),(void *)buf,(unsigned long)n);
} catch ( ... ) {} } catch ( ... ) {}
} }
} }
if ((FD_ISSET(_socks[i].sock,&wfds))&&(FD_ISSET(_socks[i].sock,&_writefds))) { if ((FD_ISSET(s->sock,&wfds))&&(FD_ISSET(s->sock,&_writefds))) {
try { try {
_tcpWritableHandler((const void *)&(_socks[i]),&(_socks[i].uptr)); _tcpWritableHandler((WireSocket *)&(_socks[i]),&(s->uptr));
} catch ( ... ) {} } catch ( ... ) {}
} }
break; break;
case ZT_WIRE_SOCKET_TCP_LISTEN: case ZT_WIRE_SOCKET_TCP_LISTEN:
if (FD_ISSET(_socks[i].sock,&rfds)) { if (FD_ISSET(s->sock,&rfds)) {
memset(&ss,0,sizeof(ss)); memset(&ss,0,sizeof(ss));
socklen_t slen = sizeof(ss); socklen_t slen = sizeof(ss);
ZT_SELECTWIRE_SOCKFD_TYPE s = ::accept(_socks[i].sock,(struct sockaddr *)&ss,&slen); ZT_WIRE_SOCKFD_TYPE newSock = ::accept(s->sock,(struct sockaddr *)&ss,&slen);
if (ZT_SELECTWIRE_SOCKFD_VALID(s)) { if (ZT_WIRE_SOCKFD_VALID(newSock)) {
if (_socks.size() >= ZT_WIRE_MAX_SOCKETS) {
ZT_WIRE_CLOSE_SOCKET(newSock);
} else {
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
if (_noDelay) { BOOL f = TRUE; setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } { BOOL f = (_noDelay ? TRUE : FALSE); setsockopt(newSock,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); }
u_long iMode=1; { u_long iMode=1; ioctlsocket(newSock,FIONBIO,&iMode); }
ioctlsocket(s,FIONBIO,&iMode);
#else #else
if (_noDelay) { int f = 1; setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); } { int f = (_noDelay ? 1 : 0); setsockopt(newSock,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); }
fcntl(s,F_SETFL,O_NONBLOCK); fcntl(newSock,F_SETFL,O_NONBLOCK);
#endif #endif
bool haveSlot = false; _socks.push_back(WireSocketImpl());
for(unsigned long k=0;k<ZT_SELECTWIRE_MAX_SOCKETS;++k) { WireSocketImpl &sws = _socks.back();
if (_socks[k].type == ZT_WIRE_SOCKET_NULL) { FD_SET(newSock,&_readfds);
FD_SET(s,&_readfds); if ((long)newSock > _nfds)
haveSlot = true; _nfds = (long)newSock;
if ((long)s > _nfds) sws.type = ZT_WIRE_SOCKET_UDP;
_nfds = (long)s; sws.sock = s;
FD_SET(s,&_readfds); sws.uptr = (void *)0;
_socks[k].type = ZT_WIRE_SOCKET_UDP; memcpy(&(sws.saddr),&ss,sizeof(struct sockaddr_storage));
_socks[k].sock = s; try {
_socks[k].uptr = (void *)0; _tcpAcceptHandler((WireSocket *)&(*s),(WireSocket *)&(_socks.back()),&(s->uptr),&(sws.uptr),(const struct sockaddr *)&(sws.saddr));
memcpy(&(_socks[k].saddr),&ss,sizeof(struct sockaddr_storage)); } catch ( ... ) {}
try {
_tcpAcceptHandler((const void *)&(_socks[i]),(const void *)&(_socks[k]),&(_socks[i].uptr),&(_socks[k].uptr),(const struct sockaddr *)&(_socks[k].saddr));
} catch ( ... ) {}
break;
}
}
if (!haveSlot)
ZT_SELECTWIRE_CLOSE_SOCKET(s);
} }
} }
break; break;
case ZT_WIRE_SOCKET_UDP: case ZT_WIRE_SOCKET_UDP:
if (FD_ISSET(_socks[i].sock,&rfds)) { if (FD_ISSET(s->sock,&rfds)) {
memset(&ss,0,sizeof(ss)); memset(&ss,0,sizeof(ss));
socklen_t slen = sizeof(ss); socklen_t slen = sizeof(ss);
long n = (long)::recvfrom(_socks[i].sock,buf,sizeof(buf),0,(struct sockaddr *)&ss,&slen); long n = (long)::recvfrom(s->sock,buf,sizeof(buf),0,(struct sockaddr *)&ss,&slen);
if (n > 0) { if (n > 0) {
try { try {
_dgHandler((const void *)&(_socks[i]),&(_socks[i].uptr),(const struct sockaddr *)&ss,(void *)buf,(unsigned long)n); _dgHandler((WireSocket *)&(_socks[i]),&(s->uptr),(const struct sockaddr *)&ss,(void *)buf,(unsigned long)n);
} catch ( ... ) {} } catch ( ... ) {}
} }
} }
@ -407,24 +489,23 @@ public:
} }
} }
inline void close(const void *sock,bool callHandlers) inline void close(WireSocket *sock,bool callHandlers)
{ {
WireSocket &sws = *(const_cast <WireSocket *>(reinterpret_cast<const WireSocket *>(sock))); if (!sock)
return;
const WireSocketType oldType = sws.type; WireSocketImpl &sws = *(const_cast <WireSocketImpl *>(reinterpret_cast<const WireSocketImpl *>(sock)));
sws.type = ZT_WIRE_SOCKET_NULL;
FD_CLR(sws.sock,&_readfds); FD_CLR(sws.sock,&_readfds);
FD_CLR(sws.sock,&_writefds); FD_CLR(sws.sock,&_writefds);
FD_CLR(sws.sock,&_exceptfds); FD_CLR(sws.sock,&_exceptfds);
ZT_SELECTWIRE_CLOSE_SOCKET(sws.sock); ZT_WIRE_CLOSE_SOCKET(sws.sock);
switch(oldType) { switch(sws.type) {
case ZT_WIRE_SOCKET_TCP_OUT_PENDING: case ZT_WIRE_SOCKET_TCP_OUT_PENDING:
if (callHandlers) { if (callHandlers) {
try { try {
_tcpConnectHandler((const void *)&sws,&(sws.uptr),false); _tcpConnectHandler(sock,&(sws.uptr),false);
} catch ( ... ) {} } catch ( ... ) {}
} }
break; break;
@ -432,7 +513,7 @@ public:
case ZT_WIRE_SOCKET_TCP_IN: case ZT_WIRE_SOCKET_TCP_IN:
if (callHandlers) { if (callHandlers) {
try { try {
_tcpCloseHandler((const void *)&sws,sws.uptr); _tcpCloseHandler(sock,&(sws.uptr));
} catch ( ... ) {} } catch ( ... ) {}
} }
break; break;
@ -442,12 +523,21 @@ public:
if ((long)sws.sock >= _nfds) { if ((long)sws.sock >= _nfds) {
long nfds = (long)_whackSendSocket; long nfds = (long)_whackSendSocket;
for(unsigned long i=0;i<ZT_SELECTWIRE_MAX_SOCKETS;++i) { if ((long)_whackReceiveSocket > nfds)
if ((_socks[i].type != ZT_WIRE_SOCKET_NULL)&&((long)_socks[i].sock > nfds)) nfds = (long)_whackReceiveSocket;
nfds = (long)_socks[i].sock; for(std::list<WireSocketImpl>::iterator s(_socks.begin());s!=_socks.end();++s) {
if ((long)s->sock > nfds)
nfds = (long)s->sock;
} }
_nfds = nfds; _nfds = nfds;
} }
for(std::list<WireSocketImpl>::iterator s(_socks.begin());s!=_socks.end();++s) {
if (&(*s) == sock) {
_socks.erase(s);
break;
}
}
} }
private: private:
@ -487,19 +577,18 @@ private:
ZT_WIRE_SOCKET_TCP_IN = 0x02, ZT_WIRE_SOCKET_TCP_IN = 0x02,
ZT_WIRE_SOCKET_TCP_LISTEN = 0x03, // isTCP() == ((type & 0x03) != 0) ZT_WIRE_SOCKET_TCP_LISTEN = 0x03, // isTCP() == ((type & 0x03) != 0)
ZT_WIRE_SOCKET_RAW = 0x04, ZT_WIRE_SOCKET_RAW = 0x04,
ZT_WIRE_SOCKET_UDP = 0x05, ZT_WIRE_SOCKET_UDP = 0x05
ZT_WIRE_SOCKET_NULL = 0x06
}; };
struct WireSocket struct WireSocketImpl
{ {
WireSocketType type; WireSocketType type;
ZT_SELECTWIRE_SOCKFD_TYPE sock; ZT_WIRE_SOCKFD_TYPE sock;
void *uptr; // user-settable pointer void *uptr; // user-settable pointer
ZT_SELECTWIRE_SOCKADDR_STORAGE_TYPE saddr; // from address for TCP_IN, local address otherwise ZT_WIRE_SOCKADDR_STORAGE_TYPE saddr; // from address for TCP_IN, local address otherwise
}; };
inline bool _isTCP(const WireSocket &sws) const throw() { return ((((unsigned int)sws.type) & 0x03) != 0); } inline bool _isTCP(const WireSocketImpl &sws) const throw() { return ((((unsigned int)sws.type) & 0x03) != 0); }
ON_DATAGRAM_FUNCTION _dgHandler; ON_DATAGRAM_FUNCTION _dgHandler;
ON_TCP_CONNECT_FUNCTION _tcpConnectHandler; ON_TCP_CONNECT_FUNCTION _tcpConnectHandler;
@ -508,13 +597,12 @@ private:
ON_TCP_DATA_FUNCTION _tcpDataHandler; ON_TCP_DATA_FUNCTION _tcpDataHandler;
ON_TCP_WRITABLE_FUNCTION _tcpWritableHandler; ON_TCP_WRITABLE_FUNCTION _tcpWritableHandler;
WireSocket _socks[ZT_SELECTWIRE_MAX_SOCKETS]; std::list<WireSocketImpl> _socks;
fd_set _readfds,_writefds,_exceptfds; fd_set _readfds,_writefds,_exceptfds;
long _nfds; long _nfds;
ZT_SELECTWIRE_SOCKFD_TYPE _whackReceiveSocket; ZT_WIRE_SOCKFD_TYPE _whackReceiveSocket;
ZT_SELECTWIRE_SOCKFD_TYPE _whackSendSocket; ZT_WIRE_SOCKFD_TYPE _whackSendSocket;
bool _noDelay; bool _noDelay;
}; };