use pqxx::pipeline for online update thread

This commit is contained in:
Grant Limberg 2021-10-06 09:39:30 -07:00
parent 4d26b5a868
commit 3818351287
No known key found for this signature in database
GPG key ID: 2BA62CCABBB4095A

View file

@ -1337,6 +1337,7 @@ void PostgreSQL::onlineNotification_Postgres()
nlohmann::json jtmp1, jtmp2; nlohmann::json jtmp1, jtmp2;
while (_run == 1) { while (_run == 1) {
auto c = _pool->borrow(); auto c = _pool->borrow();
auto c2 = _pool->borrow();
try { try {
fprintf(stderr, "%s onlineNotification_Postgres\n", _myAddressStr.c_str()); fprintf(stderr, "%s onlineNotification_Postgres\n", _myAddressStr.c_str());
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline; std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
@ -1346,15 +1347,16 @@ void PostgreSQL::onlineNotification_Postgres()
} }
pqxx::work w(*c->c); pqxx::work w(*c->c);
pqxx::work w2(*c2->c);
fprintf(stderr, "online notification tick\n");
// using pqxx::stream_to would be a really nice alternative here, but
// unfortunately it doesn't support upserts.
// fprintf(stderr, "online notification tick\n");
std::stringstream memberUpdate;
memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ";
bool firstRun = true; bool firstRun = true;
bool memberAdded = false; bool memberAdded = false;
int updateCount = 0; int updateCount = 0;
pqxx::pipeline pipe(w);
for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
updateCount += 1; updateCount += 1;
uint64_t nwid_i = i->first.first; uint64_t nwid_i = i->first.first;
@ -1371,16 +1373,10 @@ void PostgreSQL::onlineNotification_Postgres()
std::string networkId(nwidTmp); std::string networkId(nwidTmp);
std::string memberId(memTmp); std::string memberId(memTmp);
const char *qvals[2] = {
networkId.c_str(),
memberId.c_str()
};
try { try {
pqxx::row r = w.exec_params1("SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2", pqxx::row r = w2.exec_params1("SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2",
networkId, memberId); networkId, memberId);
} catch (pqxx::unexpected_rows &e) { } catch (pqxx::unexpected_rows &e) {
// fprintf(stderr, "Member count failed: %s\n", e.what());
continue; continue;
} }
@ -1388,32 +1384,30 @@ void PostgreSQL::onlineNotification_Postgres()
std::string ipAddr = i->second.second.toIpString(ipTmp); std::string ipAddr = i->second.second.toIpString(ipTmp);
std::string timestamp = std::to_string(ts); std::string timestamp = std::to_string(ts);
if (firstRun) { std::stringstream memberUpdate;
firstRun = false; memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES "
} else { << "('" << networkId << "', '" << memberId << "', ";
memberUpdate << ", ";
}
memberUpdate << "('" << networkId << "', '" << memberId << "', ";
if (ipAddr.empty()) { if (ipAddr.empty()) {
memberUpdate << "NULL, "; memberUpdate << "NULL, ";
} else { } else {
memberUpdate << "'" << ipAddr << "', "; memberUpdate << "'" << ipAddr << "', ";
} }
memberUpdate << "TO_TIMESTAMP(" << timestamp << "::double precision/1000))"; memberUpdate << "TO_TIMESTAMP(" << timestamp << "::double precision/1000)) "
memberAdded = true; << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated";
}
memberUpdate << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated;";
if (memberAdded) { pipe.insert(memberUpdate.str());
//fprintf(stderr, "%s\n", memberUpdate.str().c_str());
pqxx::result res = w.exec0(memberUpdate.str());
w.commit();
} }
while(!pipe.empty()) {
pipe.retrieve();
}
pipe.complete();
w.commit();
fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), updateCount); fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), updateCount);
} catch (std::exception &e) { } catch (std::exception &e) {
fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what()); fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what());
} }
_pool->unborrow(c2);
_pool->unborrow(c); _pool->unborrow(c);
ConnectionPoolStats stats = _pool->get_stats(); ConnectionPoolStats stats = _pool->get_stats();