More work on abstracting socket manager.

This commit is contained in:
Adam Ierymenko 2014-10-21 15:18:50 -07:00
parent 128a131070
commit 2436e22f46
6 changed files with 32 additions and 68 deletions

View file

@ -84,8 +84,6 @@ struct _NodeImpl
{ {
RuntimeEnvironment renv; RuntimeEnvironment renv;
unsigned int udpPort,tcpPort;
std::string reasonForTerminationStr; std::string reasonForTerminationStr;
volatile Node::ReasonForTermination reasonForTermination; volatile Node::ReasonForTermination reasonForTermination;
@ -112,7 +110,6 @@ struct _NodeImpl
delete renv.updater; renv.updater = (SoftwareUpdater *)0; delete renv.updater; renv.updater = (SoftwareUpdater *)0;
delete renv.nc; renv.nc = (NodeConfig *)0; // shut down all networks, close taps, etc. delete renv.nc; renv.nc = (NodeConfig *)0; // shut down all networks, close taps, etc.
delete renv.topology; renv.topology = (Topology *)0; // now we no longer need routing info delete renv.topology; renv.topology = (Topology *)0; // now we no longer need routing info
delete renv.sm; renv.sm = (SocketManager *)0; // close all sockets
delete renv.sw; renv.sw = (Switch *)0; // order matters less from here down delete renv.sw; renv.sw = (Switch *)0; // order matters less from here down
delete renv.mc; renv.mc = (Multicaster *)0; delete renv.mc; renv.mc = (Multicaster *)0;
delete renv.antiRec; renv.antiRec = (AntiRecursion *)0; delete renv.antiRec; renv.antiRec = (AntiRecursion *)0;
@ -222,8 +219,7 @@ Node::Node(
const char *hp, const char *hp,
EthernetTapFactory *tf, EthernetTapFactory *tf,
RoutingTable *rt, RoutingTable *rt,
unsigned int udpPort, SocketManager *sm,
unsigned int tcpPort,
bool resetIdentity, bool resetIdentity,
const char *overrideRootTopology) throw() : const char *overrideRootTopology) throw() :
_impl(new _NodeImpl) _impl(new _NodeImpl)
@ -236,6 +232,7 @@ Node::Node(
impl->renv.tapFactory = tf; impl->renv.tapFactory = tf;
impl->renv.routingTable = rt; impl->renv.routingTable = rt;
impl->renv.sm = sm;
if (resetIdentity) { if (resetIdentity) {
// Forget identity and peer database, peer keys, etc. // Forget identity and peer database, peer keys, etc.
@ -255,8 +252,6 @@ Node::Node(
} }
} }
impl->udpPort = udpPort & 0xffff;
impl->tcpPort = tcpPort & 0xffff;
impl->reasonForTermination = Node::NODE_RUNNING; impl->reasonForTermination = Node::NODE_RUNNING;
impl->started = false; impl->started = false;
impl->running = false; impl->running = false;
@ -400,7 +395,6 @@ Node::ReasonForTermination Node::run()
RR->antiRec = new AntiRecursion(); RR->antiRec = new AntiRecursion();
RR->mc = new Multicaster(RR); RR->mc = new Multicaster(RR);
RR->sw = new Switch(RR); RR->sw = new Switch(RR);
RR->sm = new SocketManager(impl->udpPort,impl->tcpPort,&_CBztTraffic,RR);
RR->topology = new Topology(RR); RR->topology = new Topology(RR);
try { try {
RR->nc = new NodeConfig(RR); RR->nc = new NodeConfig(RR);
@ -666,7 +660,7 @@ Node::ReasonForTermination Node::run()
try { try {
unsigned long delay = std::min((unsigned long)ZT_MAX_SERVICE_LOOP_INTERVAL,RR->sw->doTimerTasks()); unsigned long delay = std::min((unsigned long)ZT_MAX_SERVICE_LOOP_INTERVAL,RR->sw->doTimerTasks());
uint64_t start = Utils::now(); uint64_t start = Utils::now();
RR->sm->poll(delay); RR->sm->poll(delay,&_CBztTraffic,RR);
lastDelayDelta = (long)(Utils::now() - start) - (long)delay; // used to detect sleep/wake lastDelayDelta = (long)(Utils::now() - start) - (long)delay; // used to detect sleep/wake
} catch (std::exception &exc) { } catch (std::exception &exc) {
LOG("unexpected exception running Switch doTimerTasks: %s",exc.what()); LOG("unexpected exception running Switch doTimerTasks: %s",exc.what());

View file

@ -36,6 +36,7 @@ namespace ZeroTier {
class EthernetTapFactory; class EthernetTapFactory;
class RoutingTable; class RoutingTable;
class SocketManager;
/** /**
* A ZeroTier One node * A ZeroTier One node
@ -85,8 +86,7 @@ public:
* @param hp Home directory path or NULL for system-wide default for this platform * @param hp Home directory path or NULL for system-wide default for this platform
* @param tf Ethernet tap factory for platform network stack * @param tf Ethernet tap factory for platform network stack
* @param rt Routing table interface for platform network stack * @param rt Routing table interface for platform network stack
* @param udpPort UDP port or 0 to disable * @param sm Socket manager for physical network I/O
* @param tcpPort TCP port or 0 to disable
* @param resetIdentity If true, delete identity before starting and regenerate * @param resetIdentity If true, delete identity before starting and regenerate
* @param overrideRootTopology Override root topology with this dictionary (in string serialized format) and do not update (default: NULL for none) * @param overrideRootTopology Override root topology with this dictionary (in string serialized format) and do not update (default: NULL for none)
*/ */
@ -94,8 +94,7 @@ public:
const char *hp, const char *hp,
EthernetTapFactory *tf, EthernetTapFactory *tf,
RoutingTable *rt, RoutingTable *rt,
unsigned int udpPort, SocketManager *sm,
unsigned int tcpPort,
bool resetIdentity, bool resetIdentity,
const char *overrideRootTopology = (const char *)0) throw(); const char *overrideRootTopology = (const char *)0) throw();

View file

@ -75,13 +75,13 @@ public:
timeOfLastPacketReceived(0), timeOfLastPacketReceived(0),
tapFactory((EthernetTapFactory *)0), tapFactory((EthernetTapFactory *)0),
routingTable((RoutingTable *)0), routingTable((RoutingTable *)0),
sm((SocketManager *)0),
log((Logger *)0), log((Logger *)0),
prng((CMWC4096 *)0), prng((CMWC4096 *)0),
http((HttpClient *)0), http((HttpClient *)0),
antiRec((AntiRecursion *)0), antiRec((AntiRecursion *)0),
mc((Multicaster *)0), mc((Multicaster *)0),
sw((Switch *)0), sw((Switch *)0),
sm((SocketManager *)0),
topology((Topology *)0), topology((Topology *)0),
nc((NodeConfig *)0), nc((NodeConfig *)0),
node((Node *)0), node((Node *)0),
@ -117,6 +117,7 @@ public:
// These are passed in from outside and are not created or deleted by the ZeroTier node core // These are passed in from outside and are not created or deleted by the ZeroTier node core
EthernetTapFactory *tapFactory; EthernetTapFactory *tapFactory;
RoutingTable *routingTable; RoutingTable *routingTable;
SocketManager *sm;
/* /*
* Order matters a bit here. These are constructed in this order * Order matters a bit here. These are constructed in this order
@ -132,7 +133,6 @@ public:
AntiRecursion *antiRec; AntiRecursion *antiRec;
Multicaster *mc; Multicaster *mc;
Switch *sw; Switch *sw;
SocketManager *sm;
Topology *topology; Topology *topology;
NodeConfig *nc; NodeConfig *nc;
Node *node; Node *node;

View file

@ -48,9 +48,7 @@ namespace ZeroTier {
class SocketManager : NonCopyable class SocketManager : NonCopyable
{ {
public: public:
SocketManager(void (*packetHandler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg) : SocketManager() {}
_packetHandler(packetHandler),
_arg(arg) {}
virtual ~SocketManager() {} virtual ~SocketManager() {}
/** /**
@ -87,8 +85,13 @@ public:
* If called concurrently, one will block until the other completes. * If called concurrently, one will block until the other completes.
* *
* @param timeout Timeout in milliseconds, may return sooner if whack() is called * @param timeout Timeout in milliseconds, may return sooner if whack() is called
* @param handler Packet data handler
* @param arg Void argument to packet data handler
*/ */
virtual void poll(unsigned long timeout) = 0; virtual void poll(
unsigned long timeout,
void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),
void *arg);
/** /**
* Cause current or next blocking poll() operation to timeout immediately * Cause current or next blocking poll() operation to timeout immediately
@ -99,10 +102,6 @@ public:
* Close TCP sockets * Close TCP sockets
*/ */
virtual void closeTcpSockets() = 0; virtual void closeTcpSockets() = 0;
protected:
void (*_packetHandler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &);
void *_arg;
}; };
} // namespace ZeroTier } // namespace ZeroTier

