/** Implementation of the pqxx::connection class. * * pqxx::connection encapsulates a connection to a database. * * Copyright (c) 2000-2022, Jeroen T. Vermeulen. * * See COPYING for copyright license. If you did not receive a file called * COPYING with this source code, please notify the distributor of this * mistake, or contact the author. */ #include "pqxx-source.hxx" #include #include #include #include #include #include #include #include #include #include #include #include #include // For fcntl(). #if __has_include() # include #endif #if __has_include() # include #endif // For ioctlsocket(). #if defined(_WIN32) && __has_include() # include #endif extern "C" { #include } #include "pqxx/internal/header-pre.hxx" #include "pqxx/binarystring.hxx" #include "pqxx/internal/wait.hxx" #include "pqxx/nontransaction.hxx" #include "pqxx/notification.hxx" #include "pqxx/pipeline.hxx" #include "pqxx/result.hxx" #include "pqxx/strconv.hxx" #include "pqxx/transaction.hxx" #include "pqxx/internal/gates/errorhandler-connection.hxx" #include "pqxx/internal/gates/result-connection.hxx" #include "pqxx/internal/gates/result-creation.hxx" #include "pqxx/internal/header-post.hxx" extern "C" { // The PQnoticeProcessor that receives an error or warning from libpq and // sends it to the appropriate connection for processing. void pqxx_notice_processor(void *conn, char const *msg) noexcept { reinterpret_cast(conn)->process_notice(msg); } // There's no way in libpq to disable a connection's notice processor. So, // set an inert one to get the same effect. void inert_notice_processor(void *, char const *) noexcept {} } // extern "C" using namespace std::literals; std::string PQXX_COLD pqxx::encrypt_password(char const user[], char const password[]) { std::unique_ptr> p{ PQencryptPassword(password, user), PQfreemem}; return {p.get()}; } pqxx::connection::connection(connection &&rhs) : m_conn{rhs.m_conn}, m_unique_id{rhs.m_unique_id} { rhs.check_movable(); rhs.m_conn = nullptr; } pqxx::connection::connection( connection::connect_mode, zview connection_string) : m_conn{PQconnectStart(connection_string.c_str())} { if (m_conn == nullptr) throw std::bad_alloc{}; if (status() == CONNECTION_BAD) throw pqxx::broken_connection{PQerrorMessage(m_conn)}; } std::pair pqxx::connection::poll_connect() { switch (PQconnectPoll(m_conn)) { case PGRES_POLLING_FAILED: throw pqxx::broken_connection{PQerrorMessage(m_conn)}; case PGRES_POLLING_READING: return std::make_pair(true, false); case PGRES_POLLING_WRITING: return std::make_pair(false, true); case PGRES_POLLING_OK: if (not is_open()) throw pqxx::broken_connection{PQerrorMessage(m_conn)}; return std::make_pair(false, false); case PGRES_POLLING_ACTIVE: throw internal_error{ "Nonblocking connection poll returned obsolete 'active' state."}; default: throw internal_error{ "Nonblocking connection poll returned unknown value."}; } } void pqxx::connection::complete_init() { if (m_conn == nullptr) throw std::bad_alloc{}; try { if (not is_open()) throw broken_connection{PQerrorMessage(m_conn)}; set_up_state(); } catch (std::exception const &) { PQfinish(m_conn); m_conn = nullptr; throw; } } void pqxx::connection::init(char const options[]) { m_conn = PQconnectdb(options); complete_init(); } void pqxx::connection::init(char const *params[], char const *values[]) { m_conn = PQconnectdbParams(params, values, 0); complete_init(); } void pqxx::connection::check_movable() const { if (m_trans) throw pqxx::usage_error{"Moving a connection with a transaction open."}; if (not std::empty(m_errorhandlers)) throw pqxx::usage_error{ "Moving a connection with error handlers registered."}; if (not std::empty(m_receivers)) throw pqxx::usage_error{ "Moving a connection with notification receivers registered."}; } void pqxx::connection::check_overwritable() const { if (m_trans) throw pqxx::usage_error{ "Moving a connection onto one with a transaction open."}; if (not std::empty(m_errorhandlers)) throw pqxx::usage_error{ "Moving a connection onto one with error handlers registered."}; if (not std::empty(m_receivers)) throw usage_error{ "Moving a connection onto one " "with notification receivers registered."}; } pqxx::connection &pqxx::connection::operator=(connection &&rhs) { check_overwritable(); rhs.check_movable(); close(); m_conn = std::exchange(rhs.m_conn, nullptr); m_unique_id = rhs.m_unique_id; return *this; } pqxx::result pqxx::connection::make_result( internal::pq::PGresult *pgr, std::shared_ptr const &query, std::string_view desc) { if (pgr == nullptr) { if (is_open()) throw failure(err_msg()); else throw broken_connection{"Lost connection to the database server."}; } internal::encoding_group enc; try { enc = internal::enc_group(encoding_id()); } catch (std::exception const &) { // Don't let the PGresult leak. // TODO: Can we just accept a unique_ptr instead? internal::clear_result(pgr); throw; } auto const r{pqxx::internal::gate::result_creation::create(pgr, query, enc)}; pqxx::internal::gate::result_creation{r}.check_status(desc); return r; } int PQXX_COLD pqxx::connection::backendpid() const &noexcept { return (m_conn == nullptr) ? 0 : PQbackendPID(m_conn); } namespace { PQXX_PURE int socket_of(::pqxx::internal::pq::PGconn const *c) noexcept { return (c == nullptr) ? -1 : PQsocket(c); } } // namespace int pqxx::connection::sock() const &noexcept { return socket_of(m_conn); } int PQXX_COLD pqxx::connection::protocol_version() const noexcept { return (m_conn == nullptr) ? 0 : PQprotocolVersion(m_conn); } int PQXX_COLD pqxx::connection::server_version() const noexcept { return PQserverVersion(m_conn); } void pqxx::connection::set_variable( std::string_view var, std::string_view value) & { exec(internal::concat("SET ", quote_name(var), "=", value)); } std::string pqxx::connection::get_variable(std::string_view var) { return exec(internal::concat("SHOW ", quote_name(var))) .at(0) .at(0) .as(std::string{}); } std::string pqxx::connection::get_var(std::string_view var) { // (Variables can't be null, so far as I can make out.) return exec(internal::concat("SHOW "sv, quote_name(var)))[0][0] .as(); } /** Set up various parts of logical connection state that may need to be * recovered because the physical connection to the database was lost and is * being reset, or that may not have been initialized yet. */ void pqxx::connection::set_up_state() { if (auto const proto_ver{protocol_version()}; proto_ver < 3) { if (proto_ver == 0) throw broken_connection{"No connection."}; else throw feature_not_supported{ "Unsupported frontend/backend protocol version; 3.0 is the minimum."}; } if (server_version() <= 90000) throw feature_not_supported{ "Unsupported server version; 9.0 is the minimum."}; // The default notice processor in libpq writes to stderr. Ours does // nothing. // If the caller registers an error handler, this gets replaced with an // error handler that walks down the connection's chain of handlers. We // don't do that by default because there's a danger: libpq may call the // notice processor via a result object, even after the connection has been // destroyed and the handlers list no longer exists. PQXX_LIKELY PQsetNoticeProcessor(m_conn, inert_notice_processor, nullptr); } bool pqxx::connection::is_open() const noexcept { return status() == CONNECTION_OK; } void pqxx::connection::process_notice_raw(char const msg[]) noexcept { if ((msg == nullptr) or (*msg == '\0')) return; auto const rbegin = std::crbegin(m_errorhandlers), rend = std::crend(m_errorhandlers); for (auto i{rbegin}; (i != rend) and (**i)(msg); ++i) ; } void pqxx::connection::process_notice(char const msg[]) noexcept { if (msg == nullptr) return; zview const view{msg}; if (std::empty(view)) return; else if (msg[std::size(view) - 1] == '\n') process_notice_raw(msg); else // Newline is missing. Let the zview version of the code add it. PQXX_UNLIKELY process_notice(view); } void pqxx::connection::process_notice(zview msg) noexcept { if (std::empty(msg)) return; else if (msg[std::size(msg) - 1] == '\n') process_notice_raw(msg.c_str()); else try { // Add newline. std::string buf; buf.reserve(std::size(msg) + 1); buf.assign(msg); buf.push_back('\n'); process_notice_raw(buf.c_str()); } catch (std::exception const &) { // If nothing else works, try writing the message without the newline. PQXX_UNLIKELY process_notice_raw(msg.c_str()); } } void PQXX_COLD pqxx::connection::trace(FILE *out) noexcept { if (m_conn) { if (out) PQtrace(m_conn, out); else PQuntrace(m_conn); } } void PQXX_COLD pqxx::connection::add_receiver(pqxx::notification_receiver *n) { if (n == nullptr) throw argument_error{"Null receiver registered"}; // Add to receiver list and attempt to start listening. auto const p{m_receivers.find(n->channel())}; auto const new_value{receiver_list::value_type{n->channel(), n}}; if (p == std::end(m_receivers)) { // Not listening on this event yet, start doing so. auto const lq{std::make_shared( internal::concat("LISTEN ", quote_name(n->channel())))}; make_result(PQexec(m_conn, lq->c_str()), lq, *lq); m_receivers.insert(new_value); } else { m_receivers.insert(p, new_value); } } void PQXX_COLD pqxx::connection::remove_receiver(pqxx::notification_receiver *T) noexcept { if (T == nullptr) return; try { auto needle{ std::pair{T->channel(), T}}; auto R{m_receivers.equal_range(needle.first)}; auto i{find(R.first, R.second, needle)}; if (i == R.second) { PQXX_UNLIKELY process_notice(internal::concat( "Attempt to remove unknown receiver '", needle.first, "'")); } else { // Erase first; otherwise a notification for the same receiver may yet // come in and wreak havoc. Thanks Dragan Milenkovic. bool const gone{R.second == ++R.first}; m_receivers.erase(i); if (gone) exec(internal::concat("UNLISTEN ", quote_name(needle.first)).c_str()); } } catch (std::exception const &e) { PQXX_UNLIKELY process_notice(e.what()); } } bool pqxx::connection::consume_input() noexcept { return PQconsumeInput(m_conn) != 0; } bool pqxx::connection::is_busy() const noexcept { return PQisBusy(m_conn) != 0; } void PQXX_COLD pqxx::connection::cancel_query() { using pointer = std::unique_ptr>; pointer cancel{PQgetCancel(m_conn), PQfreeCancel}; if (cancel == nullptr) PQXX_UNLIKELY throw std::bad_alloc{}; std::array errbuf; auto const err{errbuf.data()}; auto const c{ PQcancel(cancel.get(), err, static_cast(std::size(errbuf)))}; if (c == 0) PQXX_UNLIKELY throw pqxx::sql_error{std::string{err, std::size(errbuf)}, "[cancel]"}; } namespace { // C++20: std::span? /// Get error string for a given @c errno value. template char const *PQXX_COLD error_string(int err_num, std::array &buffer) { // Not entirely clear whether strerror_s will be in std or global namespace. using namespace std; #if defined(PQXX_HAVE_STERROR_S) || defined(PQXX_HAVE_STRERROR_R) # if defined(PQXX_HAVE_STRERROR_S) auto const err_result{strerror_s(std::data(buffer), BYTES, err_num)}; # else auto const err_result{strerror_r(err_num, std::data(buffer), BYTES)}; # endif if constexpr (std::is_same_v, char *>) { // GNU version of strerror_r; returns the error string, which may or may // not reside within buffer. return err_result; } else { // Either strerror_s or POSIX strerror_r; returns an error code. // Sorry for being lazy here: Not reporting error string for the case // where we can't retrieve an error string. if (err_result == 0) return std::data(buffer); else return "Compound errors."; } #else // Fallback case, hopefully for no actual platforms out there. pqxx::ignore_unused(err_num, buffer); return "(No error information available.)"; #endif } } // namespace #if defined(_WIN32) || __has_include() void pqxx::connection::set_blocking(bool block) & { auto const fd{sock()}; # if defined _WIN32 unsigned long mode{not block}; if (::ioctlsocket(fd, FIONBIO, &mode) != 0) { std::array errbuf; char const *err{error_string(WSAGetLastError(), errbuf)}; throw broken_connection{ internal::concat("Could not set socket's blocking mode: ", err)}; } # else // _WIN32 std::array errbuf; auto flags{::fcntl(fd, F_GETFL, 0)}; if (flags == -1) { char const *const err{error_string(errno, errbuf)}; throw broken_connection{ internal::concat("Could not get socket state: ", err)}; } if (block) flags |= O_NONBLOCK; else flags &= ~O_NONBLOCK; if (::fcntl(fd, F_SETFL, flags) == -1) { char const *const err{error_string(errno, errbuf)}; throw broken_connection{ internal::concat("Could not set socket's blocking mode: ", err)}; } # endif // _WIN32 } #endif // defined(_WIN32) || __has_include() void PQXX_COLD pqxx::connection::set_verbosity(error_verbosity verbosity) &noexcept { PQsetErrorVerbosity(m_conn, static_cast(verbosity)); } namespace { /// Unique pointer to PGnotify. using notify_ptr = std::unique_ptr>; /// Get one notification from a connection, or null. notify_ptr get_notif(pqxx::internal::pq::PGconn *conn) { return notify_ptr(PQnotifies(conn), PQfreemem); } } // namespace int pqxx::connection::get_notifs() { if (not consume_input()) throw broken_connection{"Connection lost."}; // Even if somehow we receive notifications during our transaction, don't // deliver them. if (m_trans) PQXX_UNLIKELY return 0; int notifs = 0; for (auto N{get_notif(m_conn)}; N.get(); N = get_notif(m_conn)) { notifs++; auto const Hit{m_receivers.equal_range(std::string{N->relname})}; if (Hit.second != Hit.first) { std::string payload{N->extra}; for (auto i{Hit.first}; i != Hit.second; ++i) try { (*i->second)(payload, N->be_pid); } catch (std::exception const &e) { try { process_notice(internal::concat( "Exception in notification receiver '", i->first, "': ", e.what(), "\n")); } catch (std::bad_alloc const &) { // Out of memory. Try to get the message out in a more robust way. process_notice( "Exception in notification receiver, " "and also ran out of memory\n"); } catch (std::exception const &) { process_notice( "Exception in notification receiver " "(compounded by other error)\n"); } } } N.reset(); } return notifs; } char const *PQXX_COLD pqxx::connection::dbname() const { return PQdb(m_conn); } char const *PQXX_COLD pqxx::connection::username() const { return PQuser(m_conn); } char const *PQXX_COLD pqxx::connection::hostname() const { return PQhost(m_conn); } char const *PQXX_COLD pqxx::connection::port() const { return PQport(m_conn); } char const *pqxx::connection::err_msg() const noexcept { return (m_conn == nullptr) ? "No connection to database" : PQerrorMessage(m_conn); } void PQXX_COLD pqxx::connection::register_errorhandler(errorhandler *handler) { // Set notice processor on demand, i.e. only when the caller actually // registers an error handler. // We do this just to make it less likely that users fall into the trap // where a result object may hold a notice processor derived from its parent // connection which has already been destroyed. Our notice processor goes // through the connection's list of error handlers. If the connection object // has already been destroyed though, that list no longer exists. // By setting the notice processor on demand, we absolve users who never // register an error handler from ahving to care about this nasty subtlety. if (std::empty(m_errorhandlers)) PQsetNoticeProcessor(m_conn, pqxx_notice_processor, this); m_errorhandlers.push_back(handler); } void PQXX_COLD pqxx::connection::unregister_errorhandler(errorhandler *handler) noexcept { // The errorhandler itself will take care of nulling its pointer to this // connection. m_errorhandlers.remove(handler); if (std::empty(m_errorhandlers)) PQsetNoticeProcessor(m_conn, inert_notice_processor, nullptr); } std::vector PQXX_COLD pqxx::connection::get_errorhandlers() const { return {std::begin(m_errorhandlers), std::end(m_errorhandlers)}; } pqxx::result pqxx::connection::exec(std::string_view query, std::string_view desc) { return exec(std::make_shared(query), desc); } pqxx::result pqxx::connection::exec( std::shared_ptr query, std::string_view desc) { auto const res{make_result(PQexec(m_conn, query->c_str()), query, desc)}; get_notifs(); return res; } std::string pqxx::connection::encrypt_password( char const user[], char const password[], char const *algorithm) { #if defined(PQXX_HAVE_PQENCRYPTPASSWORDCONN) { auto const buf{PQencryptPasswordConn(m_conn, password, user, algorithm)}; std::unique_ptr> ptr{ buf, [](char const *x) { PQfreemem(const_cast(x)); }}; return std::string(ptr.get()); } #else { // No PQencryptPasswordConn. Fall back on the old PQencryptPassword... // unless the caller selects a different algorithm. if (algorithm != nullptr and std::strcmp(algorithm, "md5") != 0) throw feature_not_supported{ "Could not encrypt password: available libpq version does not support " "algorithms other than md5."}; # include "pqxx/internal/ignore-deprecated-pre.hxx" return pqxx::encrypt_password(user, password); # include "pqxx/internal/ignore-deprecated-post.hxx" } #endif // PQXX_HAVE_PQENCRYPTPASSWORDCONN } void pqxx::connection::prepare(char const name[], char const definition[]) & { auto const q{std::make_shared( pqxx::internal::concat("[PREPARE ", name, "]"))}; auto const r{ make_result(PQprepare(m_conn, name, definition, 0, nullptr), q, *q)}; } void pqxx::connection::prepare(char const definition[]) & { this->prepare("", definition); } void pqxx::connection::unprepare(std::string_view name) { exec(internal::concat("DEALLOCATE ", quote_name(name))); } pqxx::result pqxx::connection::exec_prepared( std::string_view statement, internal::c_params const &args) { auto const q{std::make_shared(statement)}; auto const pq_result{PQexecPrepared( m_conn, q->c_str(), check_cast(std::size(args.values), "exec_prepared"sv), args.values.data(), args.lengths.data(), reinterpret_cast(args.formats.data()), static_cast(format::text))}; auto const r{make_result(pq_result, q, statement)}; get_notifs(); return r; } void pqxx::connection::close() { try { if (m_trans) PQXX_UNLIKELY process_notice(internal::concat( "Closing connection while ", internal::describe_object("transaction"sv, m_trans->name()), " is still open.")); if (not std::empty(m_receivers)) { PQXX_UNLIKELY process_notice("Closing connection with outstanding receivers."); m_receivers.clear(); } std::list old_handlers; m_errorhandlers.swap(old_handlers); auto const rbegin{std::crbegin(old_handlers)}, rend{std::crend(old_handlers)}; for (auto i{rbegin}; i != rend; ++i) pqxx::internal::gate::errorhandler_connection{**i}.unregister(); PQfinish(m_conn); m_conn = nullptr; } catch (std::exception const &) { m_conn = nullptr; throw; } } int pqxx::connection::status() const noexcept { return PQstatus(m_conn); } namespace { /// Return a name for t, if t is non-null and has a name; or empty string. std::string_view get_name(pqxx::transaction_base const *t) { return (t == nullptr) ? ""sv : t->name(); } } // namespace void pqxx::connection::register_transaction(transaction_base *t) { internal::check_unique_register( m_trans, "transaction", get_name(m_trans), t, "transaction", get_name(t)); m_trans = t; } void pqxx::connection::unregister_transaction(transaction_base *t) noexcept { try { internal::check_unique_unregister( m_trans, "transaction", get_name(m_trans), t, "transaction", get_name(t)); } catch (std::exception const &e) { process_notice(e.what()); } m_trans = nullptr; } std::pair>, std::size_t> pqxx::connection::read_copy_line() { char *buf{nullptr}; // Allocate once, re-use across invocations. static auto const q{std::make_shared("[END COPY]")}; auto const line_len{PQgetCopyData(m_conn, &buf, false)}; switch (line_len) { case -2: // Error. throw failure{ internal::concat("Reading of table data failed: ", err_msg())}; case -1: // End of COPY. make_result(PQgetResult(m_conn), q, *q); return {}; case 0: // "Come back later." throw internal_error{"table read inexplicably went asynchronous"}; default: // Success, got buffer size. // Line size includes a trailing zero, which we ignore. auto const text_len{static_cast(line_len) - 1}; return std::make_pair( std::unique_ptr>{buf, PQfreemem}, text_len); } } void pqxx::connection::write_copy_line(std::string_view line) { static std::string const err_prefix{"Error writing to table: "}; auto const size{check_cast( internal::ssize(line), "Line in stream_to is too long to process."sv)}; if (PQputCopyData(m_conn, line.data(), size) <= 0) PQXX_UNLIKELY throw failure{err_prefix + err_msg()}; if (PQputCopyData(m_conn, "\n", 1) <= 0) PQXX_UNLIKELY throw failure{err_prefix + err_msg()}; } void pqxx::connection::end_copy_write() { int res{PQputCopyEnd(m_conn, nullptr)}; switch (res) { case -1: throw failure{internal::concat("Write to table failed: ", err_msg())}; case 0: throw internal_error{"table write is inexplicably asynchronous"}; case 1: // Normal termination. Retrieve result object. break; default: throw internal_error{ internal::concat("unexpected result ", res, " from PQputCopyEnd()")}; } static auto const q{std::make_shared("[END COPY]")}; make_result(PQgetResult(m_conn), q, *q); } void pqxx::connection::start_exec(char const query[]) { if (PQsendQuery(m_conn, query) == 0) PQXX_UNLIKELY throw failure{err_msg()}; } pqxx::internal::pq::PGresult *pqxx::connection::get_result() { return PQgetResult(m_conn); } size_t pqxx::connection::esc_to_buf(std::string_view text, char *buf) const { int err{0}; auto const copied{ PQescapeStringConn(m_conn, buf, text.data(), std::size(text), &err)}; if (err) PQXX_UNLIKELY throw argument_error{err_msg()}; return copied; } std::string pqxx::connection::esc(std::string_view text) const { std::string buf; buf.resize(2 * std::size(text) + 1); auto const copied{esc_to_buf(text, buf.data())}; buf.resize(copied); return buf; } std::string PQXX_COLD pqxx::connection::esc_raw(unsigned char const bin[], std::size_t len) const { return pqxx::internal::esc_bin(binary_cast(bin, len)); } std::string pqxx::connection::esc_raw(std::basic_string_view bin) const { return pqxx::internal::esc_bin(bin); } std::string PQXX_COLD pqxx::connection::unesc_raw(char const text[]) const { if (text[0] == '\\' and text[1] == 'x') { // Hex-escaped format. std::string buf; buf.resize(pqxx::internal::size_unesc_bin(std::strlen(text))); pqxx::internal::unesc_bin( std::string_view{text}, reinterpret_cast(buf.data())); return buf; } else { // Legacy escape format. // TODO: Remove legacy support. std::size_t len; auto bytes{const_cast( reinterpret_cast(text))}; std::unique_ptr> const ptr{PQunescapeBytea(bytes, &len), PQfreemem}; return std::string{ptr.get(), ptr.get() + len}; } } std::string PQXX_COLD pqxx::connection::quote_raw(unsigned char const bin[], std::size_t len) const { return internal::concat("'", esc_raw(binary_cast(bin, len)), "'::bytea"); } std::string pqxx::connection::quote_raw(std::basic_string_view bytes) const { return internal::concat("'", esc_raw(bytes), "'::bytea"); } std::string PQXX_COLD pqxx::connection::quote(binarystring const &b) const { return quote(b.bytes_view()); } std::string pqxx::connection::quote(std::basic_string_view b) const { return internal::concat("'", esc_raw(b), "'::bytea"); } std::string pqxx::connection::quote_name(std::string_view identifier) const { std::unique_ptr> buf{ PQescapeIdentifier(m_conn, identifier.data(), std::size(identifier)), PQfreemem}; if (buf.get() == nullptr) PQXX_UNLIKELY throw failure{err_msg()}; return std::string{buf.get()}; } std::string pqxx::connection::quote_table(std::string_view table_name) const { return this->quote_name(table_name); } std::string pqxx::connection::quote_table(table_path path) const { return separated_list( ".", std::begin(path), std::end(path), [this](auto name) { return this->quote_name(*name); }); } std::string pqxx::connection::esc_like(std::string_view text, char escape_char) const { std::string out; out.reserve(std::size(text)); internal::for_glyphs( internal::enc_group(encoding_id()), [&out, escape_char](char const *gbegin, char const *gend) { if ((gend - gbegin == 1) and (*gbegin == '_' or *gbegin == '%')) // We're not expecting a lot of wildcards in a string. Usually. PQXX_UNLIKELY out.push_back(escape_char); for (; gbegin != gend; ++gbegin) out.push_back(*gbegin); }, text.data(), std::size(text)); return out; } int pqxx::connection::await_notification() { int notifs = get_notifs(); if (notifs == 0) { PQXX_LIKELY internal::wait_fd(socket_of(m_conn), true, false, 10, 0); notifs = get_notifs(); } return notifs; } int pqxx::connection::await_notification( std::time_t seconds, long microseconds) { int notifs = get_notifs(); if (notifs == 0) { PQXX_LIKELY internal::wait_fd( socket_of(m_conn), true, false, check_cast(seconds, "Seconds out of range."), check_cast(microseconds, "Microseconds out of range.")); return get_notifs(); } return notifs; } std::string pqxx::connection::adorn_name(std::string_view n) { auto const id{to_string(++m_unique_id)}; if (std::empty(n)) return pqxx::internal::concat("x", id); else return pqxx::internal::concat(n, "_", id); } std::string pqxx::connection::get_client_encoding() const { return internal::name_encoding(encoding_id()); } void PQXX_COLD pqxx::connection::set_client_encoding(char const encoding[]) & { switch (auto const retval{PQsetClientEncoding(m_conn, encoding)}; retval) { case 0: // OK. PQXX_LIKELY break; case -1: PQXX_UNLIKELY if (is_open()) throw failure{"Setting client encoding failed."}; else throw broken_connection{"Lost connection to the database server."}; default: PQXX_UNLIKELY throw internal_error{internal::concat( "Unexpected result from PQsetClientEncoding: ", retval)}; } } int pqxx::connection::encoding_id() const { int const enc{PQclientEncoding(m_conn)}; if (enc == -1) { // PQclientEncoding does not query the database, but it does check for // broken connections. And unfortunately, we check the encoding right // *before* checking a query result for failure. So, we need to handle // connection failure here and it will apply in lots of places. // TODO: Make pqxx::result::result(...) do all the checking. PQXX_UNLIKELY if (is_open()) throw failure{"Could not obtain client encoding."}; else throw broken_connection{"Lost connection to the database server."}; } PQXX_LIKELY return enc; } pqxx::result pqxx::connection::exec_params( std::string_view query, internal::c_params const &args) { auto const q{std::make_shared(query)}; auto const pq_result{PQexecParams( m_conn, q->c_str(), check_cast(std::size(args.values), "exec_params"sv), nullptr, args.values.data(), args.lengths.data(), reinterpret_cast(args.formats.data()), static_cast(format::text))}; auto const r{make_result(pq_result, q)}; get_notifs(); return r; } namespace { /// Get the prevailing default value for a connection parameter. char const *get_default(PQconninfoOption const &opt) noexcept { if (opt.envvar == nullptr) { // There's no environment variable for this setting. The only default is // the one that was compiled in. return opt.compiled; } // As of C++11, std::getenv() uses thread-local storage, so it should be // thread-safe. MSVC still warns about it though. #if defined(_MSC_VER) # pragma warning(push) # pragma warning(disable : 4996) #endif char const *var{std::getenv(opt.envvar)}; #if defined(_MSC_VER) # pragma warning(pop) #endif if (var == nullptr) { // There's an environment variable for this setting, but it's not set. return opt.compiled; } // The environment variable is the prevailing default. return var; } } // namespace std::string pqxx::connection::connection_string() const { if (m_conn == nullptr) PQXX_UNLIKELY throw usage_error{"Can't get connection string: connection is not open."}; std::unique_ptr< PQconninfoOption, std::function> const params{ PQconninfo(m_conn), PQconninfoFree}; if (params.get() == nullptr) PQXX_UNLIKELY throw std::bad_alloc{}; std::string buf; for (std::size_t i{0}; params.get()[i].keyword != nullptr; ++i) { auto const param{params.get()[i]}; if (param.val != nullptr) { auto const default_val{get_default(param)}; if ( (default_val == nullptr) or (std::strcmp(param.val, default_val) != 0)) { if (not std::empty(buf)) buf.push_back(' '); buf += param.keyword; buf.push_back('='); buf += param.val; } } } return buf; } #if defined(_WIN32) || __has_include() pqxx::connecting::connecting(zview connection_string) : m_conn{connection::connect_nonblocking, connection_string} {} #endif // defined(_WIN32) || __has_include( #if defined(_WIN32) || __has_include() void pqxx::connecting::process() & { auto const [reading, writing]{m_conn.poll_connect()}; m_reading = reading; m_writing = writing; } #endif // defined(_WIN32) || __has_include( #if defined(_WIN32) || __has_include() pqxx::connection pqxx::connecting::produce() && { if (!done()) throw usage_error{ "Tried to produce a nonblocking connection before it was done."}; m_conn.complete_init(); return std::move(m_conn); } #endif // defined(_WIN32) || __has_include(