From 47154fa6234a7af69f209824c8869d8d07f0895c Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Wed, 2 Jun 2021 11:44:00 -0700 Subject: [PATCH] transiton to libpqxx & connection pool for central controllers --- controller/ConnectionPool.hpp | 161 ++++ controller/PostgreSQL.cpp | 1319 +++++++++------------------------ controller/PostgreSQL.hpp | 73 +- make-mac.mk | 4 +- 4 files changed, 571 insertions(+), 986 deletions(-) create mode 100644 controller/ConnectionPool.hpp diff --git a/controller/ConnectionPool.hpp b/controller/ConnectionPool.hpp new file mode 100644 index 000000000..8ffc1645c --- /dev/null +++ b/controller/ConnectionPool.hpp @@ -0,0 +1,161 @@ +/* + * Copyright (c)2021 ZeroTier, Inc. + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file in the project's root directory. + * + * Change Date: 2025-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2.0 of the Apache License. + */ +/****/ + +#ifndef ZT_CONNECTION_POOL_H_ +#define ZT_CONNECTION_POOL_H_ + + +#ifndef _DEBUG + #define _DEBUG(x) +#endif + +#include +#include +#include +#include +#include +#include + +namespace ZeroTier { + +struct ConnectionUnavailable : std::exception { + char const* what() const throw() { + return "Unable to allocate connection"; + }; +}; + + +class Connection { +public: + virtual ~Connection() {}; +}; + +class ConnectionFactory { +public: + virtual ~ConnectionFactory() {}; + virtual std::shared_ptr create()=0; +}; + +struct ConnectionPoolStats { + size_t pool_size; + size_t borrowed_size; +}; + +template +class ConnectionPool { +public: + ConnectionPool(size_t max_pool_size, size_t min_pool_size, std::shared_ptr factory) + : m_maxPoolSize(max_pool_size) + , m_minPoolSize(min_pool_size) + , m_factory(factory) + { + while(m_pool.size() < m_minPoolSize){ + m_pool.push_back(m_factory->create()); + } + }; + + ConnectionPoolStats get_stats() { + std::unique_lock lock(m_poolMutex); + + ConnectionPoolStats stats; + stats.pool_size = m_pool.size(); + stats.borrowed_size = m_borrowed.size(); + + return stats; + }; + + ~ConnectionPool() { + }; + + /** + * Borrow + * + * Borrow a connection for temporary use + * + * When done, either (a) call unborrow() to return it, or (b) (if it's bad) just let it go out of scope. This will cause it to automatically be replaced. + * @retval a shared_ptr to the connection object + */ + std::shared_ptr borrow() { + std::unique_lock l(m_poolMutex); + + while((m_pool.size() + m_borrowed.size()) < m_minPoolSize) { + std::shared_ptr conn = m_factory->create(); + m_pool.push_back(conn); + } + + if(m_pool.size()==0){ + + if ((m_pool.size() + m_borrowed.size()) <= m_maxPoolSize) { + try { + std::shared_ptr conn = m_factory->create(); + m_borrowed.insert(conn); + return std::static_pointer_cast(conn); + } catch (std::exception &e) { + throw ConnectionUnavailable(); + } + } else { + for(auto it = m_borrowed.begin(); it != m_borrowed.end(); ++it){ + if((*it).unique()) { + // This connection has been abandoned! Destroy it and create a new connection + try { + // If we are able to create a new connection, return it + _DEBUG("Creating new connection to replace discarded connection"); + std::shared_ptr conn = m_factory->create(); + m_borrowed.erase(it); + m_borrowed.insert(conn); + return std::static_pointer_cast(conn); + } catch(std::exception& e) { + // Error creating a replacement connection + throw ConnectionUnavailable(); + } + } + } + // Nothing available + throw ConnectionUnavailable(); + } + } + + // Take one off the front + std::shared_ptr conn = m_pool.front(); + m_pool.pop_front(); + // Add it to the borrowed list + m_borrowed.insert(conn); + return std::static_pointer_cast(conn); + }; + + /** + * Unborrow a connection + * + * Only call this if you are returning a working connection. If the connection was bad, just let it go out of scope (so the connection manager can replace it). + * @param the connection + */ + void unborrow(std::shared_ptr conn) { + // Lock + std::unique_lock lock(m_poolMutex); + m_borrowed.erase(conn); + if ((m_pool.size() + m_borrowed.size()) < m_maxPoolSize) { + m_pool.push_back(conn); + } + }; +protected: + size_t m_maxPoolSize; + size_t m_minPoolSize; + std::shared_ptr m_factory; + std::deque > m_pool; + std::set > m_borrowed; + std::mutex m_poolMutex; +}; + +} + +#endif \ No newline at end of file diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 7897cf5e2..4687c9204 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -70,12 +70,50 @@ std::string join(const std::vector &elements, const char * const se using namespace ZeroTier; + +MemberNotificationReceiver::MemberNotificationReceiver(PostgreSQL *p, pqxx::connection &c, const std::string &channel) + : pqxx::notification_receiver(c, channel) + , _psql(p) +{} + + +void MemberNotificationReceiver::operator() (const std::string &payload, int packend_pid) { + json tmp(json::parse(payload)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object() || newConfig.is_object()) { + _psql->_memberChanged(oldConfig,newConfig,(_psql->_ready>=2)); + } +} + + +NetworkNotificationReceiver::NetworkNotificationReceiver(PostgreSQL *p, pqxx::connection &c, const std::string &channel) + : pqxx::notification_receiver(c, channel) + , _psql(p) +{} + +void NetworkNotificationReceiver::operator() (const std::string &payload, int packend_pid) { + json tmp(json::parse(payload)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object() || newConfig.is_object()) { + _psql->_networkChanged(oldConfig,newConfig,(_psql->_ready>=2)); + } +} + using Attrs = std::vector>; using Item = std::pair; using ItemStream = std::vector; PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, RedisConfig *rc) : DB() + , _pool() , _myId(myId) , _myAddress(myId.address()) , _ready(0) @@ -90,7 +128,10 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R char myAddress[64]; _myAddressStr = myId.address().toString(myAddress); _connString = std::string(path) + " application_name=controller_" + _myAddressStr; - + auto f = std::make_shared(_connString); + _pool = std::make_shared >( + 15, 5, std::static_pointer_cast(f)); + memset(_ssoPsk, 0, sizeof(_ssoPsk)); char *const ssoPskHex = getenv("ZT_SSO_PSK"); if (ssoPskHex) { @@ -100,32 +141,18 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R Utils::unhex(ssoPskHex, _ssoPsk, sizeof(_ssoPsk)); } - // Database Schema Version Check - PGconn *conn = getPgConn(); - if (PQstatus(conn) != CONNECTION_OK) { - fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); - exit(1); - } + auto c = _pool->borrow(); + pqxx::work txn{*c->c}; - PGresult *res = PQexec(conn, "SELECT version FROM ztc_database"); - if (PQresultStatus(res) != PGRES_TUPLES_OK) { - fprintf(stderr, "Error determining database version"); - exit(1); - } - - if (PQntuples(res) != 1) { - fprintf(stderr, "Invalid number of db version tuples returned."); - exit(1); - } - - int dbVersion = std::stoi(PQgetvalue(res, 0, 0)); + pqxx::row r{txn.exec1("SELECT version FROM ztc_database")}; + int dbVersion = r[0].as(); + txn.commit(); if (dbVersion < DB_MINIMUM_VERSION) { fprintf(stderr, "Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION); exit(1); } - PQclear(res); - res = NULL; + _pool->unborrow(c); if (_rc != NULL) { sw::redis::ConnectionOptions opts; @@ -149,11 +176,8 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt()); _waitNoticePrinted = true; - initializeNetworks(conn); - initializeMembers(conn); - - PQfinish(conn); - conn = NULL; + initializeNetworks(); + initializeMembers(); _heartbeatThread = std::thread(&PostgreSQL::heartbeat, this); _membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this); @@ -283,74 +307,50 @@ void PostgreSQL::updateMemberOnLoad(const uint64_t networkId, const uint64_t mem bool have_auth = false; try { - PGconn *conn = getPgConn(); - if (PQstatus(conn) != CONNECTION_OK) { - fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); - exit(1); - } + auto c = _pool->borrow(); + pqxx::work w(*c->c); - const char *params[1] = { nwids }; - PGresult *res = PQexecParams(conn, "SELECT org.client_id, org.authorization_endpoint " + pqxx::row r = w.exec_params1("SELECT org.client_id, org.authorization_endpoint " "FROM ztc_network AS nw, ztc_org AS org " - "WHERE nw.id = $1 AND nw.sso_enabled = true AND org.owner_id = nw.owner_id", - 1, - NULL, - params, - NULL, - NULL, - 0); - if (PQresultStatus(res) != PGRES_TUPLES_OK) { - fprintf(stderr, "Org client_id and authorization_endpoint lookup failed: %s", PQerrorMessage(conn)); - PQclear(res); - exit(1); - } + "WHERE nw.id = $1 AND nw.sso_enabled = true AND org.owner_id = nw.owner_id", nwids); + + std::string client_id = r[0].as(); + std::string authorization_endpoint = r[1].as(); - if (PQntuples(res) >= 1) { - std::string client_id = PQgetvalue(res, 0, 0); - std::string authorization_endpoint = PQgetvalue(res, 0, 1); - PQclear(res); - if ((!client_id.empty())&&(!authorization_endpoint.empty())) { - const char *params2[2] = { nwids, ids }; - res = PQexecParams(conn, "SELECT e.nonce, e.authentication_expiry_time " - "FROM ztc_sso_expiry AS e " - "WHERE e.network_id = $1 AND e.member_id = $2 " - "ORDER BY n.authentication_expiry_time DESC LIMIT 1", - 1, - NULL, - params2, - NULL, - NULL, - 0); - if (PQntuples(res) >= 1) { - std::string nonce = PQgetvalue(res, 0, 0); - int64_t authentication_expiry_time = std::stoll(PQgetvalue(res, 0, 1)); - if ((authentication_expiry_time >= 0)&&(!nonce.empty())) { - have_auth = true; + if ((!client_id.empty())&&(!authorization_endpoint.empty())) { + pqxx::row r2 = w.exec_params1( + "SELECT e.nonce, EXTRACT(EPOCH FROM e.authentication_expiry_time AT TIME ZONE 'UTC')*1000 as authentication_expiry_time" + "FROM ztc_sso_expiry e " + "WHERE e.network_id = $1 AND e.member_id = $2 " + "ORDER BY n.authentication_expiry_time DESC LIMIT 1", nwids, ids); - uint8_t state[48]; - HMACSHA384(_ssoPsk, nonce.data(), (unsigned int)nonce.length(), state); - char state_hex[256]; - Utils::hex(state, 48, state_hex); - char authenticationURL[4096]; - const char *redirect_url = "redirect_uri=http%3A%2F%2Fmy.zerotier.com%2Fapi%2Fnetwork%2Fsso-auth"; // TODO: this should be configurable - OSUtils::ztsnprintf(authenticationURL, sizeof(authenticationURL), - "%s?response_type=id_token&response_mode=form_post&scope=openid+email+profile&redriect_uri=%s&nonce=%s&state=%s&client_id=%s", - authorization_endpoint.c_str(), - redirect_url, - nonce.c_str(), - state_hex, // NOTE: should these be URL escaped? Don't think there's a risk as they are not user definable. - client_id.c_str()); + std::string nonce = r2[0].as(); + int64_t authentication_expiry_time = r2[0].as(); + if ((authentication_expiry_time >= 0)&&(!nonce.empty())) { + have_auth = true; - member["authenticationExpiryTime"] = authentication_expiry_time; - member["authenticationURL"] = authenticationURL; - } - } - PQclear(res); + uint8_t state[48]; + HMACSHA384(_ssoPsk, nonce.data(), (unsigned int)nonce.length(), state); + char state_hex[256]; + Utils::hex(state, 48, state_hex); + char authenticationURL[4096]; + const char *redirect_url = "redirect_uri=http%3A%2F%2Fmy.zerotier.com%2Fapi%2Fnetwork%2Fsso-auth"; // TODO: this should be configurable + OSUtils::ztsnprintf(authenticationURL, sizeof(authenticationURL), + "%s?response_type=id_token&response_mode=form_post&scope=openid+email+profile&redriect_uri=%s&nonce=%s&state=%s&client_id=%s", + authorization_endpoint.c_str(), + redirect_url, + nonce.c_str(), + state_hex, // NOTE: should these be URL escaped? Don't think there's a risk as they are not user definable. + client_id.c_str()); + + member["authenticationExpiryTime"] = authentication_expiry_time; + member["authenticationURL"] = authenticationURL; } - } else { - PQclear(res); } + w.commit(); + _pool->unborrow(c); + } catch (sw::redis::Error &e) { fprintf(stderr, "ERROR: Error updating member on load, in Redis: %s\n", e.what()); exit(-1); @@ -360,162 +360,93 @@ void PostgreSQL::updateMemberOnLoad(const uint64_t networkId, const uint64_t mem } } -void PostgreSQL::initializeNetworks(PGconn *conn) +void PostgreSQL::initializeNetworks() { try { - if (PQstatus(conn) != CONNECTION_OK) { - fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); - exit(1); - } - std::string setKey = "networks:{" + _myAddressStr + "}"; - - // if (_rc != NULL) { - // try { - // if (_rc->clusterMode) { - // _cluster->del(setKey); - // } else { - // _redis->del(setKey); - // } - // } catch (sw::redis::Error &e) { - // // del can throw an error if the key doesn't exist - // // swallow it and move along - // } - // } std::unordered_set networkSet; - const char *params[1] = { - _myAddressStr.c_str() - }; - fprintf(stderr, "Initializing Networks...\n"); - - PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, " - "enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, " + auto c = _pool->borrow(); + pqxx::work w{*c->c}; + pqxx::result r = w.exec_params("SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000 as creation_time, capabilities, " + "enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000 AS last_modified, mtu, multicast_limit, name, private, remote_trace_level, " "remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network " - "WHERE deleted = false AND controller_id = $1", - 1, - NULL, - params, - NULL, - NULL, - 0); + "WHERE deleted = false AND controller_id = $1", _myAddressStr); - if (PQresultStatus(res) != PGRES_TUPLES_OK) { - fprintf(stderr, "Networks Initialization Failed: %s", PQerrorMessage(conn)); - PQclear(res); - exit(1); - } - - int numRows = PQntuples(res); - for (int i = 0; i < numRows; ++i) { + for (auto row = r.begin(); row != r.end(); row++) { json empty; json config; - const char *nwidparam[1] = { - PQgetvalue(res, i, 0) - }; - std::string nwid = PQgetvalue(res, i, 0); + std::string nwid = row[0].as(); networkSet.insert(nwid); config["id"] = nwid; config["nwid"] = nwid; + try { - config["creationTime"] = std::stoull(PQgetvalue(res, i, 1)); + config["creationTime"] = row[1].as(); } catch (std::exception &e) { config["creationTime"] = 0ULL; - //fprintf(stderr, "Error converting creation time: %s\n", PQgetvalue(res, i, 1)); } - config["capabilities"] = json::parse(PQgetvalue(res, i, 2)); - config["enableBroadcast"] = (strcmp(PQgetvalue(res, i, 3),"t")==0); + config["capabilities"] = row[2].as(); + config["enableBroadcast"] = row[3].as(); try { - config["lastModified"] = std::stoull(PQgetvalue(res, i, 4)); + config["lastModified"] = row[4].as(); } catch (std::exception &e) { config["lastModified"] = 0ULL; - //fprintf(stderr, "Error converting last modified: %s\n", PQgetvalue(res, i, 4)); } try { - config["mtu"] = std::stoi(PQgetvalue(res, i, 5)); + config["mtu"] = row[5].as(); } catch (std::exception &e) { config["mtu"] = 2800; } try { - config["multicastLimit"] = std::stoi(PQgetvalue(res, i, 6)); + config["multicastLimit"] = row[6].as(); } catch (std::exception &e) { config["multicastLimit"] = 64; } - config["name"] = PQgetvalue(res, i, 7); - config["private"] = (strcmp(PQgetvalue(res, i, 8),"t")==0); - try { - config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9)); - } catch (std::exception &e) { + config["name"] = row[7].as(); + config["private"] = row[8].as(); + if (!row[9].is_null()) { + config["remoteTraceLevel"] = row[9].as(); + } else { config["remoteTraceLevel"] = 0; } - config["remoteTraceTarget"] = PQgetvalue(res, i, 10); + config["remoteTraceTarget"] = row[10].as(); try { - config["revision"] = std::stoull(PQgetvalue(res, i, 11)); + config["revision"] = row[11].as(); } catch (std::exception &e) { config["revision"] = 0ULL; //fprintf(stderr, "Error converting revision: %s\n", PQgetvalue(res, i, 11)); } - config["rules"] = json::parse(PQgetvalue(res, i, 12)); - config["tags"] = json::parse(PQgetvalue(res, i, 13)); - config["v4AssignMode"] = json::parse(PQgetvalue(res, i, 14)); - config["v6AssignMode"] = json::parse(PQgetvalue(res, i, 15)); + config["rules"] = json::parse(row[12].as()); + config["tags"] = json::parse(row[13].as()); + config["v4AssignMode"] = json::parse(row[14].as()); + config["v6AssignMode"] = json::parse(row[15].as()); config["objtype"] = "network"; config["ipAssignmentPools"] = json::array(); config["routes"] = json::array(); - PGresult *r2 = PQexecParams(conn, - "SELECT host(ip_range_start), host(ip_range_end) FROM ztc_network_assignment_pool WHERE network_id = $1", - 1, - NULL, - nwidparam, - NULL, - NULL, - 0); - if (PQresultStatus(r2) != PGRES_TUPLES_OK) { - fprintf(stderr, "ERROR: Error retreiving IP pools for network: %s\n", PQresultErrorMessage(r2)); - PQclear(r2); - PQclear(res); - exit(1); - } - - int n = PQntuples(r2); - for (int j = 0; j < n; ++j) { + pqxx::result r2 = w.exec_params("SELECT host(ip_range_start), host(ip_range_end) FROM ztc_network_assignment_pool WHERE network_id = $1", _myAddressStr); + + for (auto row2 = r2.begin(); row2 != r2.end(); row2++) { json ip; - ip["ipRangeStart"] = PQgetvalue(r2, j, 0); - ip["ipRangeEnd"] = PQgetvalue(r2, j, 1); + ip["ipRangeStart"] = row2[0].as(); + ip["ipRangeEnd"] = row2[1].as(); config["ipAssignmentPools"].push_back(ip); } - PQclear(r2); + r2 = w.exec_params("SELECT host(address), bits, host(via) FROM ztc_network_route WHERE network_id = $1", _myAddressStr); - r2 = PQexecParams(conn, - "SELECT host(address), bits, host(via) FROM ztc_network_route WHERE network_id = $1", - 1, - NULL, - nwidparam, - NULL, - NULL, - 0); - - if (PQresultStatus(r2) != PGRES_TUPLES_OK) { - fprintf(stderr, "ERROR: Error retreiving routes for network: %s\n", PQresultErrorMessage(r2)); - PQclear(r2); - PQclear(res); - exit(1); - } - - n = PQntuples(r2); - for (int j = 0; j < n; ++j) { - std::string addr = PQgetvalue(r2, j, 0); - std::string bits = PQgetvalue(r2, j, 1); - std::string via = PQgetvalue(r2, j, 2); + for (auto row2 = r2.begin(); row2 != r2.end(); row2++) { + std::string addr = row2[0].as(); + std::string bits = row2[1].as(); + std::string via = row2[2].as(); json route; route["target"] = addr + "/" + bits; @@ -527,29 +458,15 @@ void PostgreSQL::initializeNetworks(PGconn *conn) config["routes"].push_back(route); } - r2 = PQexecParams(conn, - "SELECT domain, servers FROM ztc_network_dns WHERE network_id = $1", - 1, - NULL, - nwidparam, - NULL, - NULL, - 0); + r2 = w.exec_params("SELECT domain, servers FROM ztc_network_dns WHERE network_id = $1", _myAddressStr); - if (PQresultStatus(r2) != PGRES_TUPLES_OK) { - fprintf(stderr, "ERROR: Error retrieving DNS settings for network: %s\n", PQresultErrorMessage(r2)); - PQclear(r2); - PQclear(res); - exit(1); - } - - n = PQntuples(r2); - if (n > 1) { + if (r2.size() > 1) { fprintf(stderr, "ERROR: invalid number of DNS configurations for network %s. Must be 0 or 1\n", nwid.c_str()); - } else if (n == 1) { + } else if (r2.size() == 1) { + auto dnsRow = r2.begin(); json obj; - std::string domain = PQgetvalue(r2, 0, 0); - std::string serverList = PQgetvalue(r2, 0, 1); + std::string domain = dnsRow[0].as(); + std::string serverList = dnsRow[1].as(); auto servers = json::array(); if (serverList.rfind("{",0) != std::string::npos) { serverList = serverList.substr(1, serverList.size()-2); @@ -565,24 +482,11 @@ void PostgreSQL::initializeNetworks(PGconn *conn) config["dns"] = obj; } - PQclear(r2); - _networkChanged(empty, config, false); } - PQclear(res); - - // if(!networkSet.empty()) { - // if (_rc && _rc->clusterMode) { - // auto tx = _cluster->transaction(_myAddressStr, true); - // tx.sadd(setKey, networkSet.begin(), networkSet.end()); - // tx.exec(); - // } else if (_rc && !_rc->clusterMode) { - // auto tx = _redis->transaction(true); - // tx.sadd(setKey, networkSet.begin(), networkSet.end()); - // tx.exec(); - // } - // } + w.commit(); + _pool->unborrow(c); if (++this->_ready == 2) { if (_waitNoticePrinted) { @@ -599,52 +503,15 @@ void PostgreSQL::initializeNetworks(PGconn *conn) } } -void PostgreSQL::initializeMembers(PGconn *conn) +void PostgreSQL::initializeMembers() { try { - if (PQstatus(conn) != CONNECTION_OK) { - fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); - exit(1); - } - // std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:"; - - // if (_rc != NULL) { - // std::lock_guard l(_networks_l); - // std::unordered_set deletes; - // for ( auto it : _networks) { - // uint64_t nwid_i = it.first; - // char nwidTmp[64] = {0}; - // OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); - // std::string nwid(nwidTmp); - // std::string key = setKeyBase + nwid; - // deletes.insert(key); - // } - - // if (!deletes.empty()) { - // if (_rc->clusterMode) { - // auto tx = _cluster->transaction(_myAddressStr, true); - // for (std::string k : deletes) { - // tx.del(k); - // } - // tx.exec(); - // } else { - // auto tx = _redis->transaction(true); - // for (std::string k : deletes) { - // tx.del(k); - // } - // tx.exec(); - // } - // } - // } - - const char *params[1] = { - _myAddressStr.c_str() - }; - std::unordered_map networkMembers; - + fprintf(stderr, "Initializing Members...\n"); - PGresult *res = PQexecParams(conn, + auto c = _pool->borrow(); + pqxx::work w{*c->c}; + pqxx::result r = w.exec( "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, " " EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, " " EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000, " @@ -653,123 +520,90 @@ void PostgreSQL::initializeMembers(PGconn *conn) "FROM ztc_member m " "INNER JOIN ztc_network n " " ON n.id = m.network_id " - "WHERE n.controller_id = $1 AND m.deleted = false", - 1, - NULL, - params, - NULL, - NULL, - 0); + "WHERE n.controller_id = $1 AND m.deleted = false"); - if (PQresultStatus(res) != PGRES_TUPLES_OK) { - fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn)); - PQclear(res); - exit(1); - } - - int numRows = PQntuples(res); - for (int i = 0; i < numRows; ++i) { + for (auto row = r.begin(); row != r.end(); row++) { json empty; json config; - std::string memberId(PQgetvalue(res, i, 0)); - std::string networkId(PQgetvalue(res, i, 1)); + std::string memberId = row[0].as(); + std::string networkId = row[1].as(); - // networkMembers.insert(std::pair(setKeyBase+networkId, memberId)); - - std::string ctime = PQgetvalue(res, i, 5); config["id"] = memberId; config["nwid"] = networkId; - config["activeBridge"] = (strcmp(PQgetvalue(res, i, 2), "t") == 0); - config["authorized"] = (strcmp(PQgetvalue(res, i, 3), "t") == 0); - try { - config["capabilities"] = json::parse(PQgetvalue(res, i, 4)); - } catch (std::exception &e) { + config["activeBridge"] = row[2].as(); + config["authorized"] = row[3].as(); + if (row[4].is_null()) { config["capabilities"] = json::array(); + } else { + try { + config["capabilities"] = json::parse(row[4].as()); + } catch (std::exception &e) { + config["capabilities"] = json::array(); + } } + config["creationTime"] = row[5].as(); + config["identity"] = row[6].as(); try { - config["creationTime"] = std::stoull(PQgetvalue(res, i, 5)); - } catch (std::exception &e) { - config["creationTime"] = 0ULL; - //fprintf(stderr, "Error upding creation time (member): %s\n", PQgetvalue(res, i, 5)); - } - config["identity"] = PQgetvalue(res, i, 6); - try { - config["lastAuthorizedTime"] = std::stoull(PQgetvalue(res, i, 7)); + config["lastAuthorizedTime"] = row[7].as(); } catch(std::exception &e) { config["lastAuthorizedTime"] = 0ULL; //fprintf(stderr, "Error updating last auth time (member): %s\n", PQgetvalue(res, i, 7)); } try { - config["lastDeauthorizedTime"] = std::stoull(PQgetvalue(res, i, 8)); + config["lastDeauthorizedTime"] = row[8].as(); } catch( std::exception &e) { config["lastDeauthorizedTime"] = 0ULL; //fprintf(stderr, "Error updating last deauth time (member): %s\n", PQgetvalue(res, i, 8)); } try { - config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9)); + config["remoteTraceLevel"] = row[9].as(); } catch (std::exception &e) { config["remoteTraceLevel"] = 0; } - config["remoteTraceTarget"] = PQgetvalue(res, i, 10); + config["remoteTraceTarget"] = row[10].as(); try { - config["tags"] = json::parse(PQgetvalue(res, i, 11)); + config["tags"] = json::parse(row[11].as()); } catch (std::exception &e) { config["tags"] = json::array(); } try { - config["vMajor"] = std::stoi(PQgetvalue(res, i, 12)); + config["vMajor"] = row[12].as(); } catch(std::exception &e) { config["vMajor"] = -1; } try { - config["vMinor"] = std::stoi(PQgetvalue(res, i, 13)); + config["vMinor"] = row[13].as(); } catch (std::exception &e) { config["vMinor"] = -1; } try { - config["vRev"] = std::stoi(PQgetvalue(res, i, 14)); + config["vRev"] = row[14].as(); } catch (std::exception &e) { config["vRev"] = -1; } try { - config["vProto"] = std::stoi(PQgetvalue(res, i, 15)); + config["vProto"] = row[15].as(); } catch (std::exception &e) { config["vProto"] = -1; } - config["noAutoAssignIps"] = (strcmp(PQgetvalue(res, i, 16), "t") == 0); + config["noAutoAssignIps"] = row[16].as(); try { - config["revision"] = std::stoull(PQgetvalue(res, i, 17)); + config["revision"] = row[17].as(); } catch (std::exception &e) { config["revision"] = 0ULL; //fprintf(stderr, "Error updating revision (member): %s\n", PQgetvalue(res, i, 17)); } config["objtype"] = "member"; config["ipAssignments"] = json::array(); - const char *p2[2] = { - memberId.c_str(), - networkId.c_str() - }; - PGresult *r2 = PQexecParams(conn, - "SELECT DISTINCT address FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", - 2, - NULL, - p2, - NULL, - NULL, - 0); + pqxx::result r2 = w.exec("SELECT DISTINCT address " + "FROM ztc_member_ip_assignment " + "WHERE member_id = "+w.quote(memberId)+" AND network_id = "+w.quote(networkId)); + - if (PQresultStatus(r2) != PGRES_TUPLES_OK) { - fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn)); - PQclear(r2); - PQclear(res); - exit(1); - } - - int n = PQntuples(r2); - for (int j = 0; j < n; ++j) { - std::string ipaddr = PQgetvalue(r2, j, 0); + for (auto row2 = r2.begin(); row2 != r2.end(); row2++) { + std::string ipaddr = row2[0].as(); std::size_t pos = ipaddr.find('/'); if (pos != std::string::npos) { ipaddr = ipaddr.substr(0, pos); @@ -780,25 +614,9 @@ void PostgreSQL::initializeMembers(PGconn *conn) _memberChanged(empty, config, false); } - PQclear(res); + w.commit(); + _pool->unborrow(c); - // if (!networkMembers.empty()) { - // if (_rc != NULL) { - // if (_rc->clusterMode) { - // auto tx = _cluster->transaction(_myAddressStr, true); - // for (auto it : networkMembers) { - // tx.sadd(it.first, it.second); - // } - // tx.exec(); - // } else { - // auto tx = _redis->transaction(true); - // for (auto it : networkMembers) { - // tx.sadd(it.first, it.second); - // } - // tx.exec(); - // } - // } - // } if (++this->_ready == 2) { if (_waitNoticePrinted) { fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); @@ -832,20 +650,13 @@ void PostgreSQL::heartbeat() const char *publicIdentity = publicId; const char *hostname = hostnameTmp; - PGconn *conn = getPgConn(); - if (PQstatus(conn) == CONNECTION_BAD) { - fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); - PQfinish(conn); - exit(1); - } while (_run == 1) { - if(PQstatus(conn) != CONNECTION_OK) { - fprintf(stderr, "%s heartbeat thread lost connection to Database\n", _myAddressStr.c_str()); - PQfinish(conn); - exit(6); - } + auto c = _pool->borrow(); int64_t ts = OSUtils::now(); - if (conn) { + + if(c->c) { + pqxx::work w{*c->c}; + std::string major = std::to_string(ZEROTIER_ONE_VERSION_MAJOR); std::string minor = std::to_string(ZEROTIER_ONE_VERSION_MINOR); std::string rev = std::to_string(ZEROTIER_ONE_VERSION_REVISION); @@ -853,71 +664,37 @@ void PostgreSQL::heartbeat() std::string now = std::to_string(ts); std::string host_port = std::to_string(_listenPort); std::string use_redis = "false"; // (_rc != NULL) ? "true" : "false"; - const char *values[10] = { - controllerId, - hostname, - now.c_str(), - publicIdentity, - major.c_str(), - minor.c_str(), - rev.c_str(), - build.c_str(), - host_port.c_str(), - use_redis.c_str() - }; - - PGresult *res = PQexecParams(conn, - "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis) " - "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) " + + try { + pqxx::result res = w.exec0("INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis) " + "VALUES ("+w.quote(controllerId)+", "+w.quote(hostname)+", TO_TIMESTAMP("+now+"::double precision/1000), "+ + w.quote(publicIdentity)+", "+major+", "+minor+", "+rev+", "+build+", "+host_port+", "+use_redis+") " "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, " - "use_redis = EXCLUDED.use_redis", - 10, // number of parameters - NULL, // oid field. ignore - values, // values for substitution - NULL, // lengths in bytes of each value - NULL, // binary? - 0); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "Heartbeat Update Failed: %s\n", PQresultErrorMessage(res)); - } - PQclear(res); + "use_redis = EXCLUDED.use_redis"); + } catch (std::exception &e) { + fprintf(stderr, "Heartbeat update failed: %s\n", e.what()); + w.abort(); + _pool->unborrow(c); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + continue; + } + w.commit(); } - // if (_rc != NULL) { - // if (_rc->clusterMode) { - // _cluster->zadd("controllers", controllerId, ts); - // } else { - // _redis->zadd("controllers", controllerId, ts); - // } - // } + _pool->unborrow(c); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } - - PQfinish(conn); - conn = NULL; fprintf(stderr, "Exited heartbeat thread\n"); } void PostgreSQL::membersDbWatcher() { - PGconn *conn = getPgConn(NO_OVERRIDE); - if (PQstatus(conn) == CONNECTION_BAD) { - fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); - PQfinish(conn); - exit(1); - } - if (_rc) { - PQfinish(conn); - conn = NULL; _membersWatcher_Redis(); } else { - _membersWatcher_Postgres(conn); - PQfinish(conn); - conn = NULL; + _membersWatcher_Postgres(); } if (_run == 1) { @@ -927,46 +704,19 @@ void PostgreSQL::membersDbWatcher() fprintf(stderr, "Exited membersDbWatcher\n"); } -void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { - char buf[11] = {0}; - std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); - fprintf(stderr, "Listening to member stream: %s\n", cmd.c_str()); - PGresult *res = PQexec(conn, cmd.c_str()); - if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQfinish(conn); - exit(1); - } +void PostgreSQL::_membersWatcher_Postgres() { + auto c = _pool->borrow(); - PQclear(res); res = NULL; + std::string stream = "member_" + _myAddressStr; + + fprintf(stderr, "Listening to member stream: %s\n", stream.c_str()); + MemberNotificationReceiver m(this, *c->c, stream); while(_run == 1) { - if (PQstatus(conn) != CONNECTION_OK) { - fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres."); - exit(-1); - } - PGnotify *notify = NULL; - PQconsumeInput(conn); - while ((notify = PQnotifies(conn)) != NULL) { - //fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra); - - try { - json tmp(json::parse(notify->extra)); - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - json oldConfig, newConfig; - if (ov.is_object()) oldConfig = ov; - if (nv.is_object()) newConfig = nv; - if (oldConfig.is_object() || newConfig.is_object()) { - _memberChanged(oldConfig,newConfig,(this->_ready>=2)); - } - } catch (...) {} // ignore bad records - - free(notify); - } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + c->c->await_notification(5, 0); } + + _pool->unborrow(c); } void PostgreSQL::_membersWatcher_Redis() { @@ -1029,21 +779,10 @@ void PostgreSQL::_membersWatcher_Redis() { void PostgreSQL::networksDbWatcher() { - PGconn *conn = getPgConn(NO_OVERRIDE); - if (PQstatus(conn) == CONNECTION_BAD) { - fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); - PQfinish(conn); - exit(1); - } - if (_rc) { - PQfinish(conn); - conn = NULL; _networksWatcher_Redis(); } else { - _networksWatcher_Postgres(conn); - PQfinish(conn); - conn = NULL; + _networksWatcher_Postgres(); } if (_run == 1) { @@ -1053,42 +792,15 @@ void PostgreSQL::networksDbWatcher() fprintf(stderr, "Exited networksDbWatcher\n"); } -void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { - char buf[11] = {0}; - std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf)); - PGresult *res = PQexec(conn, cmd.c_str()); - if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQfinish(conn); - exit(1); - } +void PostgreSQL::_networksWatcher_Postgres() { + std::string stream = "network_" + _myAddressStr; - PQclear(res); res = NULL; + auto c = _pool->borrow(); + + NetworkNotificationReceiver n(this, *c->c, stream); while(_run == 1) { - if (PQstatus(conn) != CONNECTION_OK) { - fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres."); - exit(-1); - } - PGnotify *notify = NULL; - PQconsumeInput(conn); - while ((notify = PQnotifies(conn)) != NULL) { - //fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra); - try { - json tmp(json::parse(notify->extra)); - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - json oldConfig, newConfig; - if (ov.is_object()) oldConfig = ov; - if (nv.is_object()) newConfig = nv; - if (oldConfig.is_object()||newConfig.is_object()) { - _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); - } - } catch (...) {} // ignore bad records - free(notify); - } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + c->c->await_notification(5,0); } } @@ -1153,78 +865,32 @@ void PostgreSQL::_networksWatcher_Redis() { void PostgreSQL::commitThread() { - PGconn *conn = getPgConn(); - if (PQstatus(conn) == CONNECTION_BAD) { - fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn)); - PQfinish(conn); - exit(1); - } - std::pair qitem; while(_commitQueue.get(qitem)&(_run == 1)) { if (!qitem.first.is_object()) { continue; } - if (PQstatus(conn) == CONNECTION_BAD) { - fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn)); - PQfinish(conn); - exit(1); - } + + + + try { nlohmann::json *config = &(qitem.first); const std::string objtype = (*config)["objtype"]; if (objtype == "member") { try { + auto c = _pool->borrow(); + pqxx::work w(*c->c); + std::string memberId = (*config)["id"]; std::string networkId = (*config)["nwid"]; - std::string identity = (*config)["identity"]; std::string target = "NULL"; - if (!(*config)["remoteTraceTarget"].is_null()) { target = (*config)["remoteTraceTarget"]; } - - std::string caps = OSUtils::jsonDump((*config)["capabilities"], -1); - std::string lastAuthTime = std::to_string((long long)(*config)["lastAuthorizedTime"]); - std::string lastDeauthTime = std::to_string((long long)(*config)["lastDeauthorizedTime"]); - std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]); - std::string rev = std::to_string((unsigned long long)(*config)["revision"]); - std::string tags = OSUtils::jsonDump((*config)["tags"], -1); - std::string vmajor = std::to_string((int)(*config)["vMajor"]); - std::string vminor = std::to_string((int)(*config)["vMinor"]); - std::string vrev = std::to_string((int)(*config)["vRev"]); - std::string vproto = std::to_string((int)(*config)["vProto"]); - const char *values[19] = { - memberId.c_str(), - networkId.c_str(), - ((*config)["activeBridge"] ? "true" : "false"), - ((*config)["authorized"] ? "true" : "false"), - caps.c_str(), - identity.c_str(), - lastAuthTime.c_str(), - lastDeauthTime.c_str(), - ((*config)["noAutoAssignIps"] ? "true" : "false"), - rtraceLevel.c_str(), - (target == "NULL") ? NULL : target.c_str(), - rev.c_str(), - tags.c_str(), - vmajor.c_str(), - vminor.c_str(), - vrev.c_str(), - vproto.c_str() - }; - - PGresult *res = PQexec(conn, "BEGIN"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error beginning update transaction: %s\n", PQresultErrorMessage(res)); - PQclear(res); - delete config; - config = nullptr; - continue; - } - - res = PQexecParams(conn, + + pqxx::result res = w.exec_params0( "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, " "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, " "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) " @@ -1237,49 +903,27 @@ void PostgreSQL::commitThread() "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, " "revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, " "v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto", - 17, - NULL, - values, - NULL, - NULL, - 0); + memberId, + networkId, + (bool)(*config)["activeBridge"], + (bool)(*config)["authorized"], + OSUtils::jsonDump((*config)["capabilities"], -1), + (std::string)(*config)["identity"], + (uint64_t)(*config)["lastAuthorizedTime"], + (uint64_t)(*config)["lastDeauthorizedTime"], + (bool)(*config)["noAutoAssignIps"], + (int)(*config)["remoteTraceLevel"], + target, + (uint64_t)(*config)["revision"], + OSUtils::jsonDump((*config)["tags"], -1), + (int)(*config)["vMajor"], + (int)(*config)["vMinor"], + (int)(*config)["vRev"], + (int)(*config)["vProto"]); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating member: %s\n", PQresultErrorMessage(res)); - fprintf(stderr, "%s", OSUtils::jsonDump(*config, 2).c_str()); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - delete config; - config = nullptr; - continue; - } - PQclear(res); - - const char *v2[2] = { - memberId.c_str(), - networkId.c_str() - }; - - res = PQexecParams(conn, - "DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", - 2, - NULL, - v2, - NULL, - NULL, - 0); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating IP address assignments: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK"));; - delete config; - config = nullptr; - continue; - } - - PQclear(res); + res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", + memberId, networkId); std::vector assignments; bool ipAssignError = false; @@ -1290,29 +934,10 @@ void PostgreSQL::commitThread() continue; } - const char *v3[3] = { - memberId.c_str(), - networkId.c_str(), - addr.c_str() - }; - - res = PQexecParams(conn, + res = w.exec_params0( "INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) ON CONFLICT (network_id, member_id, address) DO NOTHING", - 3, - NULL, - v3, - NULL, - NULL, - 0); + memberId, networkId, addr); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error setting IP addresses for member: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - ipAssignError = true; - break; - } - PQclear(res); assignments.push_back(addr); } if (ipAssignError) { @@ -1321,15 +946,8 @@ void PostgreSQL::commitThread() continue; } - res = PQexec(conn, "COMMIT"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error committing member transaction: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - delete config; - config = nullptr; - continue; - } + w.commit(); + _pool->unborrow(c); const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); const uint64_t memberidInt = OSUtils::jsonIntHex((*config)["id"], 0ULL); @@ -1351,58 +969,18 @@ void PostgreSQL::commitThread() } } else if (objtype == "network") { try { + auto c = _pool->borrow(); + pqxx::work w(*c->c); + std::string id = (*config)["id"]; - std::string controllerId = _myAddressStr.c_str(); - std::string name = (*config)["name"]; - std::string remoteTraceTarget("NULL"); - if (!(*config)["remoteTraceTarget"].is_null()) { + std::string remoteTraceTarget = ""; + if(!(*config)["remoteTraceTarget"].is_null()) { remoteTraceTarget = (*config)["remoteTraceTarget"]; } - std::string rulesSource; + std::string rulesSource = ""; if ((*config)["rulesSource"].is_string()) { rulesSource = (*config)["rulesSource"]; } - std::string caps = OSUtils::jsonDump((*config)["capabilitles"], -1); - std::string now = std::to_string(OSUtils::now()); - std::string mtu = std::to_string((int)(*config)["mtu"]); - std::string mcastLimit = std::to_string((int)(*config)["multicastLimit"]); - std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]); - std::string rules = OSUtils::jsonDump((*config)["rules"], -1); - std::string tags = OSUtils::jsonDump((*config)["tags"], -1); - std::string v4mode = OSUtils::jsonDump((*config)["v4AssignMode"],-1); - std::string v6mode = OSUtils::jsonDump((*config)["v6AssignMode"], -1); - bool enableBroadcast = (*config)["enableBroadcast"]; - bool isPrivate = (*config)["private"]; - - const char *values[16] = { - id.c_str(), - controllerId.c_str(), - caps.c_str(), - enableBroadcast ? "true" : "false", - now.c_str(), - mtu.c_str(), - mcastLimit.c_str(), - name.c_str(), - isPrivate ? "true" : "false", - rtraceLevel.c_str(), - (remoteTraceTarget == "NULL" ? NULL : remoteTraceTarget.c_str()), - rules.c_str(), - rulesSource.c_str(), - tags.c_str(), - v4mode.c_str(), - v6mode.c_str(), - }; - - PGresult *res = PQexec(conn, "BEGIN"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error beginnning transaction: %s\n", PQresultErrorMessage(res)); - PQclear(res); - delete config; - config = nullptr; - continue; - } - - PQclear(res); // This ugly query exists because when we want to mirror networks to/from // another data store (e.g. FileDB or LFDB) it is possible to get a network @@ -1410,7 +988,7 @@ void PostgreSQL::commitThread() // the owner_id to the "first" global admin in the user DB if the record // did not previously exist. If the record already exists owner_id is left // unchanged, so owner_id should be left out of the update clause. - res = PQexecParams(conn, + pqxx::result res = w.exec_params0( "INSERT INTO ztc_network (id, creation_time, owner_id, controller_id, capabilities, enable_broadcast, " "last_modified, mtu, multicast_limit, name, private, " "remote_trace_level, remote_trace_target, rules, rules_source, " @@ -1427,100 +1005,37 @@ void PostgreSQL::commitThread() "remote_trace_target = EXCLUDED.remote_trace_target, rules = EXCLUDED.rules, " "rules_source = EXCLUDED.rules_source, tags = EXCLUDED.tags, " "v4_assign_mode = EXCLUDED.v4_assign_mode, v6_assign_mode = EXCLUDED.v6_assign_mode", - 16, - NULL, - values, - NULL, - NULL, - 0); + id, + _myAddressStr, + OSUtils::jsonDump((*config)["capabilitles"], -1), + (bool)(*config)["enableBroadcast"], + OSUtils::now(), + (int)(*config)["mtu"], + (int)(*config)["multicastLimit"], + (std::string)(*config)["name"], + (bool)(*config)["private"], + (int)(*config)["remoteTraceLevel"], + remoteTraceTarget, + OSUtils::jsonDump((*config)["rules"], -1), + rulesSource, + OSUtils::jsonDump((*config)["tags"], -1), + OSUtils::jsonDump((*config)["v4AssignMode"],-1), + OSUtils::jsonDump((*config)["v6AssignMode"], -1)); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating network record: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - delete config; - config = nullptr; - continue; - } - - PQclear(res); - - const char *params[1] = { - id.c_str() - }; - res = PQexecParams(conn, - "DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", - 1, - NULL, - params, - NULL, - NULL, - 0); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - delete config; - config = nullptr; - continue; - } - - PQclear(res); + res = w.exec_params0("DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", 0); auto pool = (*config)["ipAssignmentPools"]; bool err = false; for (auto i = pool.begin(); i != pool.end(); ++i) { std::string start = (*i)["ipRangeStart"]; std::string end = (*i)["ipRangeEnd"]; - const char *p[3] = { - id.c_str(), - start.c_str(), - end.c_str() - }; - res = PQexecParams(conn, + res = w.exec_params0( "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) " - "VALUES ($1, $2, $3)", - 3, - NULL, - p, - NULL, - NULL, - 0); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res)); - PQclear(res); - err = true; - break; - } - PQclear(res); - } - if (err) { - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - delete config; - config = nullptr; - continue; - } - - res = PQexecParams(conn, - "DELETE FROM ztc_network_route WHERE network_id = $1", - 1, - NULL, - params, - NULL, - NULL, - 0); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - delete config; - config = nullptr; - continue; + "VALUES ($1, $2, $3)", id, start, end); } + res = w.exec_params0("DELETE FROM ztc_network_route WHERE network_id = $1", id); auto routes = (*config)["routes"]; err = false; @@ -1542,36 +1057,17 @@ void PostgreSQL::commitThread() via = (*i)["via"]; } - const char *p[4] = { - id.c_str(), - targetAddr.c_str(), - targetBits.c_str(), - (via == "NULL" ? NULL : via.c_str()), - }; - - res = PQexecParams(conn, - "INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)", - 4, - NULL, - p, - NULL, - NULL, - 0); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res)); - PQclear(res); - err = true; - break; - } - PQclear(res); + res = w.exec_params0("INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)", + id, targetAddr, targetBits, (via == "NULL" ? NULL : via.c_str())); } if (err) { - PQclear(PQexec(conn, "ROLLBACK")); + w.abort(); + _pool->unborrow(c); delete config; config = nullptr; continue; } + auto dns = (*config)["dns"]; std::string domain = dns["domain"]; std::stringstream servers; @@ -1584,38 +1080,13 @@ void PostgreSQL::commitThread() } servers << "}"; - const char *p[3] = { - id.c_str(), - domain.c_str(), - servers.str().c_str() - }; + std::string s = servers.str(); - res = PQexecParams(conn, "INSERT INTO ztc_network_dns (network_id, domain, servers) VALUES ($1, $2, $3) ON CONFLICT (network_id) DO UPDATE SET domain = EXCLUDED.domain, servers = EXCLUDED.servers", - 3, - NULL, - p, - NULL, - NULL, - 0); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating DNS: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - err = true; - break; - } - PQclear(res); + res = w.exec_params0("INSERT INTO ztc_network_dns (network_id, domain, servers) VALUES ($1, $2, $3) ON CONFLICT (network_id) DO UPDATE SET domain = EXCLUDED.domain, servers = EXCLUDED.servers", + id, domain, s); - res = PQexec(conn, "COMMIT"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error committing network update: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - delete config; - config = nullptr; - continue; - } - PQclear(res); + w.commit(); + _pool->unborrow(c); const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); if (nwidInt) { @@ -1632,201 +1103,116 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); } - // if (_rc != NULL) { - // try { - // std::string id = (*config)["id"]; - // std::string controllerId = _myAddressStr.c_str(); - // std::string key = "networks:{" + controllerId + "}"; - // if (_rc->clusterMode) { - // _cluster->sadd(key, id); - // } else { - // _redis->sadd(key, id); - // } - // } catch (sw::redis::Error &e) { - // fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); - // } - // } } else if (objtype == "_delete_network") { try { + auto c = _pool->borrow(); + pqxx::work w(*c->c); + std::string networkId = (*config)["nwid"]; - const char *values[1] = { - networkId.c_str() - }; - PGresult * res = PQexecParams(conn, - "UPDATE ztc_network SET deleted = true WHERE id = $1", - 1, - NULL, - values, - NULL, - NULL, - 0); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error deleting network: %s\n", PQresultErrorMessage(res)); - } + pqxx::result res = w.exec_params0("UPDATE ztc_network SET deleted = true WHERE id = $1", + networkId); - PQclear(res); + w.commit(); + _pool->unborrow(c); } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what()); } - // if (_rc != NULL) { - // try { - // std::string id = (*config)["id"]; - // std::string controllerId = _myAddressStr.c_str(); - // std::string key = "networks:{" + controllerId + "}"; - // if (_rc->clusterMode) { - // _cluster->srem(key, id); - // _cluster->del("network-nodes-online:{"+controllerId+"}:"+id); - // } else { - // _redis->srem(key, id); - // _redis->del("network-nodes-online:{"+controllerId+"}:"+id); - // } - // } catch (sw::redis::Error &e) { - // fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); - // } - // } + } else if (objtype == "_delete_member") { try { + auto c = _pool->borrow(); + pqxx::work w(*c->c); + std::string memberId = (*config)["id"]; std::string networkId = (*config)["nwid"]; - const char *values[2] = { - memberId.c_str(), - networkId.c_str() - }; - - PGresult *res = PQexecParams(conn, + pqxx::result res = w.exec_params0( "UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2", - 2, - NULL, - values, - NULL, - NULL, - 0); + memberId, networkId); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error deleting member: %s\n", PQresultErrorMessage(res)); - } - - PQclear(res); + w.commit(); + _pool->unborrow(c); } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what()); } - // if (_rc != NULL) { - // try { - // std::string memberId = (*config)["id"]; - // std::string networkId = (*config)["nwid"]; - // std::string controllerId = _myAddressStr.c_str(); - // std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; - // if (_rc->clusterMode) { - // _cluster->srem(key, memberId); - // _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId); - // } else { - // _redis->srem(key, memberId); - // _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId); - // } - // } catch (sw::redis::Error &e) { - // fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); - // } - // } } else { fprintf(stderr, "ERROR: unknown objtype"); } } catch (std::exception &e) { fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what()); } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); } - PQfinish(conn); - if (_run == 1) { - fprintf(stderr, "ERROR: %s commitThread should still be running! Exiting Controller.\n", _myAddressStr.c_str()); - exit(7); - } fprintf(stderr, "commitThread finished\n"); } void PostgreSQL::onlineNotificationThread() { waitForReady(); - - // if (_rc != NULL) { - // onlineNotification_Redis(); - // } else { - onlineNotification_Postgres(); - // } + onlineNotification_Postgres(); } void PostgreSQL::onlineNotification_Postgres() { - PGconn *conn = getPgConn(); - if (PQstatus(conn) == CONNECTION_BAD) { - fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); - PQfinish(conn); - exit(1); + try { + auto c = _pool->borrow(); + _pool->unborrow(c); + } catch(std::exception &e) { + fprintf(stderr, "error getting connection in onlineNotification thread\n"); + exit(5); } _connected = 1; nlohmann::json jtmp1, jtmp2; while (_run == 1) { - if (PQstatus(conn) != CONNECTION_OK) { - fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres."); - PQfinish(conn); - exit(5); - } - - std::unordered_map< std::pair,std::pair,_PairHasher > lastOnline; - { - std::lock_guard l(_lastOnline_l); - lastOnline.swap(_lastOnline); - } - - PGresult *res = NULL; - - std::stringstream memberUpdate; - memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES "; - bool firstRun = true; - bool memberAdded = false; - for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { - uint64_t nwid_i = i->first.first; - char nwidTmp[64]; - char memTmp[64]; - char ipTmp[64]; - OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i); - OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second); - - if(!get(nwid_i, jtmp1, i->first.second, jtmp2)) { - continue; // skip non existent networks/members + try { + std::unordered_map< std::pair,std::pair,_PairHasher > lastOnline; + { + std::lock_guard l(_lastOnline_l); + lastOnline.swap(_lastOnline); } - std::string networkId(nwidTmp); - std::string memberId(memTmp); + auto c = _pool->borrow(); + pqxx::work w(*c->c); - const char *qvals[2] = { - networkId.c_str(), - memberId.c_str() - }; + // using pqxx::stream_to would be a really nice alternative here, but + // unfortunately it doesn't support upserts. - res = PQexecParams(conn, - "SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2", - 2, - NULL, - qvals, - NULL, - NULL, - 0); + std::stringstream memberUpdate; + memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES "; + bool firstRun = true; + bool memberAdded = false; + for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { + uint64_t nwid_i = i->first.first; + char nwidTmp[64]; + char memTmp[64]; + char ipTmp[64]; + OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i); + OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second); - if (PQresultStatus(res) != PGRES_TUPLES_OK) { - fprintf(stderr, "Member count failed: %s", PQerrorMessage(conn)); - PQclear(res); - continue; - } + if(!get(nwid_i, jtmp1, i->first.second, jtmp2)) { + continue; // skip non existent networks/members + } - int nrows = PQntuples(res); - PQclear(res); + std::string networkId(nwidTmp); + std::string memberId(memTmp); + + const char *qvals[2] = { + networkId.c_str(), + memberId.c_str() + }; + + try { + pqxx::row r = w.exec_params1("SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2", + networkId, memberId); + + } catch (pqxx::unexpected_rows &e) { + fprintf(stderr, "Member count failed: %s\n", e.what()); + continue; + } - if (nrows == 1) { int64_t ts = i->second.first; std::string ipAddr = i->second.second.toIpString(ipTmp); std::string timestamp = std::to_string(ts); @@ -1845,27 +1231,19 @@ void PostgreSQL::onlineNotification_Postgres() } memberUpdate << "TO_TIMESTAMP(" << timestamp << "::double precision/1000))"; memberAdded = true; - } else if (nrows > 1) { - fprintf(stderr, "nrows > 1?!?"); - continue; - } else { - continue; } - } - memberUpdate << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated;"; + memberUpdate << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated;"; - if (memberAdded) { - res = PQexec(conn, memberUpdate.str().c_str()); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "Multiple insert failed: %s", PQerrorMessage(conn)); + if (memberAdded) { + pqxx::result res = w.exec0(memberUpdate.str()); } - PQclear(res); + } catch (std::exception &e) { + fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what()); } std::this_thread::sleep_for(std::chrono::seconds(10)); } fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str()); - PQfinish(conn); if (_run == 1) { fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str()); exit(6); @@ -1961,20 +1339,5 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control tx.exec(); } -PGconn *PostgreSQL::getPgConn(OverrideMode m) -{ - if (m == ALLOW_PGBOUNCER_OVERRIDE) { - char *connStr = getenv("PGBOUNCER_CONNSTR"); - if (connStr != NULL) { - fprintf(stderr, "PGBouncer Override\n"); - std::string conn(connStr); - conn += " application_name=controller-"; - conn += _myAddressStr.c_str(); - return PQconnectdb(conn.c_str()); - } - } - - return PQconnectdb(_connString.c_str()); -} #endif //ZT_CONTROLLER_USE_LIBPQ diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index 1870b5afb..689a29bbc 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -20,6 +20,9 @@ #define ZT_CENTRAL_CONTROLLER_COMMIT_THREADS 4 +#include "ConnectionPool.hpp" +#include + #include #include @@ -31,14 +34,65 @@ namespace ZeroTier { struct RedisConfig; + +class PostgresConnection : public Connection { +public: + virtual ~PostgresConnection() { + } + + std::shared_ptr c; + int a; +}; + + +class PostgresConnFactory : public ConnectionFactory { +public: + PostgresConnFactory(std::string &connString) + : m_connString(connString) + { + } + + virtual std::shared_ptr create() { + auto c = std::shared_ptr(new PostgresConnection()); + c->c = std::make_shared(m_connString); + return std::static_pointer_cast(c); + } +private: + std::string m_connString; +}; + +class PostgreSQL; + +class MemberNotificationReceiver : public pqxx::notification_receiver { +public: + MemberNotificationReceiver(PostgreSQL *p, pqxx::connection &c, const std::string &channel); + virtual ~MemberNotificationReceiver() {} + + virtual void operator() (const std::string &payload, int backendPid); +private: + PostgreSQL *_psql; +}; + +class NetworkNotificationReceiver : public pqxx::notification_receiver { +public: + NetworkNotificationReceiver(PostgreSQL *p, pqxx::connection &c, const std::string &channel); + virtual ~NetworkNotificationReceiver() {}; + + virtual void operator() (const std::string &payload, int packend_pid); +private: + PostgreSQL *_psql; +}; + /** * A controller database driver that talks to PostgreSQL * * This is for use with ZeroTier Central. Others are free to build and use it - * but be aware taht we might change it at any time. + * but be aware that we might change it at any time. */ class PostgreSQL : public DB { + friend class MemberNotificationReceiver; + friend class NetworkNotificationReceiver; public: PostgreSQL(const Identity &myId, const char *path, int listenPort, RedisConfig *rc); virtual ~PostgreSQL(); @@ -56,15 +110,22 @@ protected: { inline std::size_t operator()(const std::pair &p) const { return (std::size_t)(p.first ^ p.second); } }; + void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool notifyListeners) { + DB::_memberChanged(old, memberConfig, notifyListeners); + } + + void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool notifyListeners) { + DB::_memberChanged(old, networkConfig, notifyListeners); + } private: - void initializeNetworks(PGconn *conn); - void initializeMembers(PGconn *conn); + void initializeNetworks(); + void initializeMembers(); void heartbeat(); void membersDbWatcher(); - void _membersWatcher_Postgres(PGconn *conn); + void _membersWatcher_Postgres(); void networksDbWatcher(); - void _networksWatcher_Postgres(PGconn *conn); + void _networksWatcher_Postgres(); void _membersWatcher_Redis(); void _networksWatcher_Redis(); @@ -81,7 +142,7 @@ private: NO_OVERRIDE = 1 }; - PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE ); + std::shared_ptr > _pool; const Identity _myId; const Address _myAddress; diff --git a/make-mac.mk b/make-mac.mk index ce4e680c9..423bfad09 100644 --- a/make-mac.mk +++ b/make-mac.mk @@ -28,9 +28,9 @@ include objects.mk ONE_OBJS+=osdep/MacEthernetTap.o osdep/MacKextEthernetTap.o osdep/MacDNSHelper.o ext/http-parser/http_parser.o ifeq ($(ZT_CONTROLLER),1) - LIBS+=-L/usr/local/opt/libpq/lib -lpq ext/redis-plus-plus-1.1.1/install/macos/lib/libredis++.a ext/hiredis-0.14.1/lib/macos/libhiredis.a + LIBS+=-L/usr/local/opt/libpqxx@6/lib -L/usr/local/opt/libpq/lib -L/usr/local/opt/openssl/lib/ -lpqxx -lpq -lssl -lcrypto -lgssapi_krb5 ext/redis-plus-plus-1.1.1/install/macos/lib/libredis++.a ext/hiredis-0.14.1/lib/macos/libhiredis.a DEFS+=-DZT_CONTROLLER_USE_LIBPQ -DZT_CONTROLLER_USE_REDIS -DZT_CONTROLLER - INCLUDES+=-I/usr/local/opt/libpq/include -Iext/hiredis-0.14.1/include/ -Iext/redis-plus-plus-1.1.1/install/macos/include/sw/ + INCLUDES+=-I/usr/local/opt/libpq/include -I/usr/local/opt/libpqxx@6/include -Iext/hiredis-0.14.1/include/ -Iext/redis-plus-plus-1.1.1/install/macos/include/sw/ endif LIBS+=-framework CoreServices -framework SystemConfiguration -framework CoreFoundation