mirror of
https://github.com/zerotier/ZeroTierOne.git
synced 2025-06-05 03:53:44 +02:00
Merge branch 'dev' of github.com:zerotier/ZeroTierOne into dev
This commit is contained in:
commit
26d1cf9186
5 changed files with 216 additions and 27 deletions
|
@ -1447,7 +1447,9 @@ void EmbeddedNetworkController::_request(
|
||||||
Utils::scopy(nc->centralAuthURL, sizeof(nc->centralAuthURL), info.centralAuthURL.c_str());
|
Utils::scopy(nc->centralAuthURL, sizeof(nc->centralAuthURL), info.centralAuthURL.c_str());
|
||||||
}
|
}
|
||||||
if (!info.issuerURL.empty()) {
|
if (!info.issuerURL.empty()) {
|
||||||
|
#ifdef ZT_DEBUG
|
||||||
fprintf(stderr, "copying issuerURL to nc: %s\n", info.issuerURL.c_str());
|
fprintf(stderr, "copying issuerURL to nc: %s\n", info.issuerURL.c_str());
|
||||||
|
#endif
|
||||||
Utils::scopy(nc->issuerURL, sizeof(nc->issuerURL), info.issuerURL.c_str());
|
Utils::scopy(nc->issuerURL, sizeof(nc->issuerURL), info.issuerURL.c_str());
|
||||||
}
|
}
|
||||||
if (!info.ssoNonce.empty()) {
|
if (!info.ssoNonce.empty()) {
|
||||||
|
|
|
@ -170,6 +170,7 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
|
||||||
, _rc(rc)
|
, _rc(rc)
|
||||||
, _redis(NULL)
|
, _redis(NULL)
|
||||||
, _cluster(NULL)
|
, _cluster(NULL)
|
||||||
|
, _redisMemberStatus(false)
|
||||||
{
|
{
|
||||||
char myAddress[64];
|
char myAddress[64];
|
||||||
_myAddressStr = myId.address().toString(myAddress);
|
_myAddressStr = myId.address().toString(myAddress);
|
||||||
|
@ -189,6 +190,11 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
|
||||||
// it will be padded at the end with zeroes. If longer, it'll be truncated.
|
// it will be padded at the end with zeroes. If longer, it'll be truncated.
|
||||||
Utils::unhex(ssoPskHex, _ssoPsk, sizeof(_ssoPsk));
|
Utils::unhex(ssoPskHex, _ssoPsk, sizeof(_ssoPsk));
|
||||||
}
|
}
|
||||||
|
const char *redisMemberStatus = getenv("ZT_REDIS_MEMBER_STATUS");
|
||||||
|
if (redisMemberStatus && (strcmp(redisMemberStatus, "true") == 0)) {
|
||||||
|
_redisMemberStatus = true;
|
||||||
|
fprintf(stderr, "Using redis for member status\n");
|
||||||
|
}
|
||||||
|
|
||||||
auto c = _pool->borrow();
|
auto c = _pool->borrow();
|
||||||
pqxx::work txn{*c->c};
|
pqxx::work txn{*c->c};
|
||||||
|
@ -477,6 +483,7 @@ AuthInfo PostgreSQL::getSSOAuthInfo(const nlohmann::json &member, const std::str
|
||||||
info.ssoNonce = nonce;
|
info.ssoNonce = nonce;
|
||||||
info.ssoState = std::string(state_hex) + "_" +networkId;
|
info.ssoState = std::string(state_hex) + "_" +networkId;
|
||||||
info.centralAuthURL = redirectURL;
|
info.centralAuthURL = redirectURL;
|
||||||
|
#ifdef ZT_DEBUG
|
||||||
fprintf(
|
fprintf(
|
||||||
stderr,
|
stderr,
|
||||||
"ssoClientID: %s\nissuerURL: %s\nssoNonce: %s\nssoState: %s\ncentralAuthURL: %s\n",
|
"ssoClientID: %s\nissuerURL: %s\nssoNonce: %s\nssoState: %s\ncentralAuthURL: %s\n",
|
||||||
|
@ -485,6 +492,7 @@ AuthInfo PostgreSQL::getSSOAuthInfo(const nlohmann::json &member, const std::str
|
||||||
info.ssoNonce.c_str(),
|
info.ssoNonce.c_str(),
|
||||||
info.ssoState.c_str(),
|
info.ssoState.c_str(),
|
||||||
info.centralAuthURL.c_str());
|
info.centralAuthURL.c_str());
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
fprintf(stderr, "client_id: %s\nauthorization_endpoint: %s\n", client_id.c_str(), authorization_endpoint.c_str());
|
fprintf(stderr, "client_id: %s\nauthorization_endpoint: %s\n", client_id.c_str(), authorization_endpoint.c_str());
|
||||||
|
@ -506,6 +514,21 @@ void PostgreSQL::initializeNetworks()
|
||||||
|
|
||||||
fprintf(stderr, "Initializing Networks...\n");
|
fprintf(stderr, "Initializing Networks...\n");
|
||||||
|
|
||||||
|
if (_redisMemberStatus) {
|
||||||
|
fprintf(stderr, "Init Redis for networks...\n");
|
||||||
|
try {
|
||||||
|
if (_rc->clusterMode) {
|
||||||
|
_cluster->del(setKey);
|
||||||
|
} else {
|
||||||
|
_redis->del(setKey);
|
||||||
|
}
|
||||||
|
} catch (sw::redis::Error &e) {
|
||||||
|
// ignore. if this key doesn't exist, there's no reason to delete it
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::unordered_set<std::string> networkSet;
|
||||||
|
|
||||||
char qbuf[2048] = {0};
|
char qbuf[2048] = {0};
|
||||||
sprintf(qbuf, "SELECT n.id, (EXTRACT(EPOCH FROM n.creation_time AT TIME ZONE 'UTC')*1000)::bigint as creation_time, n.capabilities, "
|
sprintf(qbuf, "SELECT n.id, (EXTRACT(EPOCH FROM n.creation_time AT TIME ZONE 'UTC')*1000)::bigint as creation_time, n.capabilities, "
|
||||||
"n.enable_broadcast, (EXTRACT(EPOCH FROM n.last_modified AT TIME ZONE 'UTC')*1000)::bigint AS last_modified, n.mtu, n.multicast_limit, n.name, n.private, n.remote_trace_level, "
|
"n.enable_broadcast, (EXTRACT(EPOCH FROM n.last_modified AT TIME ZONE 'UTC')*1000)::bigint AS last_modified, n.mtu, n.multicast_limit, n.name, n.private, n.remote_trace_level, "
|
||||||
|
@ -522,7 +545,8 @@ void PostgreSQL::initializeNetworks()
|
||||||
auto c = _pool->borrow();
|
auto c = _pool->borrow();
|
||||||
auto c2 = _pool->borrow();
|
auto c2 = _pool->borrow();
|
||||||
pqxx::work w{*c->c};
|
pqxx::work w{*c->c};
|
||||||
|
|
||||||
|
fprintf(stderr, "Load networks from psql...\n");
|
||||||
auto stream = pqxx::stream_from::query(w, qbuf);
|
auto stream = pqxx::stream_from::query(w, qbuf);
|
||||||
|
|
||||||
std::tuple<
|
std::tuple<
|
||||||
|
@ -610,6 +634,8 @@ void PostgreSQL::initializeNetworks()
|
||||||
config["clientId"] = clientId.value_or("");
|
config["clientId"] = clientId.value_or("");
|
||||||
config["authorizationEndpoint"] = authorizationEndpoint.value_or("");
|
config["authorizationEndpoint"] = authorizationEndpoint.value_or("");
|
||||||
|
|
||||||
|
networkSet.insert(nwid);
|
||||||
|
|
||||||
if (dnsDomain.has_value()) {
|
if (dnsDomain.has_value()) {
|
||||||
std::string serverList = dnsServers.value();
|
std::string serverList = dnsServers.value();
|
||||||
json obj;
|
json obj;
|
||||||
|
@ -673,6 +699,23 @@ void PostgreSQL::initializeNetworks()
|
||||||
w.commit();
|
w.commit();
|
||||||
_pool->unborrow(c2);
|
_pool->unborrow(c2);
|
||||||
_pool->unborrow(c);
|
_pool->unborrow(c);
|
||||||
|
fprintf(stderr, "done.\n");
|
||||||
|
|
||||||
|
if (!networkSet.empty()) {
|
||||||
|
if (_redisMemberStatus) {
|
||||||
|
fprintf(stderr, "adding networks to redis...\n");
|
||||||
|
if (_rc->clusterMode) {
|
||||||
|
auto tx = _cluster->transaction(_myAddressStr, true);
|
||||||
|
tx.sadd(setKey, networkSet.begin(), networkSet.end());
|
||||||
|
tx.exec();
|
||||||
|
} else {
|
||||||
|
auto tx = _redis->transaction(true);
|
||||||
|
tx.sadd(setKey, networkSet.begin(), networkSet.end());
|
||||||
|
tx.exec();
|
||||||
|
}
|
||||||
|
fprintf(stderr, "done.\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (++this->_ready == 2) {
|
if (++this->_ready == 2) {
|
||||||
if (_waitNoticePrinted) {
|
if (_waitNoticePrinted) {
|
||||||
|
@ -680,11 +723,14 @@ void PostgreSQL::initializeNetworks()
|
||||||
}
|
}
|
||||||
_readyLock.unlock();
|
_readyLock.unlock();
|
||||||
}
|
}
|
||||||
|
fprintf(stderr, "network init done.\n");
|
||||||
} catch (sw::redis::Error &e) {
|
} catch (sw::redis::Error &e) {
|
||||||
fprintf(stderr, "ERROR: Error initializing networks in Redis: %s\n", e.what());
|
fprintf(stderr, "ERROR: Error initializing networks in Redis: %s\n", e.what());
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
|
||||||
exit(-1);
|
exit(-1);
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception &e) {
|
||||||
fprintf(stderr, "ERROR: Error initializing networks: %s\n", e.what());
|
fprintf(stderr, "ERROR: Error initializing networks: %s\n", e.what());
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -697,6 +743,42 @@ void PostgreSQL::initializeMembers()
|
||||||
std::unordered_map<std::string, std::string> networkMembers;
|
std::unordered_map<std::string, std::string> networkMembers;
|
||||||
fprintf(stderr, "Initializing Members...\n");
|
fprintf(stderr, "Initializing Members...\n");
|
||||||
|
|
||||||
|
std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:";
|
||||||
|
|
||||||
|
if (_redisMemberStatus) {
|
||||||
|
fprintf(stderr, "Initialize Redis for members...\n");
|
||||||
|
std::lock_guard<std::mutex> l(_networks_l);
|
||||||
|
std::unordered_set<std::string> deletes;
|
||||||
|
for ( auto it : _networks) {
|
||||||
|
uint64_t nwid_i = it.first;
|
||||||
|
char nwidTmp[64] = {0};
|
||||||
|
OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
|
||||||
|
std::string nwid(nwidTmp);
|
||||||
|
std::string key = setKeyBase + nwid;
|
||||||
|
deletes.insert(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!deletes.empty()) {
|
||||||
|
try {
|
||||||
|
if (_rc->clusterMode) {
|
||||||
|
auto tx = _cluster->transaction(_myAddressStr, true);
|
||||||
|
for (std::string k : deletes) {
|
||||||
|
tx.del(k);
|
||||||
|
}
|
||||||
|
tx.exec();
|
||||||
|
} else {
|
||||||
|
auto tx = _redis->transaction(true);
|
||||||
|
for (std::string k : deletes) {
|
||||||
|
tx.del(k);
|
||||||
|
}
|
||||||
|
tx.exec();
|
||||||
|
}
|
||||||
|
} catch (sw::redis::Error &e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
char qbuf[2048];
|
char qbuf[2048];
|
||||||
sprintf(qbuf, "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, (EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000)::bigint, m.identity, "
|
sprintf(qbuf, "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, (EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000)::bigint, m.identity, "
|
||||||
" (EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
|
" (EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
|
||||||
|
@ -717,7 +799,8 @@ void PostgreSQL::initializeMembers()
|
||||||
auto c = _pool->borrow();
|
auto c = _pool->borrow();
|
||||||
auto c2 = _pool->borrow();
|
auto c2 = _pool->borrow();
|
||||||
pqxx::work w{*c->c};
|
pqxx::work w{*c->c};
|
||||||
|
|
||||||
|
fprintf(stderr, "Load members from psql...\n");
|
||||||
auto stream = pqxx::stream_from::query(w, qbuf);
|
auto stream = pqxx::stream_from::query(w, qbuf);
|
||||||
|
|
||||||
std::tuple<
|
std::tuple<
|
||||||
|
@ -776,6 +859,8 @@ void PostgreSQL::initializeMembers()
|
||||||
std::optional<uint64_t> authenticationExpiryTime = std::get<19>(row);
|
std::optional<uint64_t> authenticationExpiryTime = std::get<19>(row);
|
||||||
std::string assignedAddresses = std::get<20>(row);
|
std::string assignedAddresses = std::get<20>(row);
|
||||||
|
|
||||||
|
networkMembers.insert(std::pair<std::string, std::string>(setKeyBase+networkId, memberId));
|
||||||
|
|
||||||
config["id"] = memberId;
|
config["id"] = memberId;
|
||||||
config["address"] = memberId;
|
config["address"] = memberId;
|
||||||
config["nwid"] = networkId;
|
config["nwid"] = networkId;
|
||||||
|
@ -830,6 +915,29 @@ void PostgreSQL::initializeMembers()
|
||||||
w.commit();
|
w.commit();
|
||||||
_pool->unborrow(c2);
|
_pool->unborrow(c2);
|
||||||
_pool->unborrow(c);
|
_pool->unborrow(c);
|
||||||
|
fprintf(stderr, "done.\n");
|
||||||
|
|
||||||
|
if (!networkMembers.empty()) {
|
||||||
|
if (_redisMemberStatus) {
|
||||||
|
fprintf(stderr, "Load member data into redis...\n");
|
||||||
|
if (_rc->clusterMode) {
|
||||||
|
auto tx = _cluster->transaction(_myAddressStr, true);
|
||||||
|
for (auto it : networkMembers) {
|
||||||
|
tx.sadd(it.first, it.second);
|
||||||
|
}
|
||||||
|
tx.exec();
|
||||||
|
} else {
|
||||||
|
auto tx = _redis->transaction(true);
|
||||||
|
for (auto it : networkMembers) {
|
||||||
|
tx.sadd(it.first, it.second);
|
||||||
|
}
|
||||||
|
tx.exec();
|
||||||
|
}
|
||||||
|
fprintf(stderr, "done.\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fprintf(stderr, "Done loading members...\n");
|
||||||
|
|
||||||
if (++this->_ready == 2) {
|
if (++this->_ready == 2) {
|
||||||
if (_waitNoticePrinted) {
|
if (_waitNoticePrinted) {
|
||||||
|
@ -879,15 +987,16 @@ void PostgreSQL::heartbeat()
|
||||||
std::string now = std::to_string(ts);
|
std::string now = std::to_string(ts);
|
||||||
std::string host_port = std::to_string(_listenPort);
|
std::string host_port = std::to_string(_listenPort);
|
||||||
std::string use_redis = (_rc != NULL) ? "true" : "false";
|
std::string use_redis = (_rc != NULL) ? "true" : "false";
|
||||||
|
std::string redis_mem_status = (_redisMemberStatus) ? "true" : "false";
|
||||||
|
|
||||||
try {
|
try {
|
||||||
pqxx::result res = w.exec0("INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis) "
|
pqxx::result res = w.exec0("INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis, redis_member_status) "
|
||||||
"VALUES ("+w.quote(controllerId)+", "+w.quote(hostname)+", TO_TIMESTAMP("+now+"::double precision/1000), "+
|
"VALUES ("+w.quote(controllerId)+", "+w.quote(hostname)+", TO_TIMESTAMP("+now+"::double precision/1000), "+
|
||||||
w.quote(publicIdentity)+", "+major+", "+minor+", "+rev+", "+build+", "+host_port+", "+use_redis+") "
|
w.quote(publicIdentity)+", "+major+", "+minor+", "+rev+", "+build+", "+host_port+", "+use_redis+", "+redis_mem_status+") "
|
||||||
"ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
|
"ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
|
||||||
"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
|
"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
|
||||||
"v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
|
"v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
|
||||||
"use_redis = EXCLUDED.use_redis");
|
"use_redis = EXCLUDED.use_redis, redis_member_status = EXCLUDED.redis_member_status");
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception &e) {
|
||||||
fprintf(stderr, "Heartbeat update failed: %s\n", e.what());
|
fprintf(stderr, "Heartbeat update failed: %s\n", e.what());
|
||||||
w.abort();
|
w.abort();
|
||||||
|
@ -899,6 +1008,14 @@ void PostgreSQL::heartbeat()
|
||||||
}
|
}
|
||||||
_pool->unborrow(c);
|
_pool->unborrow(c);
|
||||||
|
|
||||||
|
if (_redisMemberStatus) {
|
||||||
|
if (_rc->clusterMode) {
|
||||||
|
_cluster->zadd("controllers", "controllerId", ts);
|
||||||
|
} else {
|
||||||
|
_redis->zadd("controllers", "controllerId", ts);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
||||||
}
|
}
|
||||||
fprintf(stderr, "Exited heartbeat thread\n");
|
fprintf(stderr, "Exited heartbeat thread\n");
|
||||||
|
@ -1241,7 +1358,7 @@ void PostgreSQL::commitThread()
|
||||||
"$1, TO_TIMESTAMP($5::double precision/1000), "
|
"$1, TO_TIMESTAMP($5::double precision/1000), "
|
||||||
"(SELECT user_id AS owner_id FROM ztc_global_permissions WHERE authorize = true AND del = true AND modify = true AND read = true LIMIT 1),"
|
"(SELECT user_id AS owner_id FROM ztc_global_permissions WHERE authorize = true AND del = true AND modify = true AND read = true LIMIT 1),"
|
||||||
"$2, $3, $4, TO_TIMESTAMP($5::double precision/1000), "
|
"$2, $3, $4, TO_TIMESTAMP($5::double precision/1000), "
|
||||||
"$6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, 17) "
|
"$6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) "
|
||||||
"ON CONFLICT (id) DO UPDATE set controller_id = EXCLUDED.controller_id, "
|
"ON CONFLICT (id) DO UPDATE set controller_id = EXCLUDED.controller_id, "
|
||||||
"capabilities = EXCLUDED.capabilities, enable_broadcast = EXCLUDED.enable_broadcast, "
|
"capabilities = EXCLUDED.capabilities, enable_broadcast = EXCLUDED.enable_broadcast, "
|
||||||
"last_modified = EXCLUDED.last_modified, mtu = EXCLUDED.mtu, "
|
"last_modified = EXCLUDED.last_modified, mtu = EXCLUDED.mtu, "
|
||||||
|
@ -1347,6 +1464,20 @@ void PostgreSQL::commitThread()
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception &e) {
|
||||||
fprintf(stderr, "%s ERROR: Error updating network: %s\n", _myAddressStr.c_str(), e.what());
|
fprintf(stderr, "%s ERROR: Error updating network: %s\n", _myAddressStr.c_str(), e.what());
|
||||||
}
|
}
|
||||||
|
if (_redisMemberStatus) {
|
||||||
|
try {
|
||||||
|
std::string id = config["id"];
|
||||||
|
std::string controllerId = _myAddressStr.c_str();
|
||||||
|
std::string key = "networks:{" + controllerId + "}";
|
||||||
|
if (_rc->clusterMode) {
|
||||||
|
_cluster->sadd(key, id);
|
||||||
|
} else {
|
||||||
|
_redis->sadd(key, id);
|
||||||
|
}
|
||||||
|
} catch (sw::redis::Error &e) {
|
||||||
|
fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
|
||||||
|
}
|
||||||
|
}
|
||||||
} else if (objtype == "_delete_network") {
|
} else if (objtype == "_delete_network") {
|
||||||
// fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str());
|
// fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str());
|
||||||
try {
|
try {
|
||||||
|
@ -1361,6 +1492,22 @@ void PostgreSQL::commitThread()
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception &e) {
|
||||||
fprintf(stderr, "%s ERROR: Error deleting network: %s\n", _myAddressStr.c_str(), e.what());
|
fprintf(stderr, "%s ERROR: Error deleting network: %s\n", _myAddressStr.c_str(), e.what());
|
||||||
}
|
}
|
||||||
|
if (_redisMemberStatus) {
|
||||||
|
try {
|
||||||
|
std::string id = config["id"];
|
||||||
|
std::string controllerId = _myAddressStr.c_str();
|
||||||
|
std::string key = "networks:{" + controllerId + "}";
|
||||||
|
if (_rc->clusterMode) {
|
||||||
|
_cluster->srem(key, id);
|
||||||
|
_cluster->del("network-nodes-online:{"+controllerId+"}:"+id);
|
||||||
|
} else {
|
||||||
|
_redis->srem(key, id);
|
||||||
|
_redis->del("network-nodes-online:{"+controllerId+"}:"+id);
|
||||||
|
}
|
||||||
|
} catch (sw::redis::Error &e) {
|
||||||
|
fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} else if (objtype == "_delete_member") {
|
} else if (objtype == "_delete_member") {
|
||||||
// fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str());
|
// fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str());
|
||||||
|
@ -1378,6 +1525,23 @@ void PostgreSQL::commitThread()
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception &e) {
|
||||||
fprintf(stderr, "%s ERROR: Error deleting member: %s\n", _myAddressStr.c_str(), e.what());
|
fprintf(stderr, "%s ERROR: Error deleting member: %s\n", _myAddressStr.c_str(), e.what());
|
||||||
}
|
}
|
||||||
|
if (_redisMemberStatus) {
|
||||||
|
try {
|
||||||
|
std::string memberId = config["id"];
|
||||||
|
std::string networkId = config["nwid"];
|
||||||
|
std::string controllerId = _myAddressStr.c_str();
|
||||||
|
std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId;
|
||||||
|
if (_rc->clusterMode) {
|
||||||
|
_cluster->srem(key, memberId);
|
||||||
|
_cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
|
||||||
|
} else {
|
||||||
|
_redis->srem(key, memberId);
|
||||||
|
_redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
|
||||||
|
}
|
||||||
|
} catch (sw::redis::Error &e) {
|
||||||
|
fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what());
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
fprintf(stderr, "%s ERROR: unknown objtype\n", _myAddressStr.c_str());
|
fprintf(stderr, "%s ERROR: unknown objtype\n", _myAddressStr.c_str());
|
||||||
}
|
}
|
||||||
|
@ -1394,10 +1558,23 @@ void PostgreSQL::commitThread()
|
||||||
|
|
||||||
void PostgreSQL::onlineNotificationThread()
|
void PostgreSQL::onlineNotificationThread()
|
||||||
{
|
{
|
||||||
waitForReady();
|
waitForReady();
|
||||||
onlineNotification_Postgres();
|
if (_redisMemberStatus) {
|
||||||
|
onlineNotification_Redis();
|
||||||
|
} else {
|
||||||
|
onlineNotification_Postgres();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ONLY UNCOMMENT FOR TEMPORARY DB MAINTENANCE
|
||||||
|
*
|
||||||
|
* This define temproarly turns off writing to the member status table
|
||||||
|
* so it can be reindexed when the indexes get too large.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// #define DISABLE_MEMBER_STATUS 1
|
||||||
|
|
||||||
void PostgreSQL::onlineNotification_Postgres()
|
void PostgreSQL::onlineNotification_Postgres()
|
||||||
{
|
{
|
||||||
_connected = 1;
|
_connected = 1;
|
||||||
|
@ -1413,7 +1590,8 @@ void PostgreSQL::onlineNotification_Postgres()
|
||||||
std::lock_guard<std::mutex> l(_lastOnline_l);
|
std::lock_guard<std::mutex> l(_lastOnline_l);
|
||||||
lastOnline.swap(_lastOnline);
|
lastOnline.swap(_lastOnline);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifndef DISABLE_MEMBER_STATUS
|
||||||
pqxx::work w(*c->c);
|
pqxx::work w(*c->c);
|
||||||
pqxx::work w2(*c2->c);
|
pqxx::work w2(*c2->c);
|
||||||
|
|
||||||
|
@ -1472,6 +1650,7 @@ void PostgreSQL::onlineNotification_Postgres()
|
||||||
pipe.complete();
|
pipe.complete();
|
||||||
w.commit();
|
w.commit();
|
||||||
fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), updateCount);
|
fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), updateCount);
|
||||||
|
#endif
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception &e) {
|
||||||
fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what());
|
fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what());
|
||||||
}
|
}
|
||||||
|
@ -1515,9 +1694,7 @@ void PostgreSQL::onlineNotification_Redis()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (sw::redis::Error &e) {
|
} catch (sw::redis::Error &e) {
|
||||||
#ifdef REDIS_TRACE
|
|
||||||
fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what());
|
fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what());
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(10));
|
std::this_thread::sleep_for(std::chrono::seconds(10));
|
||||||
}
|
}
|
||||||
|
@ -1528,6 +1705,7 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control
|
||||||
|
|
||||||
{
|
{
|
||||||
nlohmann::json jtmp1, jtmp2;
|
nlohmann::json jtmp1, jtmp2;
|
||||||
|
uint64_t count = 0;
|
||||||
for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
|
for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
|
||||||
uint64_t nwid_i = i->first.first;
|
uint64_t nwid_i = i->first.first;
|
||||||
uint64_t memberid_i = i->first.second;
|
uint64_t memberid_i = i->first.second;
|
||||||
|
@ -1559,14 +1737,21 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control
|
||||||
.zadd("active-networks:{"+controllerId+"}", networkId, ts)
|
.zadd("active-networks:{"+controllerId+"}", networkId, ts)
|
||||||
.sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId)
|
.sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId)
|
||||||
.hmset("member:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end());
|
.hmset("member:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end());
|
||||||
|
++count;
|
||||||
}
|
}
|
||||||
|
|
||||||
// expire records from all-nodes and network-nodes member list
|
// expire records from all-nodes and network-nodes member list
|
||||||
uint64_t expireOld = OSUtils::now() - 300000;
|
uint64_t expireOld = OSUtils::now() - 300000;
|
||||||
|
|
||||||
tx.zremrangebyscore("nodes-online:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
|
tx.zremrangebyscore("nodes-online:{"+controllerId+"}",
|
||||||
tx.zremrangebyscore("nodes-online2:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
|
sw::redis::RightBoundedInterval<double>(expireOld,
|
||||||
tx.zremrangebyscore("active-networks:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
|
sw::redis::BoundType::LEFT_OPEN));
|
||||||
|
tx.zremrangebyscore("nodes-online2:{"+controllerId+"}",
|
||||||
|
sw::redis::RightBoundedInterval<double>(expireOld,
|
||||||
|
sw::redis::BoundType::LEFT_OPEN));
|
||||||
|
tx.zremrangebyscore("active-networks:{"+controllerId+"}",
|
||||||
|
sw::redis::RightBoundedInterval<double>(expireOld,
|
||||||
|
sw::redis::BoundType::LEFT_OPEN));
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> l(_networks_l);
|
std::lock_guard<std::mutex> l(_networks_l);
|
||||||
for (const auto &it : _networks) {
|
for (const auto &it : _networks) {
|
||||||
|
@ -1578,6 +1763,7 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tx.exec();
|
tx.exec();
|
||||||
|
fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -174,6 +174,7 @@ private:
|
||||||
RedisConfig *_rc;
|
RedisConfig *_rc;
|
||||||
std::shared_ptr<sw::redis::Redis> _redis;
|
std::shared_ptr<sw::redis::Redis> _redis;
|
||||||
std::shared_ptr<sw::redis::RedisCluster> _cluster;
|
std::shared_ptr<sw::redis::RedisCluster> _cluster;
|
||||||
|
bool _redisMemberStatus;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace ZeroTier
|
} // namespace ZeroTier
|
||||||
|
|
|
@ -498,17 +498,15 @@ snap-uninstall: FORCE
|
||||||
snap remove zerotier
|
snap remove zerotier
|
||||||
|
|
||||||
snap-build-remote: FORCE
|
snap-build-remote: FORCE
|
||||||
snapcraft remote-build --build-on=amd64,arm64,s390x,ppc64el,armhf,i386
|
cd pkg && snapcraft remote-build --build-on=amd64,arm64,s390x,ppc64el,armhf,i386
|
||||||
|
|
||||||
snap-upload-beta: FORCE
|
snap-upload-beta: FORCE
|
||||||
|
snapcraft login --with-file=snapcraft-login-data
|
||||||
|
pushd pkg
|
||||||
for SNAPFILE in ./*.snap; do\
|
for SNAPFILE in ./*.snap; do\
|
||||||
snapcraft upload --release=beta,edge,candidate $${SNAPFILE};\
|
snapcraft upload --release=stable,beta,edge,candidate $${SNAPFILE};\
|
||||||
done
|
|
||||||
|
|
||||||
snap-upload-stable: FORCE
|
|
||||||
for SNAPFILE in ./*.snap; do\
|
|
||||||
snapcraft upload --release=stable $${SNAPFILE};\
|
|
||||||
done
|
done
|
||||||
|
popd
|
||||||
|
|
||||||
synology-pkg: FORCE
|
synology-pkg: FORCE
|
||||||
cd pkg/synology ; ./build.sh build
|
cd pkg/synology ; ./build.sh build
|
||||||
|
|
|
@ -64,12 +64,14 @@ layout:
|
||||||
|
|
||||||
parts:
|
parts:
|
||||||
one:
|
one:
|
||||||
source: ./
|
plugin: make
|
||||||
plugin: dump
|
source: https://github.com/zerotier/zerotierone.git
|
||||||
organize:
|
source-branch: "master"
|
||||||
zerotier-one : usr/sbin/zerotier-one
|
build-packages:
|
||||||
zerotier-cli : usr/sbin/zerotier-cli
|
- build-essential
|
||||||
zerotier-idtool : usr/sbin/zerotier-idtool
|
- libc++-dev
|
||||||
|
make-parameters:
|
||||||
|
- ZT_SSO_SUPPORTED=0
|
||||||
filesets:
|
filesets:
|
||||||
binaries:
|
binaries:
|
||||||
- usr/sbin/zerotier-one
|
- usr/sbin/zerotier-one
|
||||||
|
|
Loading…
Add table
Reference in a new issue