More Central work.

This commit is contained in:
Adam Ierymenko 2017-11-08 11:32:01 -08:00
parent 4166d8ca35
commit c12b68a6b2
6 changed files with 13 additions and 13 deletions

View file

@ -84,7 +84,7 @@ public:
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0; virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;
virtual void nodeIsOnline(const uint64_t memberId) = 0; virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId) = 0;
protected: protected:
struct _Network struct _Network

View file

@ -1175,7 +1175,7 @@ void EmbeddedNetworkController::_request(
ms.lastRequestTime = now; ms.lastRequestTime = now;
} }
_db->nodeIsOnline(identity.address().toInt()); _db->nodeIsOnline(nwid,identity.address().toInt());
Utils::hex(nwid,nwids); Utils::hex(nwid,nwids);
_db->get(nwid,network,identity.address().toInt(),member,ns); _db->get(nwid,network,identity.address().toInt(),member,ns);

View file

@ -126,7 +126,7 @@ void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
{ {
} }
void FileDB::nodeIsOnline(const uint64_t memberId) void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId)
{ {
// Nothing to do here right now in the filesystem store mode since we can just get this from the peer list // Nothing to do here right now in the filesystem store mode since we can just get this from the peer list
} }

View file

@ -38,7 +38,7 @@ public:
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t memberId); virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId);
protected: protected:
std::string _networksPath; std::string _networksPath;

View file

@ -227,18 +227,18 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
R::Array batch; R::Array batch;
R::Object tmpobj; R::Object tmpobj;
for(auto i=_lastOnline.begin();i!=_lastOnline.end();++i) { for(auto i=_lastOnline.begin();i!=_lastOnline.end();++i) {
char nodeId[16]; char tmp[64];
Utils::hex10(i->first,nodeId); OSUtils::ztsnprintf(tmp,sizeof(tmp),"%.16llx-%.10llx",i->first.first,i->first.second);
tmpobj["id"] = nodeId; tmpobj["id"] = tmp;
tmpobj["ts"] = i->second; tmpobj["ts"] = i->second;
batch.emplace_back(tmpobj); batch.emplace_back(tmpobj);
if (batch.size() >= 256) { if (batch.size() >= 256) {
R::db(this->_db).table("NodeLastOnline").insert(R::args(batch),R::optargs("conflict","update")).run(*rdb); R::db(this->_db).table("MemberLastRequest",R::optargs("read_mode","outdated")).insert(R::args(batch),R::optargs("conflict","update")).run(*rdb);
batch.clear(); batch.clear();
} }
} }
if (batch.size() > 0) if (batch.size() > 0)
R::db(this->_db).table("NodeLastOnline").insert(R::args(batch),R::optargs("conflict","update")).run(*rdb); R::db(this->_db).table("MemberLastRequest",R::optargs("read_mode","outdated")).insert(R::args(batch),R::optargs("conflict","update")).run(*rdb);
_lastOnline.clear(); _lastOnline.clear();
} }
} catch (std::exception &e) { } catch (std::exception &e) {
@ -357,10 +357,10 @@ void RethinkDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
_commitQueue.post(tmp); _commitQueue.post(tmp);
} }
void RethinkDB::nodeIsOnline(const uint64_t memberId) void RethinkDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId)
{ {
std::lock_guard<std::mutex> l(_lastOnline_l); std::lock_guard<std::mutex> l(_lastOnline_l);
_lastOnline[memberId] = OSUtils::now(); _lastOnline[std::pair<uint64_t,uint64_t>(networkId,memberId)] = OSUtils::now();
} }
} // namespace ZeroTier } // namespace ZeroTier

View file

@ -42,7 +42,7 @@ public:
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t memberId); virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId);
protected: protected:
std::string _host; std::string _host;
@ -58,7 +58,7 @@ protected:
BlockingQueue< nlohmann::json * > _commitQueue; BlockingQueue< nlohmann::json * > _commitQueue;
std::thread _commitThread[ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS]; std::thread _commitThread[ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS];
std::unordered_map< uint64_t,int64_t > _lastOnline; std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t > _lastOnline;
mutable std::mutex _lastOnline_l; mutable std::mutex _lastOnline_l;
std::thread _onlineNotificationThread; std::thread _onlineNotificationThread;