This commit is contained in:
Grant Limberg 2021-09-02 16:22:52 -07:00
parent 932584f8fc
commit 6a49a766ca
No known key found for this signature in database
GPG key ID: 2BA62CCABBB4095A

View file

@ -794,6 +794,7 @@ void PostgreSQL::heartbeat()
const char *hostname = hostnameTmp; const char *hostname = hostnameTmp;
while (_run == 1) { while (_run == 1) {
fprintf(stderr, "%s: heartbeat\n", controllerId);
auto c = _pool->borrow(); auto c = _pool->borrow();
int64_t ts = OSUtils::now(); int64_t ts = OSUtils::now();
@ -1010,7 +1011,7 @@ void PostgreSQL::_networksWatcher_Redis() {
void PostgreSQL::commitThread() void PostgreSQL::commitThread()
{ {
fprintf(stderr, "commitThread start\n"); fprintf(stderr, "%s: commitThread start\n", _myAddressStr.c_str());
std::pair<nlohmann::json,bool> qitem; std::pair<nlohmann::json,bool> qitem;
while(_commitQueue.get(qitem)&(_run == 1)) { while(_commitQueue.get(qitem)&(_run == 1)) {
//fprintf(stderr, "commitThread tick\n"); //fprintf(stderr, "commitThread tick\n");
@ -1023,7 +1024,7 @@ void PostgreSQL::commitThread()
nlohmann::json *config = &(qitem.first); nlohmann::json *config = &(qitem.first);
const std::string objtype = (*config)["objtype"]; const std::string objtype = (*config)["objtype"];
if (objtype == "member") { if (objtype == "member") {
//fprintf(stderr, "commitThread: member\n"); // fprintf(stderr, "%s: commitThread: member\n", _myAddressStr.c_str());
try { try {
auto c = _pool->borrow(); auto c = _pool->borrow();
pqxx::work w(*c->c); pqxx::work w(*c->c);
@ -1087,7 +1088,7 @@ void PostgreSQL::commitThread()
assignments.push_back(addr); assignments.push_back(addr);
} }
if (ipAssignError) { if (ipAssignError) {
fprintf(stderr, "ipAssignError\n"); fprintf(stderr, "%s: ipAssignError\n", _myAddressStr.c_str());
delete config; delete config;
config = nullptr; config = nullptr;
continue; continue;
@ -1108,15 +1109,15 @@ void PostgreSQL::commitThread()
_memberChanged(memOrig, memNew, qitem.second); _memberChanged(memOrig, memNew, qitem.second);
} else { } else {
fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %llu-%llu\n", (unsigned long long)nwidInt, (unsigned long long)memberidInt); fprintf(stderr, "%s: Can't notify of change. Error parsing nwid or memberid: %llu-%llu\n", _myAddressStr.c_str(), (unsigned long long)nwidInt, (unsigned long long)memberidInt);
} }
} catch (std::exception &e) { } catch (std::exception &e) {
fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); fprintf(stderr, "%s ERROR: Error updating member: %s\n", _myAddressStr.c_str(), e.what());
} }
} else if (objtype == "network") { } else if (objtype == "network") {
try { try {
//fprintf(stderr, "commitThread: network\n"); // fprintf(stderr, "%s: commitThread: network\n", _myAddressStr.c_str());
auto c = _pool->borrow(); auto c = _pool->borrow();
pqxx::work w(*c->c); pqxx::work w(*c->c);
@ -1211,7 +1212,7 @@ void PostgreSQL::commitThread()
id, targetAddr, targetBits, (via == "NULL" ? NULL : via.c_str())); id, targetAddr, targetBits, (via == "NULL" ? NULL : via.c_str()));
} }
if (err) { if (err) {
fprintf(stderr, "route add error\n"); fprintf(stderr, "%s: route add error\n", _myAddressStr.c_str());
w.abort(); w.abort();
_pool->unborrow(c); _pool->unborrow(c);
delete config; delete config;
@ -1248,14 +1249,14 @@ void PostgreSQL::commitThread()
_networkChanged(nwOrig, nwNew, qitem.second); _networkChanged(nwOrig, nwNew, qitem.second);
} else { } else {
fprintf(stderr, "Can't notify network changed: %llu\n", (unsigned long long)nwidInt); fprintf(stderr, "%s: Can't notify network changed: %llu\n", _myAddressStr.c_str(), (unsigned long long)nwidInt);
} }
} catch (std::exception &e) { } catch (std::exception &e) {
fprintf(stderr, "ERROR: Error updating network: %s\n", e.what()); fprintf(stderr, "%s ERROR: Error updating network: %s\n", _myAddressStr.c_str(), e.what());
} }
} else if (objtype == "_delete_network") { } else if (objtype == "_delete_network") {
//fprintf(stderr, "commitThread: delete network\n"); // fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str());
try { try {
auto c = _pool->borrow(); auto c = _pool->borrow();
pqxx::work w(*c->c); pqxx::work w(*c->c);
@ -1268,11 +1269,11 @@ void PostgreSQL::commitThread()
w.commit(); w.commit();
_pool->unborrow(c); _pool->unborrow(c);
} catch (std::exception &e) { } catch (std::exception &e) {
fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what()); fprintf(stderr, "%s ERROR: Error deleting network: %s\n", _myAddressStr.c_str(), e.what());
} }
} else if (objtype == "_delete_member") { } else if (objtype == "_delete_member") {
//fprintf(stderr, "commitThread: delete member\n"); // fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str());
try { try {
auto c = _pool->borrow(); auto c = _pool->borrow();
pqxx::work w(*c->c); pqxx::work w(*c->c);
@ -1287,18 +1288,18 @@ void PostgreSQL::commitThread()
w.commit(); w.commit();
_pool->unborrow(c); _pool->unborrow(c);
} catch (std::exception &e) { } catch (std::exception &e) {
fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what()); fprintf(stderr, "%s ERROR: Error deleting member: %s\n", _myAddressStr.c_str(), e.what());
} }
} else { } else {
fprintf(stderr, "ERROR: unknown objtype"); fprintf(stderr, "%s ERROR: unknown objtype\n", _myAddressStr.c_str());
} }
} catch (std::exception &e) { } catch (std::exception &e) {
fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what()); fprintf(stderr, "%s ERROR: Error getting objtype: %s\n", _myAddressStr.c_str(), e.what());
} }
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
} }
fprintf(stderr, "commitThread finished\n"); fprintf(stderr, "%s commitThread finished\n", _myAddressStr.c_str());
} }
void PostgreSQL::onlineNotificationThread() void PostgreSQL::onlineNotificationThread()
@ -1315,6 +1316,7 @@ void PostgreSQL::onlineNotification_Postgres()
while (_run == 1) { while (_run == 1) {
auto c = _pool->borrow(); auto c = _pool->borrow();
try { try {
fprintf(stderr, "%s onlineNotification_Postgres\n", _myAddressStr.c_str());
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline; std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
{ {
std::lock_guard<std::mutex> l(_lastOnline_l); std::lock_guard<std::mutex> l(_lastOnline_l);