From 8ef4edebbfeead53e5b2b454086e21e42e809aab Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 5 Nov 2015 12:22:58 -0800 Subject: [PATCH] Deferred decode for HELLO to prevent HELLOcalypse under high load of new peers. --- include/ZeroTierOne.h | 21 +++++++ node/BinarySemaphore.hpp | 106 ++++++++++++++++++++++++++++++++++ node/DeferredPackets.cpp | 95 ++++++++++++++++++++++++++++++ node/DeferredPackets.hpp | 98 +++++++++++++++++++++++++++++++ node/IncomingPacket.cpp | 111 ++++++++++++++++++++---------------- node/IncomingPacket.hpp | 17 ++++-- node/Node.cpp | 38 +++++++++++- node/Node.hpp | 1 + node/RuntimeEnvironment.hpp | 7 +++ node/SharedPtr.hpp | 33 ++++++----- node/Switch.cpp | 8 +-- objects.mk | 1 + 12 files changed, 463 insertions(+), 73 deletions(-) create mode 100644 node/BinarySemaphore.hpp create mode 100644 node/DeferredPackets.cpp create mode 100644 node/DeferredPackets.hpp diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h index e9b38c524..fd7857d99 100644 --- a/include/ZeroTierOne.h +++ b/include/ZeroTierOne.h @@ -1511,6 +1511,27 @@ void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned */ void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs); +/** + * Do things in the background until Node dies + * + * This function can be called from one or more background threads to process + * certain tasks in the background to improve foreground performance. It will + * not return until the Node is shut down. If threading is not enabled in + * this build it will return immediately and will do nothing. + * + * This is completely optional. If this is never called, all processing is + * done in the foreground in the various processXXXX() methods. + * + * This does NOT replace or eliminate the need to call the normal + * processBackgroundTasks() function in your main loop. This mechanism is + * used to offload the processing of expensive mssages onto background + * handler threads to prevent foreground performance degradation under + * high load. + * + * @param node Node instance + */ +void ZT_Node_backgroundThreadMain(ZT_Node *node); + /** * Get ZeroTier One version * diff --git a/node/BinarySemaphore.hpp b/node/BinarySemaphore.hpp new file mode 100644 index 000000000..97d0d1c44 --- /dev/null +++ b/node/BinarySemaphore.hpp @@ -0,0 +1,106 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#ifndef ZT_BINARYSEMAPHORE_HPP +#define ZT_BINARYSEMAPHORE_HPP + +#include +#include +#include + +#include "Constants.hpp" +#include "NonCopyable.hpp" + +#ifdef __WINDOWS__ + +#include + +namespace ZeroTier { + +class BinarySemaphore : NonCopyable +{ +public: + BinarySemaphore() throw() { _sem = CreateSemaphore(NULL,0,1,NULL); } + ~BinarySemaphore() { CloseHandle(_sem); } + inline void wait() { WaitForSingleObject(_sem,INFINITE); } + inline void post() { ReleaseSemaphore(_sem,1,NULL); } +private: + HANDLE _sem; +}; + +} // namespace ZeroTier + +#else // !__WINDOWS__ + +#include + +namespace ZeroTier { + +class BinarySemaphore : NonCopyable +{ +public: + BinarySemaphore() + { + pthread_mutex_init(&_mh,(const pthread_mutexattr_t *)0); + pthread_cond_init(&_cond,(const pthread_condattr_t *)0); + _f = false; + } + + ~BinarySemaphore() + { + pthread_cond_destroy(&_cond); + pthread_mutex_destroy(&_mh); + } + + inline void wait() + { + pthread_mutex_lock(const_cast (&_mh)); + while (!_f) + pthread_cond_wait(const_cast (&_cond),const_cast (&_mh)); + _f = false; + pthread_mutex_unlock(const_cast (&_mh)); + } + + inline void post() + { + pthread_mutex_lock(const_cast (&_mh)); + _f = true; + pthread_mutex_unlock(const_cast (&_mh)); + pthread_cond_signal(const_cast (&_cond)); + } + +private: + pthread_cond_t _cond; + pthread_mutex_t _mh; + volatile bool _f; +}; + +} // namespace ZeroTier + +#endif // !__WINDOWS__ + +#endif diff --git a/node/DeferredPackets.cpp b/node/DeferredPackets.cpp new file mode 100644 index 000000000..923e13392 --- /dev/null +++ b/node/DeferredPackets.cpp @@ -0,0 +1,95 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#include "Constants.hpp" +#include "DeferredPackets.hpp" +#include "IncomingPacket.hpp" +#include "RuntimeEnvironment.hpp" +#include "Node.hpp" + +namespace ZeroTier { + +DeferredPackets::DeferredPackets(const RuntimeEnvironment *renv) : + RR(renv), + _readPtr(0), + _writePtr(0), + _die(false) +{ +} + +DeferredPackets::~DeferredPackets() +{ + _q_m.lock(); + _die = true; + _q_m.unlock(); + _q_s.post(); +} + +bool DeferredPackets::enqueue(IncomingPacket *pkt) +{ + _q_m.lock(); + const unsigned long p = _writePtr % ZT_DEFFEREDPACKETS_MAX; + if (_q[p]) { + _q_m.unlock(); + return false; + } else { + _q[p].setToUnsafe(pkt); + ++_writePtr; + _q_m.unlock(); + _q_s.post(); + return true; + } +} + +int DeferredPackets::process() +{ + SharedPtr pkt; + + _q_m.lock(); + if (_die) { + _q_m.unlock(); + _q_s.post(); + return -1; + } + while (_readPtr == _writePtr) { + _q_m.unlock(); + _q_s.wait(); + _q_m.lock(); + if (_die) { + _q_m.unlock(); + _q_s.post(); + return -1; + } + } + pkt.swap(_q[_readPtr++ % ZT_DEFFEREDPACKETS_MAX]); + _q_m.unlock(); + + pkt->tryDecode(RR,true); + return 1; +} + +} // namespace ZeroTier diff --git a/node/DeferredPackets.hpp b/node/DeferredPackets.hpp new file mode 100644 index 000000000..1ea65f3c7 --- /dev/null +++ b/node/DeferredPackets.hpp @@ -0,0 +1,98 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#ifndef ZT_DEFERREDPACKETS_HPP +#define ZT_DEFERREDPACKETS_HPP + +#include "Constants.hpp" +#include "SharedPtr.hpp" +#include "Mutex.hpp" +#include "DeferredPackets.hpp" +#include "BinarySemaphore.hpp" + +/** + * Maximum number of deferred packets + */ +#define ZT_DEFFEREDPACKETS_MAX 1024 + +namespace ZeroTier { + +class IncomingPacket; +class RuntimeEnvironment; + +/** + * Deferred packets + * + * IncomingPacket can defer its decoding this way by enqueueing itself here. + * When this is done, deferredDecode() is called later. This is done for + * operations that may be expensive to allow them to potentially be handled + * in the background or rate limited to maintain quality of service for more + * routine operations. + */ +class DeferredPackets +{ +public: + DeferredPackets(const RuntimeEnvironment *renv); + ~DeferredPackets(); + + /** + * Enqueue a packet + * + * Since packets enqueue themselves, they call it with 'this' and we wrap + * them in a SharedPtr<>. This is safe as SharedPtr<> is introspective and + * supports this. This should not be called from any other code outside + * IncomingPacket. + * + * @param pkt Packet to process later (possibly in the background) + * @return False if queue is full + */ + bool enqueue(IncomingPacket *pkt); + + /** + * Wait for and then process a deferred packet + * + * If we are shutting down (in destructor), this returns -1 and should + * not be called again. Otherwise it returns the number of packets + * processed. + * + * @return Number processed or -1 if shutting down + */ + int process(); + +private: + SharedPtr _q[ZT_DEFFEREDPACKETS_MAX]; + const RuntimeEnvironment *const RR; + unsigned long _readPtr; + unsigned long _writePtr; + bool _die; + Mutex _q_m; + BinarySemaphore _q_s; +}; + +} // namespace ZeroTier + +#endif diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index 32229ba65..f22162350 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -46,21 +46,31 @@ #include "Cluster.hpp" #include "Node.hpp" #include "AntiRecursion.hpp" +#include "DeferredPackets.hpp" namespace ZeroTier { -bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR) +bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,bool deferred) { const Address sourceAddress(source()); try { if ((cipher() == ZT_PROTO_CIPHER_SUITE__C25519_POLY1305_NONE)&&(verb() == Packet::VERB_HELLO)) { - // Unencrypted HELLOs are handled here since they are used to - // populate our identity cache in the first place. _doHELLO() is special - // in that it contains its own authentication logic. - return _doHELLO(RR); + // Unencrypted HELLOs require some potentially expensive verification, so + // do this in the background if background processing is enabled. + DeferredPackets *const dp = RR->dp; // read volatile pointer + if ((dp)&&(!deferred)) { + dp->enqueue(this); + return true; // 'handled' via deferring to background thread(s) + } else { + // A null pointer for peer to _doHELLO() tells it to run its own + // special internal authentication logic. This is done for unencrypted + // HELLOs to learn new identities, etc. + SharedPtr tmp; + return _doHELLO(RR,tmp); + } } - SharedPtr peer = RR->topology->getPeer(sourceAddress); + SharedPtr peer(RR->topology->getPeer(sourceAddress)); if (peer) { if (!dearmor(peer->key())) { TRACE("dropped packet from %s(%s), MAC authentication failed (size: %u)",peer->address().toString().c_str(),_remoteAddress.toString().c_str(),size()); @@ -79,7 +89,8 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR) default: // ignore unknown verbs, but if they pass auth check they are "received" peer->received(RR,_localAddress,_remoteAddress,hops(),packetId(),v,0,Packet::VERB_NOP); return true; - case Packet::VERB_HELLO: return _doHELLO(RR); + + case Packet::VERB_HELLO: return _doHELLO(RR,peer); case Packet::VERB_ERROR: return _doERROR(RR,peer); case Packet::VERB_OK: return _doOK(RR,peer); case Packet::VERB_WHOIS: return _doWHOIS(RR,peer); @@ -185,7 +196,7 @@ bool IncomingPacket::_doERROR(const RuntimeEnvironment *RR,const SharedPtr return true; } -bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR) +bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR,SharedPtr &peer) { /* Note: this is the only packet ever sent in the clear, and it's also * the only packet that we authenticate via a different path. Authentication @@ -226,63 +237,65 @@ bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR) return true; } - SharedPtr peer(RR->topology->getPeer(id.address())); - if (peer) { - // We already have an identity with this address -- check for collisions + if (!peer) { + peer = RR->topology->getPeer(id.address()); + if (peer) { + // We already have an identity with this address -- check for collisions - if (peer->identity() != id) { - // Identity is different from the one we already have -- address collision + if (peer->identity() != id) { + // Identity is different from the one we already have -- address collision - unsigned char key[ZT_PEER_SECRET_KEY_LENGTH]; - if (RR->identity.agree(id,key,ZT_PEER_SECRET_KEY_LENGTH)) { - if (dearmor(key)) { // ensure packet is authentic, otherwise drop - TRACE("rejected HELLO from %s(%s): address already claimed",id.address().toString().c_str(),_remoteAddress.toString().c_str()); - Packet outp(id.address(),RR->identity.address(),Packet::VERB_ERROR); - outp.append((unsigned char)Packet::VERB_HELLO); - outp.append((uint64_t)pid); - outp.append((unsigned char)Packet::ERROR_IDENTITY_COLLISION); - outp.armor(key,true); - RR->node->putPacket(_localAddress,_remoteAddress,outp.data(),outp.size()); + unsigned char key[ZT_PEER_SECRET_KEY_LENGTH]; + if (RR->identity.agree(id,key,ZT_PEER_SECRET_KEY_LENGTH)) { + if (dearmor(key)) { // ensure packet is authentic, otherwise drop + TRACE("rejected HELLO from %s(%s): address already claimed",id.address().toString().c_str(),_remoteAddress.toString().c_str()); + Packet outp(id.address(),RR->identity.address(),Packet::VERB_ERROR); + outp.append((unsigned char)Packet::VERB_HELLO); + outp.append((uint64_t)pid); + outp.append((unsigned char)Packet::ERROR_IDENTITY_COLLISION); + outp.armor(key,true); + RR->node->putPacket(_localAddress,_remoteAddress,outp.data(),outp.size()); + } else { + TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str()); + } } else { - TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str()); + TRACE("rejected HELLO from %s(%s): key agreement failed",id.address().toString().c_str(),_remoteAddress.toString().c_str()); } + + return true; } else { - TRACE("rejected HELLO from %s(%s): key agreement failed",id.address().toString().c_str(),_remoteAddress.toString().c_str()); + // Identity is the same as the one we already have -- check packet integrity + + if (!dearmor(peer->key())) { + TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str()); + return true; + } + + // Continue at // VALID } - - return true; } else { - // Identity is the same as the one we already have -- check packet integrity + // We don't already have an identity with this address -- validate and learn it - if (!dearmor(peer->key())) { - TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str()); + // Check identity proof of work + if (!id.locallyValidate()) { + TRACE("dropped HELLO from %s(%s): identity invalid",id.address().toString().c_str(),_remoteAddress.toString().c_str()); return true; } + // Check packet integrity and authentication + SharedPtr newPeer(new Peer(RR->identity,id)); + if (!dearmor(newPeer->key())) { + TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str()); + return true; + } + peer = RR->topology->addPeer(newPeer); + // Continue at // VALID } - } else { - // We don't already have an identity with this address -- validate and learn it - // Check identity proof of work - if (!id.locallyValidate()) { - TRACE("dropped HELLO from %s(%s): identity invalid",id.address().toString().c_str(),_remoteAddress.toString().c_str()); - return true; - } - - // Check packet integrity and authentication - SharedPtr newPeer(new Peer(RR->identity,id)); - if (!dearmor(newPeer->key())) { - TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str()); - return true; - } - peer = RR->topology->addPeer(newPeer); - - // Continue at // VALID + // VALID -- if we made it here, packet passed identity and authenticity checks! } - // VALID -- if we made it here, packet passed identity and authenticity checks! - if (externalSurfaceAddress) RR->sa->iam(id.address(),_remoteAddress,externalSurfaceAddress,RR->topology->isRoot(id),RR->node->now()); diff --git a/node/IncomingPacket.hpp b/node/IncomingPacket.hpp index f5dd4b27a..7fb7dbd32 100644 --- a/node/IncomingPacket.hpp +++ b/node/IncomingPacket.hpp @@ -93,14 +93,21 @@ public: * about whether the packet was valid. A rejection is 'complete.' * * Once true is returned, this must not be called again. The packet's state - * may no longer be valid. + * may no longer be valid. The only exception is deferred decoding. In this + * case true is returned to indicate to the normal decode path that it is + * finished with the packet. The packet will have added itself to the + * deferred queue and will expect tryDecode() to be called one more time + * with deferred set to true. + * + * Deferred decoding is performed by DeferredPackets.cpp and should not be + * done elsewhere. Under deferred decoding packets only get one shot and + * so the return value of tryDecode() is ignored. * * @param RR Runtime environment + * @param deferred If true, this is a deferred decode and the return is ignored * @return True if decoding and processing is complete, false if caller should try again - * @throws std::out_of_range Range error processing packet (should be discarded) - * @throws std::runtime_error Other error processing packet (should be discarded) */ - bool tryDecode(const RuntimeEnvironment *RR); + bool tryDecode(const RuntimeEnvironment *RR,bool deferred); /** * @return Time of packet receipt / start of decode @@ -132,7 +139,7 @@ private: // These are called internally to handle packet contents once it has // been authenticated, decrypted, decompressed, and classified. bool _doERROR(const RuntimeEnvironment *RR,const SharedPtr &peer); - bool _doHELLO(const RuntimeEnvironment *RR); + bool _doHELLO(const RuntimeEnvironment *RR,SharedPtr &peer); // can be called with NULL peer, while all others cannot bool _doOK(const RuntimeEnvironment *RR,const SharedPtr &peer); bool _doWHOIS(const RuntimeEnvironment *RR,const SharedPtr &peer); bool _doRENDEZVOUS(const RuntimeEnvironment *RR,const SharedPtr &peer); diff --git a/node/Node.cpp b/node/Node.cpp index 82cb7ddbc..bcf5db1ab 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -47,6 +47,7 @@ #include "Identity.hpp" #include "SelfAwareness.hpp" #include "Cluster.hpp" +#include "DeferredPackets.hpp" const struct sockaddr_storage ZT_SOCKADDR_NULL = {0}; @@ -130,7 +131,14 @@ Node::Node( Node::~Node() { Mutex::Lock _l(_networks_m); - _networks.clear(); // ensure that networks are destroyed before shutdown + Mutex::Lock _l2(RR->dpSetLock); + + _networks.clear(); // ensure that networks are destroyed before shutdow + + DeferredPackets *dp = RR->dp; + RR->dp = (DeferredPackets *)0; + delete dp; + delete RR->sa; delete RR->topology; delete RR->antiRec; @@ -637,6 +645,27 @@ void Node::clusterStatus(ZT_ClusterStatus *cs) memset(cs,0,sizeof(ZT_ClusterStatus)); } +void Node::backgroundThreadMain() +{ + RR->dpSetLock.lock(); + if (!RR->dp) { + try { + RR->dp = new DeferredPackets(RR); + } catch ( ... ) { // sanity check -- could only really happen if out of memory + RR->dpSetLock.unlock(); + return; + } + } + RR->dpSetLock.unlock(); + + for(;;) { + try { + if (RR->dp->process() < 0) + break; + } catch ( ... ) {} // sanity check -- should not throw + } +} + /****************************************************************************/ /* Node methods used only within node/ */ /****************************************************************************/ @@ -978,6 +1007,13 @@ void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs) } catch ( ... ) {} } +void ZT_Node_backgroundThreadMain(ZT_Node *node) +{ + try { + reinterpret_cast(node)->backgroundThreadMain(); + } catch ( ... ) {} +} + void ZT_version(int *major,int *minor,int *revision,unsigned long *featureFlags) { if (major) *major = ZEROTIER_ONE_VERSION_MAJOR; diff --git a/node/Node.hpp b/node/Node.hpp index 9b85b8326..800c0a558 100644 --- a/node/Node.hpp +++ b/node/Node.hpp @@ -125,6 +125,7 @@ public: void clusterRemoveMember(unsigned int memberId); void clusterHandleIncomingMessage(const void *msg,unsigned int len); void clusterStatus(ZT_ClusterStatus *cs); + void backgroundThreadMain(); // Internal functions ------------------------------------------------------ diff --git a/node/RuntimeEnvironment.hpp b/node/RuntimeEnvironment.hpp index 2ec88f723..18d9e8e5f 100644 --- a/node/RuntimeEnvironment.hpp +++ b/node/RuntimeEnvironment.hpp @@ -32,6 +32,7 @@ #include "Constants.hpp" #include "Identity.hpp" +#include "Mutex.hpp" namespace ZeroTier { @@ -44,6 +45,7 @@ class AntiRecursion; class NetworkController; class SelfAwareness; class Cluster; +class DeferredPackets; /** * Holds global state for an instance of ZeroTier::Node @@ -55,6 +57,7 @@ public: node(n) ,identity() ,localNetworkController((NetworkController *)0) + ,dp((DeferredPackets *)0) ,sw((Switch *)0) ,mc((Multicaster *)0) ,antiRec((AntiRecursion *)0) @@ -77,6 +80,10 @@ public: // This is set externally to an instance of this base class NetworkController *localNetworkController; + // This is created if background threads call Node::backgroundThreadMain(). + DeferredPackets *volatile dp; // can be read without lock but not written + Mutex dpSetLock; + /* * Order matters a bit here. These are constructed in this order * and then deleted in the opposite order on Node exit. The order ensures diff --git a/node/SharedPtr.hpp b/node/SharedPtr.hpp index 4ecfa8183..289c499f9 100644 --- a/node/SharedPtr.hpp +++ b/node/SharedPtr.hpp @@ -64,20 +64,6 @@ public: ++obj->__refCount; } - SharedPtr(T *obj,bool runAwayFromZombies) - throw() : - _ptr(obj) - { - // HACK: this is used in "handlers" to take ownership of naked pointers, - // an ugly pattern that really ought to be factored out. - if (runAwayFromZombies) { - if ((int)(++obj->__refCount) < 2) { - --obj->__refCount; - _ptr = (T *)0; - } - } else ++obj->__refCount; - } - SharedPtr(const SharedPtr &sp) throw() : _ptr(sp._getAndInc()) @@ -105,6 +91,25 @@ public: return *this; } + /** + * Set to a naked pointer and increment its reference count + * + * This assumes this SharedPtr is NULL and that ptr is not a 'zombie.' No + * checks are performed. + * + * @param ptr Naked pointer to assign + */ + inline void setToUnsafe(T *ptr) + { + ++ptr->__refCount; + _ptr = ptr; + } + + /** + * Swap with another pointer 'for free' without ref count overhead + * + * @param with Pointer to swap with + */ inline void swap(SharedPtr &with) throw() { diff --git a/node/Switch.cpp b/node/Switch.cpp index 97befbc62..c047a3d18 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -475,7 +475,7 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr &peer) { // finish processing any packets waiting on peer's public key / identity Mutex::Lock _l(_rxQueue_m); for(std::list< SharedPtr >::iterator rxi(_rxQueue.begin());rxi!=_rxQueue.end();) { - if ((*rxi)->tryDecode(RR)) + if ((*rxi)->tryDecode(RR,false)) _rxQueue.erase(rxi++); else ++rxi; } @@ -672,7 +672,7 @@ void Switch::_handleRemotePacketFragment(const InetAddress &localAddr,const Inet packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength()); _defragQueue.erase(pid); // dq no longer valid after this - if (!packet->tryDecode(RR)) { + if (!packet->tryDecode(RR,false)) { Mutex::Lock _l(_rxQueue_m); _rxQueue.push_back(packet); } @@ -746,7 +746,7 @@ void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddr packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength()); _defragQueue.erase(pid); // dq no longer valid after this - if (!packet->tryDecode(RR)) { + if (!packet->tryDecode(RR,false)) { Mutex::Lock _l(_rxQueue_m); _rxQueue.push_back(packet); } @@ -757,7 +757,7 @@ void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddr } // else this is a duplicate head, ignore } else { // Packet is unfragmented, so just process it - if (!packet->tryDecode(RR)) { + if (!packet->tryDecode(RR,false)) { Mutex::Lock _l(_rxQueue_m); _rxQueue.push_back(packet); } diff --git a/objects.mk b/objects.mk index 540072d5d..8daec8b53 100644 --- a/objects.mk +++ b/objects.mk @@ -5,6 +5,7 @@ OBJS=\ node/C25519.o \ node/CertificateOfMembership.o \ node/Cluster.o \ + node/DeferredPackets.o \ node/Dictionary.o \ node/Identity.o \ node/IncomingPacket.o \