diff --git a/controller/DB.hpp b/controller/DB.hpp index dfc8ac95e..fe06c24dd 100644 --- a/controller/DB.hpp +++ b/controller/DB.hpp @@ -78,12 +78,14 @@ public: void networks(std::vector &networks); - virtual void save(const nlohmann::json &record) = 0; + virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0; virtual void eraseNetwork(const uint64_t networkId) = 0; virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0; + virtual void nodeIsOnline(const uint64_t memberId) = 0; + protected: struct _Network { diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 5707e6e0f..a2795d967 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -734,12 +734,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( member["nwid"] = nwids; _removeMemberNonPersistedFields(member); - if (member != origMember) { - json &revj = member["revision"]; - member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL); - _db->save(member); - } - + _db->save(&origMember,member); _addMemberNonPersistedFields(nwid,address,member,now); responseBody = OSUtils::jsonDump(member); responseContentType = "application/json"; @@ -986,12 +981,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( network["nwid"] = nwids; // legacy _removeNetworkNonPersistedFields(network); - if (network != origNetwork) { - json &revj = network["revision"]; - network["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL); - _db->save(network); - } - + _db->save(&origNetwork,network); ControllerDB::NetworkSummaryInfo ns; _db->summary(nwid,ns); _addNetworkNonPersistedFields(nwid,network,now,ns); @@ -1116,7 +1106,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt) d["objtype"] = "trace"; d["ts"] = now; d["nodeId"] = Utils::hex10(rt.origin,tmp); - _db->save(d); + _db->save((nlohmann::json *)0,d); } catch ( ... ) { // drop invalid trace messages if an error occurs } @@ -1185,6 +1175,8 @@ void EmbeddedNetworkController::_request( ms.lastRequestTime = now; } + _db->nodeIsOnline(identity.address().toInt()); + Utils::hex(nwid,nwids); _db->get(nwid,network,identity.address().toInt(),member,ns); if ((!network.is_object())||(network.size() == 0)) { @@ -1299,11 +1291,7 @@ void EmbeddedNetworkController::_request( } else { // If they are not authorized, STOP! _removeMemberNonPersistedFields(member); - if (origMember != member) { - json &revj = member["revision"]; - member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL); - _db->save(member); - } + _db->save(&origMember,member); _sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_ACCESS_DENIED); return; } @@ -1666,12 +1654,7 @@ void EmbeddedNetworkController::_request( } _removeMemberNonPersistedFields(member); - if (member != origMember) { - json &revj = member["revision"]; - member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL); - _db->save(member); - } - + _db->save(&origMember,member); _sender->ncSendConfig(nwid,requestPacketId,identity.address(),*(nc.get()),metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_VERSION,0) < 6); } diff --git a/controller/FileDB.cpp b/controller/FileDB.cpp index b48d5e87f..646fa2fe7 100644 --- a/controller/FileDB.cpp +++ b/controller/FileDB.cpp @@ -69,7 +69,7 @@ bool FileDB::waitForReady() return true; } -void FileDB::save(const nlohmann::json &record) +void FileDB::save(nlohmann::json *orig,nlohmann::json &record) { char p1[16384],p2[16384]; try { @@ -126,4 +126,9 @@ void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId) { } +void FileDB::nodeIsOnline(const uint64_t memberId) +{ + // Nothing to do here right now in the filesystem store mode since we can just get this from the peer list +} + } // namespace ZeroTier diff --git a/controller/FileDB.hpp b/controller/FileDB.hpp index fe9869b9d..76a479366 100644 --- a/controller/FileDB.hpp +++ b/controller/FileDB.hpp @@ -32,12 +32,14 @@ public: virtual bool waitForReady(); - virtual void save(const nlohmann::json &record); + virtual void save(nlohmann::json *orig,nlohmann::json &record); virtual void eraseNetwork(const uint64_t networkId); virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); + virtual void nodeIsOnline(const uint64_t memberId); + protected: std::string _networksPath; }; diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp index d1012167d..031bd5163 100644 --- a/controller/RethinkDB.cpp +++ b/controller/RethinkDB.cpp @@ -215,6 +215,47 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres }); } + _onlineNotificationThread = std::thread([this]() { + try { + std::unique_ptr rdb; + while (_run == 1) { + try { + if (!rdb) + rdb = R::connect(this->_host,this->_port,this->_auth); + if (rdb) { + std::lock_guard l(_lastOnline_l); + R::Array batch; + R::Object tmpobj; + for(auto i=_lastOnline.begin();i!=_lastOnline.end();++i) { + char nodeId[16]; + Utils::hex10(i->first,nodeId); + tmpobj["id"] = nodeId; + tmpobj["ts"] = i->second; + batch.emplace_back(tmpobj); + if (batch.size() >= 256) { + R::db(this->_db).table("NodeLastOnline").insert(R::args(batch),R::optargs("conflict","update")).run(*rdb); + batch.clear(); + } + } + if (batch.size() > 0) + R::db(this->_db).table("NodeLastOnline").insert(R::args(batch),R::optargs("conflict","update")).run(*rdb); + _lastOnline.clear(); + } + } catch (std::exception &e) { + fprintf(stderr,"ERROR: controller RethinkDB (node status update): %s" ZT_EOL_S,e.what()); + rdb.reset(); + } catch (R::Error &e) { + fprintf(stderr,"ERROR: controller RethinkDB (node status update): %s" ZT_EOL_S,e.message.c_str()); + rdb.reset(); + } catch ( ... ) { + fprintf(stderr,"ERROR: controller RethinkDB (node status update): unknown exception" ZT_EOL_S); + rdb.reset(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + } + } catch ( ... ) {} + }); + _heartbeatThread = std::thread([this]() { try { char tmp[1024]; @@ -251,9 +292,10 @@ RethinkDB::~RethinkDB() _membersDbWatcher.join(); _networksDbWatcher.join(); _heartbeatThread.join(); + _onlineNotificationThread.join(); } -void RethinkDB::waitForReady() +bool RethinkDB::waitForReady() { while (_ready > 0) { if (!_waitNoticePrinted) { @@ -263,12 +305,32 @@ void RethinkDB::waitForReady() _readyLock.lock(); _readyLock.unlock(); } + return true; } -void RethinkDB::save(const nlohmann::json &record) +void RethinkDB::save(nlohmann::json *orig,nlohmann::json &record) { + if (!record.is_object()) // sanity check + return; waitForReady(); - _commitQueue.post(new nlohmann::json(record)); + if (orig) { + if (*orig != record) { + nlohmann::json *q = new nlohmann::json(); + try { + record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1; + for(auto kv=record.begin();kv!=record.end();++kv) { + if ((kv.key() == "id")||(kv.key() == "nwid")||(kv.key() == "objtype")||((*q)[kv.key()] != kv.value())) + (*q)[kv.key()] = kv.value(); + } + } catch ( ... ) { + delete q; + throw; + } + } + } else { + record["revision"] = 1; + _commitQueue.post(new nlohmann::json(record)); + } } void RethinkDB::eraseNetwork(const uint64_t networkId) @@ -295,6 +357,12 @@ void RethinkDB::eraseMember(const uint64_t networkId,const uint64_t memberId) _commitQueue.post(tmp); } +void RethinkDB::nodeIsOnline(const uint64_t memberId) +{ + std::lock_guard l(_lastOnline_l); + _lastOnline[memberId] = OSUtils::now(); +} + } // namespace ZeroTier #endif // ZT_CONTROLLER_USE_RETHINKDB diff --git a/controller/RethinkDB.hpp b/controller/RethinkDB.hpp index 2309a25cf..8c8b16a64 100644 --- a/controller/RethinkDB.hpp +++ b/controller/RethinkDB.hpp @@ -34,14 +34,16 @@ public: RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path); virtual ~RethinkDB(); - virtual void waitForReady(); + virtual bool waitForReady(); - virtual void save(const nlohmann::json &record); + virtual void save(nlohmann::json *orig,nlohmann::json &record); virtual void eraseNetwork(const uint64_t networkId); virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); + virtual void nodeIsOnline(const uint64_t memberId); + protected: std::string _host; std::string _db; @@ -56,6 +58,10 @@ protected: BlockingQueue< nlohmann::json * > _commitQueue; std::thread _commitThread[ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS]; + std::unordered_map< uint64_t,int64_t > _lastOnline; + mutable std::mutex _lastOnline_l; + std::thread _onlineNotificationThread; + std::thread _heartbeatThread; mutable std::mutex _readyLock; // locked until ready diff --git a/node/Peer.cpp b/node/Peer.cpp index a3682a97f..2d562f120 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -78,54 +78,6 @@ void Peer::received( { const int64_t now = RR->node->now(); -/* -#ifdef ZT_ENABLE_CLUSTER - bool isClusterSuboptimalPath = false; - if ((RR->cluster)&&(hops == 0)) { - // Note: findBetterEndpoint() is first since we still want to check - // for a better endpoint even if we don't actually send a redirect. - InetAddress redirectTo; - if ( (verb != Packet::VERB_OK) && (verb != Packet::VERB_ERROR) && (verb != Packet::VERB_RENDEZVOUS) && (verb != Packet::VERB_PUSH_DIRECT_PATHS) && (RR->cluster->findBetterEndpoint(redirectTo,_id.address(),path->address(),false)) ) { - if (_vProto >= 5) { - // For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS. - Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS); - outp.append((uint16_t)1); // count == 1 - outp.append((uint8_t)ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT); // flags: cluster redirect - outp.append((uint16_t)0); // no extensions - if (redirectTo.ss_family == AF_INET) { - outp.append((uint8_t)4); - outp.append((uint8_t)6); - outp.append(redirectTo.rawIpData(),4); - } else { - outp.append((uint8_t)6); - outp.append((uint8_t)18); - outp.append(redirectTo.rawIpData(),16); - } - outp.append((uint16_t)redirectTo.port()); - outp.armor(_key,true,path->nextOutgoingCounter()); - path->send(RR,tPtr,outp.data(),outp.size(),now); - } else { - // For older peers we use RENDEZVOUS to coax them into contacting us elsewhere. - Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS); - outp.append((uint8_t)0); // no flags - RR->identity.address().appendTo(outp); - outp.append((uint16_t)redirectTo.port()); - if (redirectTo.ss_family == AF_INET) { - outp.append((uint8_t)4); - outp.append(redirectTo.rawIpData(),4); - } else { - outp.append((uint8_t)16); - outp.append(redirectTo.rawIpData(),16); - } - outp.armor(_key,true,path->nextOutgoingCounter()); - path->send(RR,tPtr,outp.data(),outp.size(),now); - } - isClusterSuboptimalPath = true; - } - } -#endif -*/ - _lastReceive = now; switch (verb) { case Packet::VERB_FRAME: @@ -163,6 +115,7 @@ void Peer::received( } } + bool attemptToContact = false; if ((!havePath)&&(RR->node->shouldUsePathForZeroTierTraffic(tPtr,_id.address(),path->localSocket(),path->address()))) { Mutex::Lock _l(_paths_m); @@ -201,13 +154,17 @@ void Peer::received( _paths[replacePath].p = path; _paths[replacePath].priority = 1; } else { - attemptToContactAt(tPtr,path->localSocket(),path->address(),now,true,path->nextOutgoingCounter()); - path->sent(now); - RR->t->peerConfirmingUnknownPath(tPtr,networkId,*this,path,packetId,verb); + attemptToContact = true; } } } } + + if (attemptToContact) { + attemptToContactAt(tPtr,path->localSocket(),path->address(),now,true,path->nextOutgoingCounter()); + path->sent(now); + RR->t->peerConfirmingUnknownPath(tPtr,networkId,*this,path,packetId,verb); + } } // If we have a trust relationship periodically push a message enumerating