Queue up inserts in onlineNotificationThread() into a multiple insert statement

This commit is contained in:
Grant Limberg 2019-04-04 12:40:49 -07:00
parent 02f0eead1c
commit 55a9e6e05e

View file

@ -1298,6 +1298,10 @@ void PostgreSQL::onlineNotificationThread()
PGresult *res = NULL; PGresult *res = NULL;
std::stringstream memberUpdate;
memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ";
bool firstRun = true;
bool memberAdded = false;
for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
uint64_t nwid_i = i->first.first; uint64_t nwid_i = i->first.first;
char nwidTmp[64]; char nwidTmp[64];
@ -1311,40 +1315,62 @@ void PostgreSQL::onlineNotificationThread()
continue; // skip members trying to join non-existant networks continue; // skip members trying to join non-existant networks
} }
lastOnlineCumulative[i->first] = i->second.first;
std::string networkId(nwidTmp);
std::string memberId(memTmp);
std::vector<std::string> &members = updateMap[networkId]; std::vector<std::string> &members = updateMap[networkId];
members.push_back(memberId); members.push_back(memberId);
lastOnlineCumulative[i->first] = i->second.first;
std::string networkId(nwidTmp);
std::string memberId(memTmp);
const char *qvals[2] = {
networkId.c_str(),
memberId.c_str()
};
res = PQexecParams(conn,
"SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2",
2,
NULL,
qvals,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
fprintf(stderr, "Member count failed: %s", PQerrorMessage(conn));
PQclear(res);
continue;
}
int nrows = PQntuples(res);
PQclear(res);
if (nrows == 1) {
int64_t ts = i->second.first; int64_t ts = i->second.first;
std::string ipAddr = i->second.second.toIpString(ipTmp); std::string ipAddr = i->second.second.toIpString(ipTmp);
std::string timestamp = std::to_string(ts); std::string timestamp = std::to_string(ts);
const char *values[4] = { if (firstRun) {
networkId.c_str(), firstRun = false;
memberId.c_str(), } else {
(ipAddr.empty() ? NULL : ipAddr.c_str()), memberUpdate << ", ";
timestamp.c_str(), }
};
res = PQexecParams(conn, memberUpdate << "('" << networkId << "', '" << memberId << '", ' << ipAddr << "', TO_TIMESTAMP(" << timestamp << "::double precision/1000))";
"INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ($1, $2, $3, TO_TIMESTAMP($4::double precision/1000)) " memberAdded = true;
"ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated", } else if (nrows > 1) {
4, // number of parameters fprintf(stderr, "nrows > 1?!?");
NULL, // oid field. ignore
values, // values for substitution
NULL, // lengths in bytes of each value
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "Error on Member Status upsert: %s\n", PQresultErrorMessage(res));
PQclear(res);
continue; continue;
} else {
continue;
}
}
memberUpdate << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated;";
if (memberAdded) {
res = PQexec(conn, memberUpdate.str().c_str());
if (res != PGRES_COMMAND_OK) {
fprintf(stderr, "Multiple insert failed: %s", PQerrorMessage(conn));
} }
PQclear(res); PQclear(res);
} }
@ -1361,6 +1387,10 @@ void PostgreSQL::onlineNotificationThread()
} }
} }
std::stringstream networkUpdate;
networkUpdate << "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, online_member_count, total_member_count, last_modified) VALUES ";
bool nwFirstRun = true;
bool networkAdded = false;
for (auto i = networks.begin(); i != networks.end(); ++i) { for (auto i = networks.begin(); i != networks.end(); ++i) {
char tmp[64]; char tmp[64];
Utils::hex(i->first, tmp); Utils::hex(i->first, tmp);
@ -1392,36 +1422,60 @@ void PostgreSQL::onlineNotificationThread()
} }
} }
char *nvals[1] = {
networkId.c_str();
};
res = PQExecParams(conn,
"SELECT id FROM ztc_network WHERE id = $1",
1,
NULL,
nvals,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
fprintf(stderr, "Network lookup failed: %s", PQerrorMessage(conn));
PQclear(res);
continue;
}
int nrows = PQntuples(res);
PQclear(res);
if (nrows == 1) {
std::string bc = std::to_string(bridgeCount); std::string bc = std::to_string(bridgeCount);
std::string amc = std::to_string(authMemberCount); std::string amc = std::to_string(authMemberCount);
std::string omc = std::to_string(onlineMemberCount); std::string omc = std::to_string(onlineMemberCount);
std::string tmc = std::to_string(totalMemberCount); std::string tmc = std::to_string(totalMemberCount);
std::string timestamp = std::to_string(ts); std::string timestamp = std::to_string(ts);
const char *values[6] = {
networkId.c_str(),
bc.c_str(),
amc.c_str(),
omc.c_str(),
tmc.c_str(),
timestamp.c_str()
};
res = PQexecParams(conn, "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, " if (nwFirstRun) {
"online_member_count, total_member_count, last_modified) VALUES ($1, $2, $3, $4, $5, TO_TIMESTAMP($6::double precision/1000)) " nwFirstRun = false;
"ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, " } else {
"authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, " networkUpdate << ", ";
"total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified", }
6,
NULL,
values,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) { networkUpdate << "('" << networkId << "', " << bc << ", " << amc << ", " << omc << ", " << tmc << ", "
fprintf(stderr, "ERROR: Error on Network Status upsert (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); << "TO_TIMESTAMP(" << timestamp << "::double precision/1000))";
PQclear(res);
networkAdded = true;
} else if (nrows > 1) {
fprintf(stderr, "Number of networks > 1?!?!?");
continue; continue;
} else {
continue;
}
}
networkUpdate << " ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, "
<< "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, "
<< "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified";
if (networkAdded) {
res = PQExec(conn, networkUpdate.str().c_str());
if (res != PGRES_COMMAND_OK) {
fprintf(stderr, "Error during multiple network upsert: %s", PQresultErrorMessage(res));
} }
PQclear(res); PQclear(res);
} }
@ -1444,7 +1498,7 @@ void PostgreSQL::onlineNotificationThread()
// PQclear(res); // PQclear(res);
// } // }
std::this_thread::sleep_for(std::chrono::milliseconds(250)); std::this_thread::sleep_for(std::chrono::milliseconds(0));
} }
fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str()); fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
PQfinish(conn); PQfinish(conn);