View file

@ -79,9 +79,6 @@ public:
NativeSocket(const Type &t,int s) : Socket(t),_sock(s) {} NativeSocket(const Type &t,int s) : Socket(t),_sock(s) {}
int _sock; int _sock;
#endif #endif
virtual bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm) = 0;
virtual bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm) = 0;
}; };
/** /**
@ -122,7 +119,7 @@ public:
} }
} }
virtual bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm) inline bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm,void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg)
{ {
Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> buf; Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> buf;
InetAddress from; InetAddress from;
@ -130,12 +127,14 @@ public:
int n = (int)recvfrom(_sock,(char *)(buf.data()),ZT_SOCKET_MAX_MESSAGE_LEN,0,from.saddr(),&salen); int n = (int)recvfrom(_sock,(char *)(buf.data()),ZT_SOCKET_MAX_MESSAGE_LEN,0,from.saddr(),&salen);
if (n > 0) { if (n > 0) {
buf.setSize((unsigned int)n); buf.setSize((unsigned int)n);
sm->handleReceivedPacket(self,from,buf); try {
handler(self,arg,from,buf);
} catch ( ... ) {} // handlers should not throw
} }
return true; return true;
} }
virtual bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm) inline bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm)
{ {
return true; return true;
} }
@ -226,7 +225,7 @@ public:
return true; return true;
} }
virtual bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm) inline bool notifyAvailableForRead(const SharedPtr<Socket> &self,NativeSocketManager *sm,void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg)
{ {
unsigned char buf[65536]; unsigned char buf[65536];
@ -251,7 +250,7 @@ public:
Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> data(_inbuf + 5,pl - 5); Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> data(_inbuf + 5,pl - 5);
memmove(_inbuf,_inbuf + pl,p -= pl); memmove(_inbuf,_inbuf + pl,p -= pl);
try { try {
sm->handleReceivedPacket(self,_remote,data); handler(self,arg,_remote,data);
} catch ( ... ) {} // handlers should not throw } catch ( ... ) {} // handlers should not throw
pl = 0; pl = 0;
} }
@ -261,7 +260,7 @@ public:
return true; return true;
} }
virtual bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm) inline bool notifyAvailableForWrite(const SharedPtr<Socket> &self,NativeSocketManager *sm)
{ {
Mutex::Lock _l(_writeLock); Mutex::Lock _l(_writeLock);
@ -343,12 +342,8 @@ static inline void winPipeHack(SOCKET fds[2])
} }
#endif #endif
NativeSocketManager::NativeSocketManager( NativeSocketManager::NativeSocketManager(int localUdpPort,int localTcpPort) :
int localUdpPort, SocketManager(),
int localTcpPort,
void (*packetHandler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),
void *arg) :
SocketManager(packetHandler,arg),
_whackSendPipe(INVALID_SOCKET), _whackSendPipe(INVALID_SOCKET),
_whackReceivePipe(INVALID_SOCKET), _whackReceivePipe(INVALID_SOCKET),
_tcpV4ListenSocket(INVALID_SOCKET), _tcpV4ListenSocket(INVALID_SOCKET),
@ -707,7 +702,7 @@ bool NativeSocketManager::send(const InetAddress &to,bool tcp,bool autoConnectTc
return false; return false;
} }
void NativeSocketManager::poll(unsigned long timeout) void NativeSocketManager::poll(unsigned long timeout,void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg)
{ {
fd_set rfds,wfds,efds; fd_set rfds,wfds,efds;
struct timeval tv; struct timeval tv;
@ -834,11 +829,11 @@ void NativeSocketManager::poll(unsigned long timeout)
{ {
NativeUdpSocket *usock = (NativeUdpSocket *)_udpV4Socket.ptr(); NativeUdpSocket *usock = (NativeUdpSocket *)_udpV4Socket.ptr();
if ((usock)&&(FD_ISSET(usock->_sock,&rfds))) { if ((usock)&&(FD_ISSET(usock->_sock,&rfds))) {
usock->notifyAvailableForRead(_udpV4Socket,this); usock->notifyAvailableForRead(_udpV4Socket,this,handler,arg);
} }
usock = (NativeUdpSocket *)_udpV6Socket.ptr(); usock = (NativeUdpSocket *)_udpV6Socket.ptr();
if ((usock)&&(FD_ISSET(usock->_sock,&rfds))) { if ((usock)&&(FD_ISSET(usock->_sock,&rfds))) {
usock->notifyAvailableForRead(_udpV6Socket,this); usock->notifyAvailableForRead(_udpV6Socket,this,handler,arg);
} }
} }
@ -885,7 +880,7 @@ void NativeSocketManager::poll(unsigned long timeout)
} }
} }
if (FD_ISSET(tsock->_sock,&rfds)) { if (FD_ISSET(tsock->_sock,&rfds)) {
if (!tsock->notifyAvailableForRead(*s,this)) { if (!tsock->notifyAvailableForRead(*s,this,handler,arg)) {
{ {
Mutex::Lock _l2(_tcpSockets_m); Mutex::Lock _l2(_tcpSockets_m);
_tcpSockets.erase(tsock->_remote); _tcpSockets.erase(tsock->_remote);

View file

@ -58,9 +58,7 @@ class NativeUdpSocket;
class NativeTcpSocket; class NativeTcpSocket;
/** /**
* Socket I/O multiplexer * Native socket manager for Unix and Windows
*
* This wraps select(), epoll(), etc. and handles creation of Sockets.
*/ */
class NativeSocketManager : public SocketManager class NativeSocketManager : public SocketManager
{ {
@ -68,36 +66,15 @@ class NativeSocketManager : public SocketManager
friend class NativeTcpSocket; friend class NativeTcpSocket;
public: public:
/** NativeSocketManager(int localUdpPort,int localTcpPort);
* @param localUdpPort Local UDP port to bind or 0 for no UDP support
* @param localTcpPort Local TCP port to listen to or 0 for no incoming TCP connect support
* @param packetHandler Function to call when packets are received by a socket
* @param arg Second argument to packetHandler()
* @throws std::runtime_error Could not bind local port(s) or open socket(s)
*/
NativeSocketManager(
int localUdpPort,
int localTcpPort,
void (*packetHandler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),
void *arg);
virtual ~NativeSocketManager(); virtual ~NativeSocketManager();
virtual bool send(const InetAddress &to,bool tcp,bool autoConnectTcp,const void *msg,unsigned int msglen); virtual bool send(const InetAddress &to,bool tcp,bool autoConnectTcp,const void *msg,unsigned int msglen);
virtual void poll(unsigned long timeout); virtual void poll(unsigned long timeout,void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg);
virtual void whack(); virtual void whack();
virtual void closeTcpSockets(); virtual void closeTcpSockets();
private: private:
// Called by socket implementations when a packet is received
inline void handleReceivedPacket(const SharedPtr<Socket> &sock,const InetAddress &from,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &data)
throw()
{
try {
_packetHandler(sock,_arg,from,data);
} catch ( ... ) {} // handlers shouldn't throw
}
// Used by TcpSocket to register/unregister for write availability notification // Used by TcpSocket to register/unregister for write availability notification
void _startNotifyWrite(const NativeSocket *sock); void _startNotifyWrite(const NativeSocket *sock);
void _stopNotifyWrite(const NativeSocket *sock); void _stopNotifyWrite(const NativeSocket *sock);