From 3818351287eab9c734011c7be61e0011a56f30c1 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Wed, 6 Oct 2021 09:39:30 -0700 Subject: [PATCH] use pqxx::pipeline for online update thread --- controller/PostgreSQL.cpp | 48 +++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 53d88d6fd..9edcf0594 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -1337,6 +1337,7 @@ void PostgreSQL::onlineNotification_Postgres() nlohmann::json jtmp1, jtmp2; while (_run == 1) { auto c = _pool->borrow(); + auto c2 = _pool->borrow(); try { fprintf(stderr, "%s onlineNotification_Postgres\n", _myAddressStr.c_str()); std::unordered_map< std::pair,std::pair,_PairHasher > lastOnline; @@ -1346,15 +1347,16 @@ void PostgreSQL::onlineNotification_Postgres() } pqxx::work w(*c->c); + pqxx::work w2(*c2->c); - // using pqxx::stream_to would be a really nice alternative here, but - // unfortunately it doesn't support upserts. - // fprintf(stderr, "online notification tick\n"); - std::stringstream memberUpdate; - memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES "; + fprintf(stderr, "online notification tick\n"); + bool firstRun = true; bool memberAdded = false; int updateCount = 0; + + pqxx::pipeline pipe(w); + for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { updateCount += 1; uint64_t nwid_i = i->first.first; @@ -1371,16 +1373,10 @@ void PostgreSQL::onlineNotification_Postgres() std::string networkId(nwidTmp); std::string memberId(memTmp); - const char *qvals[2] = { - networkId.c_str(), - memberId.c_str() - }; - try { - pqxx::row r = w.exec_params1("SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2", + pqxx::row r = w2.exec_params1("SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2", networkId, memberId); } catch (pqxx::unexpected_rows &e) { - // fprintf(stderr, "Member count failed: %s\n", e.what()); continue; } @@ -1388,32 +1384,30 @@ void PostgreSQL::onlineNotification_Postgres() std::string ipAddr = i->second.second.toIpString(ipTmp); std::string timestamp = std::to_string(ts); - if (firstRun) { - firstRun = false; - } else { - memberUpdate << ", "; - } - - memberUpdate << "('" << networkId << "', '" << memberId << "', "; + std::stringstream memberUpdate; + memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES " + << "('" << networkId << "', '" << memberId << "', "; if (ipAddr.empty()) { memberUpdate << "NULL, "; } else { memberUpdate << "'" << ipAddr << "', "; } - memberUpdate << "TO_TIMESTAMP(" << timestamp << "::double precision/1000))"; - memberAdded = true; - } - memberUpdate << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated;"; + memberUpdate << "TO_TIMESTAMP(" << timestamp << "::double precision/1000)) " + << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated"; - if (memberAdded) { - //fprintf(stderr, "%s\n", memberUpdate.str().c_str()); - pqxx::result res = w.exec0(memberUpdate.str()); - w.commit(); + pipe.insert(memberUpdate.str()); } + while(!pipe.empty()) { + pipe.retrieve(); + } + + pipe.complete(); + w.commit(); fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), updateCount); } catch (std::exception &e) { fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what()); } + _pool->unborrow(c2); _pool->unborrow(c); ConnectionPoolStats stats = _pool->get_stats();