diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 3b901afe3..849068497 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -431,6 +431,7 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule) EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath) : _startTime(OSUtils::now()), + _running(true), _db(dbPath), _node(node) { @@ -438,12 +439,19 @@ EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPa EmbeddedNetworkController::~EmbeddedNetworkController() { - Mutex::Lock _l(_threads_m); - if (_threads.size() > 0) { - for(unsigned long i=0;i<(((unsigned long)_threads.size())*2);++i) + _running = false; + std::vector t; + { + Mutex::Lock _l(_threads_m); + t = _threads; + } + if (t.size() > 0) { + for(unsigned long i=0,j=(unsigned long)(t.size() * 4);i::iterator i(_threads.begin());i!=_threads.end();++i) + /* + for(std::vector::iterator i(t.begin());i!=t.end();++i) Thread::join(*i); + */ } } @@ -1111,23 +1119,23 @@ void EmbeddedNetworkController::threadMain() throw() { uint64_t lastCircuitTestCheck = 0; - for(;;) { - _RQEntry *const qe = _queue.get(); // waits on next request - if (!qe) break; // enqueue a NULL to terminate threads + _RQEntry *qe = (_RQEntry *)0; + while ((_running)&&((qe = _queue.get()))) { try { _request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData); } catch ( ... ) {} delete qe; - - uint64_t now = OSUtils::now(); - if ((now - lastCircuitTestCheck) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) { - lastCircuitTestCheck = now; - Mutex::Lock _l(_tests_m); - for(std::list< ZT_CircuitTest >::iterator i(_tests.begin());i!=_tests.end();) { - if ((now - i->timestamp) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) { - _node->circuitTestEnd(&(*i)); - _tests.erase(i++); - } else ++i; + if (_running) { + uint64_t now = OSUtils::now(); + if ((now - lastCircuitTestCheck) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) { + lastCircuitTestCheck = now; + Mutex::Lock _l(_tests_m); + for(std::list< ZT_CircuitTest >::iterator i(_tests.begin());i!=_tests.end();) { + if ((now - i->timestamp) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) { + _node->circuitTestEnd(&(*i)); + _tests.erase(i++); + } else ++i; + } } } } @@ -1723,13 +1731,11 @@ void EmbeddedNetworkController::_getNetworkMemberInfo(uint64_t now,uint64_t nwid char pfx[256]; Utils::snprintf(pfx,sizeof(pfx),"network/%.16llx/member",nwid); - { - Mutex::Lock _l(_nmiCache_m); - std::map::iterator c(_nmiCache.find(nwid)); - if ((c != _nmiCache.end())&&((now - c->second.nmiTimestamp) < 1000)) { // a short duration cache but limits CPU use on big networks - nmi = c->second; - return; - } + Mutex::Lock _l(_nmiCache_m); + std::map::iterator c(_nmiCache.find(nwid)); + if ((c != _nmiCache.end())&&((now - c->second.nmiTimestamp) < 1000)) { // a short duration cache but limits CPU use on big networks + nmi = c->second; + return; } _db.filter(pfx,[&nmi,&now](const std::string &n,const json &member) { @@ -1770,10 +1776,7 @@ void EmbeddedNetworkController::_getNetworkMemberInfo(uint64_t now,uint64_t nwid }); nmi.nmiTimestamp = now; - { - Mutex::Lock _l(_nmiCache_m); - _nmiCache[nwid] = nmi; - } + _nmiCache[nwid] = nmi; } void EmbeddedNetworkController::_pushMemberUpdate(uint64_t now,uint64_t nwid,const nlohmann::json &member) diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index 906f43459..04f52c7dd 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -178,6 +178,7 @@ private: const uint64_t _startTime; + volatile bool _running; BlockingQueue<_RQEntry *> _queue; std::vector _threads; Mutex _threads_m; diff --git a/controller/JSONDB.cpp b/controller/JSONDB.cpp index dd8e39686..509250b95 100644 --- a/controller/JSONDB.cpp +++ b/controller/JSONDB.cpp @@ -50,7 +50,7 @@ JSONDB::JSONDB(const std::string &basePath) : OSUtils::mkdir(_basePath.c_str()); OSUtils::lockDownFile(_basePath.c_str(),true); // networks might contain auth tokens, etc., so restrict directory permissions } - _ready = _reload(_basePath,std::string()); + _reload(_basePath,std::string()); } bool JSONDB::writeRaw(const std::string &n,const std::string &obj) @@ -87,16 +87,16 @@ bool JSONDB::put(const std::string &n,const nlohmann::json &obj) nlohmann::json JSONDB::get(const std::string &n) { + while (!_ready) { + Thread::sleep(250); + _reload(_basePath,std::string()); + } + + if (!_isValidObjectName(n)) + return _EMPTY_JSON; + { Mutex::Lock _l(_db_m); - - while (!_ready) { - Thread::sleep(250); - _ready = _reload(_basePath,std::string()); - } - - if (!_isValidObjectName(n)) - return _EMPTY_JSON; std::map::iterator e(_db.find(n)); if (e != _db.end()) return e->second.obj; @@ -116,14 +116,16 @@ nlohmann::json JSONDB::get(const std::string &n) return _EMPTY_JSON; } - try { + { Mutex::Lock _l(_db_m); - _E &e2 = _db[n]; - e2.obj = OSUtils::jsonParse(buf); - return e2.obj; - } catch ( ... ) { - _db.erase(n); - return _EMPTY_JSON; + try { + _E &e2 = _db[n]; + e2.obj = OSUtils::jsonParse(buf); + return e2.obj; + } catch ( ... ) { + _db.erase(n); + return _EMPTY_JSON; + } } } @@ -131,7 +133,15 @@ void JSONDB::erase(const std::string &n) { if (!_isValidObjectName(n)) return; + _erase(n); + { + Mutex::Lock _l(_db_m); + _db.erase(n); + } +} +void JSONDB::_erase(const std::string &n) +{ if (_httpAddr) { std::string body; std::map headers; @@ -142,17 +152,12 @@ void JSONDB::erase(const std::string &n) return; OSUtils::rm(path.c_str()); } - - { - Mutex::Lock _l(_db_m); - _db.erase(n); - } } -bool JSONDB::_reload(const std::string &p,const std::string &b) +void JSONDB::_reload(const std::string &p,const std::string &b) { - // Assumes _db_m is locked if (_httpAddr) { + Mutex::Lock _l(_db_m); std::string body; std::map headers; const unsigned int sc = Http::GET(2147483647,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast(&_httpAddr),_basePath.c_str(),_ZT_JSONDB_GET_HEADERS,headers,body); @@ -161,18 +166,19 @@ bool JSONDB::_reload(const std::string &p,const std::string &b) nlohmann::json dbImg(OSUtils::jsonParse(body)); std::string tmp; if (dbImg.is_object()) { + _db.clear(); for(nlohmann::json::iterator i(dbImg.begin());i!=dbImg.end();++i) { if (i.value().is_object()) { tmp = i.key(); _db[tmp].obj = i.value(); } } - return true; + _ready = true; } } catch ( ... ) {} // invalid JSON, so maybe incomplete request } - return false; } else { + _ready = true; std::vector dl(OSUtils::listDirectory(p.c_str(),true)); for(std::vector::const_iterator di(dl.begin());di!=dl.end();++di) { if ((di->length() > 5)&&(di->substr(di->length() - 5) == ".json")) { @@ -181,7 +187,6 @@ bool JSONDB::_reload(const std::string &p,const std::string &b) this->_reload((p + ZT_PATH_SEPARATOR + *di),(b + *di + ZT_PATH_SEPARATOR)); } } - return true; } } diff --git a/controller/JSONDB.hpp b/controller/JSONDB.hpp index 2d3a5224b..a045d1b45 100644 --- a/controller/JSONDB.hpp +++ b/controller/JSONDB.hpp @@ -72,29 +72,28 @@ public: template inline void filter(const std::string &prefix,F func) { - Mutex::Lock _l(_db_m); - while (!_ready) { Thread::sleep(250); - _ready = _reload(_basePath,std::string()); + _reload(_basePath,std::string()); } - - for(std::map::iterator i(_db.lower_bound(prefix));i!=_db.end();) { - if ((i->first.length() >= prefix.length())&&(!memcmp(i->first.data(),prefix.data(),prefix.length()))) { - if (!func(i->first,get(i->first))) { - std::map::iterator i2(i); ++i2; - this->erase(i->first); - i = i2; - } else ++i; - } else break; + { + Mutex::Lock _l(_db_m); + for(std::map::iterator i(_db.lower_bound(prefix));i!=_db.end();) { + if ((i->first.length() >= prefix.length())&&(!memcmp(i->first.data(),prefix.data(),prefix.length()))) { + if (!func(i->first,i->second.obj)) { + this->_erase(i->first); + _db.erase(i++); + } else { + ++i; + } + } else break; + } } } - inline bool operator==(const JSONDB &db) const { return ((_basePath == db._basePath)&&(_db == db._db)); } - inline bool operator!=(const JSONDB &db) const { return (!(*this == db)); } - private: - bool _reload(const std::string &p,const std::string &b); + void _erase(const std::string &n); + void _reload(const std::string &p,const std::string &b); bool _isValidObjectName(const std::string &n); std::string _genPath(const std::string &n,bool create); diff --git a/node/Node.cpp b/node/Node.cpp index 2b3f79968..ccbe94113 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -490,7 +490,8 @@ int Node::sendUserMessage(void *tptr,uint64_t dest,uint64_t typeId,const void *d void Node::setNetconfMaster(void *networkControllerInstance) { RR->localNetworkController = reinterpret_cast(networkControllerInstance); - RR->localNetworkController->init(RR->identity,this); + if (networkControllerInstance) + RR->localNetworkController->init(RR->identity,this); } ZT_ResultCode Node::circuitTestBegin(void *tptr,ZT_CircuitTest *test,void (*reportCallback)(ZT_Node *,ZT_CircuitTest *,const ZT_CircuitTestReport *)) diff --git a/osdep/Thread.hpp b/osdep/Thread.hpp index 227c2cfec..5423a8ab8 100644 --- a/osdep/Thread.hpp +++ b/osdep/Thread.hpp @@ -46,7 +46,6 @@ class Thread { public: Thread() - throw() { _th = NULL; _tid = 0; @@ -54,7 +53,6 @@ public: template static inline Thread start(C *instance) - throw(std::runtime_error) { Thread t; t._th = CreateThread(NULL,0,&___zt_threadMain,(LPVOID)instance,0,&t._tid); @@ -88,7 +86,7 @@ public: CancelSynchronousIo(t._th); } - inline operator bool() const throw() { return (_th != NULL); } + inline operator bool() const { return (_th != NULL); } private: HANDLE _th; @@ -123,33 +121,18 @@ class Thread { public: Thread() - throw() { - memset(&_tid,0,sizeof(_tid)); - pthread_attr_init(&_tattr); - // This corrects for systems with abnormally small defaults (musl) and also - // shrinks the stack on systems with large defaults to save a bit of memory. - pthread_attr_setstacksize(&_tattr,ZT_THREAD_MIN_STACK_SIZE); - _started = false; - } - - ~Thread() - { - pthread_attr_destroy(&_tattr); + memset(this,0,sizeof(Thread)); } Thread(const Thread &t) - throw() { - memcpy(&_tid,&(t._tid),sizeof(_tid)); - _started = t._started; + memcpy(this,&t,sizeof(Thread)); } inline Thread &operator=(const Thread &t) - throw() { - memcpy(&_tid,&(t._tid),sizeof(_tid)); - _started = t._started; + memcpy(this,&t,sizeof(Thread)); return *this; } @@ -163,12 +146,20 @@ public: */ template static inline Thread start(C *instance) - throw(std::runtime_error) { Thread t; - t._started = true; - if (pthread_create(&t._tid,&t._tattr,&___zt_threadMain,instance)) + pthread_attr_t tattr; + pthread_attr_init(&tattr); + // This corrects for systems with abnormally small defaults (musl) and also + // shrinks the stack on systems with large defaults to save a bit of memory. + pthread_attr_setstacksize(&tattr,ZT_THREAD_MIN_STACK_SIZE); + if (pthread_create(&t._tid,&tattr,&___zt_threadMain,instance)) { + pthread_attr_destroy(&tattr); throw std::runtime_error("pthread_create() failed, unable to create thread"); + } else { + t._started = true; + pthread_attr_destroy(&tattr); + } return t; } @@ -190,11 +181,10 @@ public: */ static inline void sleep(unsigned long ms) { usleep(ms * 1000); } - inline operator bool() const throw() { return (_started); } + inline operator bool() const { return (_started); } private: pthread_t _tid; - pthread_attr_t _tattr; volatile bool _started; };