Do the sometimes rather big "pong" in a background worker thread.

This commit is contained in:
Adam Ierymenko 2017-05-02 16:58:51 -07:00
parent 625e3e8e25
commit 8e19188f49
2 changed files with 57 additions and 41 deletions

View file

@ -467,26 +467,14 @@ void EmbeddedNetworkController::request(
{ {
if (((!_signingId)||(!_signingId.hasPrivate()))||(_signingId.address().toInt() != (nwid >> 24))||(!_sender)) if (((!_signingId)||(!_signingId.hasPrivate()))||(_signingId.address().toInt() != (nwid >> 24))||(!_sender))
return; return;
_startThreads();
{
Mutex::Lock _l(_threads_m);
if (_threads.size() == 0) {
long hwc = (long)(std::thread::hardware_concurrency() / 2);
if (hwc < 1)
hwc = 1;
else if (hwc > 16)
hwc = 16;
for(long i=0;i<hwc;++i)
_threads.push_back(Thread::start(this));
}
}
_RQEntry *qe = new _RQEntry; _RQEntry *qe = new _RQEntry;
qe->nwid = nwid; qe->nwid = nwid;
qe->requestPacketId = requestPacketId; qe->requestPacketId = requestPacketId;
qe->fromAddr = fromAddr; qe->fromAddr = fromAddr;
qe->identity = identity; qe->identity = identity;
qe->metaData = metaData; qe->metaData = metaData;
qe->type = _RQEntry::RQENTRY_TYPE_REQUEST;
_queue.post(qe); _queue.post(qe);
} }
@ -1051,33 +1039,14 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
} else if (path[0] == "ping") { } else if (path[0] == "ping") {
const uint64_t now = OSUtils::now(); _startThreads();
bool first = true; _RQEntry *qe = new _RQEntry;
std::string pong("{\"memberStatus\":{"); qe->type = _RQEntry::RQENTRY_TYPE_PING;
{ _queue.post(qe);
Mutex::Lock _l(_memberStatus_m);
pong.reserve(64 * _memberStatus.size());
_db.eachId([this,&pong,&now,&first](uint64_t networkId,uint64_t nodeId) {
char tmp[64];
uint64_t lrt = 0ULL;
auto ms = this->_memberStatus.find(_MemberStatusKey(networkId,nodeId));
if (ms != _memberStatus.end())
lrt = ms->second.lastRequestTime;
Utils::snprintf(tmp,sizeof(tmp),"%s\"%.16llx-%.10llx\":%llu",
(first) ? "" : ",",
(unsigned long long)networkId,
(unsigned long long)nodeId,
(unsigned long long)lrt);
pong.append(tmp);
first = false;
});
}
char tmp2[256];
Utils::snprintf(tmp2,sizeof(tmp2),"},\"clock\":%llu,\"startTime\":%llu}",(unsigned long long)now,(unsigned long long)_startTime);
pong.append(tmp2);
_db.writeRaw("pong",pong);
responseBody = "{}"; char tmp[64];
Utils::snprintf(tmp,sizeof(tmp),"{\"clock\":%llu}",(unsigned long long)now);
responseBody = tmp;
responseContentType = "application/json"; responseContentType = "application/json";
return 200; return 200;
@ -1150,7 +1119,35 @@ void EmbeddedNetworkController::threadMain()
_RQEntry *qe = (_RQEntry *)0; _RQEntry *qe = (_RQEntry *)0;
while ((_running)&&((qe = _queue.get()))) { while ((_running)&&((qe = _queue.get()))) {
try { try {
_request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData); if (qe->type == _RQEntry::RQENTRY_TYPE_REQUEST) {
_request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData);
} else if (qe->type == _RQEntry::RQENTRY_TYPE_PING) {
const uint64_t now = OSUtils::now();
bool first = true;
std::string pong("{\"memberStatus\":{");
{
Mutex::Lock _l(_memberStatus_m);
pong.reserve(64 * _memberStatus.size());
_db.eachId([this,&pong,&now,&first](uint64_t networkId,uint64_t nodeId) {
char tmp[64];
uint64_t lrt = 0ULL;
auto ms = this->_memberStatus.find(_MemberStatusKey(networkId,nodeId));
if (ms != _memberStatus.end())
lrt = ms->second.lastRequestTime;
Utils::snprintf(tmp,sizeof(tmp),"%s\"%.16llx-%.10llx\":%llu",
(first) ? "" : ",",
(unsigned long long)networkId,
(unsigned long long)nodeId,
(unsigned long long)lrt);
pong.append(tmp);
first = false;
});
}
char tmp2[256];
Utils::snprintf(tmp2,sizeof(tmp2),"},\"clock\":%llu,\"startTime\":%llu}",(unsigned long long)now,(unsigned long long)_startTime);
pong.append(tmp2);
_db.writeRaw("pong",pong);
}
} catch ( ... ) {} } catch ( ... ) {}
delete qe; delete qe;

View file

@ -26,6 +26,7 @@
#include <vector> #include <vector>
#include <set> #include <set>
#include <list> #include <list>
#include <thread>
#include "../node/Constants.hpp" #include "../node/Constants.hpp"
@ -103,11 +104,29 @@ private:
InetAddress fromAddr; InetAddress fromAddr;
Identity identity; Identity identity;
Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> metaData; Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> metaData;
enum {
RQENTRY_TYPE_REQUEST = 0,
RQENTRY_TYPE_PING = 1
} type;
}; };
static void _circuitTestCallback(ZT_Node *node,ZT_CircuitTest *test,const ZT_CircuitTestReport *report); static void _circuitTestCallback(ZT_Node *node,ZT_CircuitTest *test,const ZT_CircuitTestReport *report);
void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData); void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData);
inline void _startThreads()
{
Mutex::Lock _l(_threads_m);
if (_threads.size() == 0) {
long hwc = (long)std::thread::hardware_concurrency();
if (hwc < 1)
hwc = 1;
else if (hwc > 16)
hwc = 16;
for(long i=0;i<hwc;++i)
_threads.push_back(Thread::start(this));
}
}
// These init objects with default and static/informational fields // These init objects with default and static/informational fields
inline void _initMember(nlohmann::json &member) inline void _initMember(nlohmann::json &member)
{ {