/** Implementation of the pqxx::pipeline class.
 *
 * Throughput-optimized query interface.
 *
 * Copyright (c) 2000-2022, Jeroen T. Vermeulen.
 *
 * See COPYING for copyright license.  If you did not receive a file called
 * COPYING with this source code, please notify the distributor of this
 * mistake, or contact the author.
 */
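
/* Rough usage sketch (illustrative only, not part of this translation unit;
 * see pqxx/pipeline.hxx for the authoritative public interface).  The sketch
 * assumes the caller has already opened a transaction "tx":
 *
 *   pqxx::pipeline pipe{tx};
 *   auto const id1{pipe.insert("SELECT 1")};
 *   auto const id2{pipe.insert("SELECT 2")};
 *   pipe.complete();                       // Issue and wait for all results.
 *   pqxx::result const r1{pipe.retrieve(id1)};
 *   pqxx::result const r2{pipe.retrieve(id2)};
 */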

#include "pqxx-source.hxx"

#include <iterator>

#include "pqxx/internal/header-pre.hxx"

#include "pqxx/dbtransaction.hxx"
#include "pqxx/internal/concat.hxx"
#include "pqxx/internal/gates/connection-pipeline.hxx"
#include "pqxx/internal/gates/result-creation.hxx"
#include "pqxx/internal/gates/result-pipeline.hxx"
#include "pqxx/pipeline.hxx"
#include "pqxx/separated_list.hxx"

#include "pqxx/internal/header-post.hxx"


namespace
{
std::string const theSeparator{"; "};
std::string const theDummyValue{"1"};
std::string const theDummyQuery{"SELECT " + theDummyValue + theSeparator};
} // namespace
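
/* How batching works in this file (summary of the logic below): insert()
 * collects queries; issue() joins all waiting queries with theSeparator and
 * sends them to the backend in a single round trip.  Whenever more than one
 * query goes out in a batch, theDummyQuery is prepended so that
 * obtain_dummy() can check the batch's fate: if the dummy succeeds, results
 * for the real queries follow; if it fails, the batch is replayed one query
 * at a time to pin the error on the exact offending statement.
 */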


void pqxx::pipeline::init()
{
  m_encoding = internal::enc_group(m_trans.conn().encoding_id());
  m_issuedrange = make_pair(std::end(m_queries), std::end(m_queries));
  attach();
}


pqxx::pipeline::~pipeline() noexcept
{
  try
  {
    cancel();
  }
  catch (std::exception const &)
  {}
  detach();
}


void pqxx::pipeline::attach()
{
  if (not registered())
    register_me();
}


void pqxx::pipeline::detach()
{
  if (registered())
    unregister_me();
}


pqxx::pipeline::query_id pqxx::pipeline::insert(std::string_view q) &
{
  attach();
  query_id const qid{generate_id()};
  auto const i{m_queries.insert(std::make_pair(qid, Query(q))).first};

  if (m_issuedrange.second == std::end(m_queries))
  {
    m_issuedrange.second = i;
    if (m_issuedrange.first == std::end(m_queries))
      m_issuedrange.first = i;
  }
  m_num_waiting++;

  if (m_num_waiting > m_retain)
  {
    if (have_pending())
      receive_if_available();
    if (not have_pending())
      issue();
  }

  return qid;
}


void pqxx::pipeline::complete()
{
  if (have_pending())
    receive(m_issuedrange.second);
  if (m_num_waiting and (m_error == qid_limit()))
  {
    issue();
    receive(std::end(m_queries));
  }
  detach();
}


void pqxx::pipeline::flush()
{
  if (not std::empty(m_queries))
  {
    if (have_pending())
      receive(m_issuedrange.second);
    m_issuedrange.first = m_issuedrange.second = std::end(m_queries);
    m_num_waiting = 0;
    m_dummy_pending = false;
    m_queries.clear();
  }
  detach();
}


void PQXX_COLD pqxx::pipeline::cancel()
{
  while (have_pending())
  {
    pqxx::internal::gate::connection_pipeline(m_trans.conn()).cancel_query();
    auto canceled_query{m_issuedrange.first};
    ++m_issuedrange.first;
    m_queries.erase(canceled_query);
  }
}


bool pqxx::pipeline::is_finished(pipeline::query_id q) const
{
  if (m_queries.find(q) == std::end(m_queries))
    throw std::logic_error{
      internal::concat("Requested status for unknown query '", q, "'.")};
  return (QueryMap::const_iterator(m_issuedrange.first) ==
          std::end(m_queries)) or
         (q < m_issuedrange.first->first and q < m_error);
}


std::pair<pqxx::pipeline::query_id, pqxx::result> pqxx::pipeline::retrieve()
{
  if (std::empty(m_queries))
    throw std::logic_error{"Attempt to retrieve result from empty pipeline."};
  return retrieve(std::begin(m_queries));
}


int pqxx::pipeline::retain(int retain_max) &
{
  if (retain_max < 0)
    throw range_error{internal::concat(
      "Attempt to make pipeline retain ", retain_max, " queries")};

  int const oldvalue{m_retain};
  m_retain = retain_max;

  if (m_num_waiting >= m_retain)
    resume();

  return oldvalue;
}
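
/* Illustrative caller-side use of retain() and resume() (not part of this
 * file; "pipe", "queries", and "ids" are hypothetical caller names).  Raising
 * the retention count lets queries accumulate so they go out in fewer round
 * trips; resume() pushes out whatever is still waiting.
 *
 *   pipe.retain(20);                 // Accumulate up to 20 queries per batch.
 *   for (auto const &q : queries)
 *     ids.push_back(pipe.insert(q));
 *   pipe.resume();                   // Send anything still waiting.
 */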


void pqxx::pipeline::resume() &
{
  if (have_pending())
    receive_if_available();
  if (not have_pending() and m_num_waiting)
  {
    issue();
    receive_if_available();
  }
}


pqxx::pipeline::query_id pqxx::pipeline::generate_id()
{
  if (m_q_id == qid_limit())
    throw std::overflow_error{"Too many queries went through pipeline."};
  ++m_q_id;
  return m_q_id;
}


void pqxx::pipeline::issue()
{
  // Retrieve that null result for the last query, if needed.
  obtain_result();

  // Don't issue anything if we've encountered an error.
  if (m_error < qid_limit())
    return;

  // Start with oldest query (lowest id) not in previous issue range.
  auto oldest{m_issuedrange.second};

  // Construct cumulative query string for entire batch.
  auto cum{separated_list(
    theSeparator, oldest, std::end(m_queries),
    [](QueryMap::const_iterator i) { return i->second.query; })};
  auto const num_issued{
    QueryMap::size_type(std::distance(oldest, std::end(m_queries)))};
  bool const prepend_dummy{num_issued > 1};
  if (prepend_dummy)
    cum = theDummyQuery + cum;

  pqxx::internal::gate::connection_pipeline{m_trans.conn()}.start_exec(
    cum.c_str());

  // Since we managed to send out these queries, update state to reflect this.
  m_dummy_pending = prepend_dummy;
  m_issuedrange.first = oldest;
  m_issuedrange.second = std::end(m_queries);
  m_num_waiting -= check_cast<int>(num_issued, "pipeline issue()"sv);
}


void PQXX_COLD pqxx::pipeline::internal_error(std::string const &err)
{
  set_error_at(0);
  throw pqxx::internal_error{err};
}


bool pqxx::pipeline::obtain_result(bool expect_none)
{
  pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
  auto const r{gate.get_result()};
  if (r == nullptr)
  {
    if (have_pending() and not expect_none)
    {
      PQXX_UNLIKELY
      set_error_at(m_issuedrange.first->first);
      m_issuedrange.second = m_issuedrange.first;
    }
    return false;
  }

  result const res{pqxx::internal::gate::result_creation::create(
    r, std::begin(m_queries)->second.query, m_encoding)};

  if (not have_pending())
  {
    PQXX_UNLIKELY
    set_error_at(std::begin(m_queries)->first);
    throw std::logic_error{
      "Got more results from pipeline than there were queries."};
  }

  // Must be the result for the oldest pending query.
  if (not std::empty(m_issuedrange.first->second.res))
    PQXX_UNLIKELY
    internal_error("Multiple results for one query.");

  m_issuedrange.first->second.res = res;
  ++m_issuedrange.first;

  return true;
}


void pqxx::pipeline::obtain_dummy()
{
  // Allocate once, re-use across invocations.
  static auto const text{
    std::make_shared<std::string>("[DUMMY PIPELINE QUERY]")};

  pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
  auto const r{gate.get_result()};
  m_dummy_pending = false;

  if (r == nullptr)
    PQXX_UNLIKELY
    internal_error("Pipeline got no result from backend when it expected one.");

  result R{pqxx::internal::gate::result_creation::create(r, text, m_encoding)};

  bool OK{false};
  try
  {
    pqxx::internal::gate::result_creation{R}.check_status();
    OK = true;
  }
  catch (sql_error const &)
  {}
  if (OK)
  {
    PQXX_LIKELY
    if (std::size(R) > 1)
      PQXX_UNLIKELY
      internal_error("Unexpected result for dummy query in pipeline.");

    if (R.at(0).at(0).as<std::string>() != theDummyValue)
      PQXX_UNLIKELY
      internal_error("Dummy query in pipeline returned unexpected value.");
    return;
  }

  // TODO: Can we actually re-issue statements after a failure?
  /* Execution of this batch failed.
   *
   * When we send multiple statements in one go, the backend treats them as a
   * single transaction.  So the entire batch was effectively rolled back.
   *
   * Since none of the queries in the batch were actually executed, we can
   * afford to replay them one by one until we find the exact query that
   * caused the error.  This gives us not only a more specific error message
   * to report, but also tells us which query to report it for.
   */

  // First, give the whole batch the same syntax error message, in case all
  // else is going to fail.
  for (auto i{m_issuedrange.first}; i != m_issuedrange.second; ++i)
    i->second.res = R;

  // Remember where the end of this batch was.
  auto const stop{m_issuedrange.second};

  // Retrieve that null result for the last query, if needed.
  obtain_result(true);

  // Reset internal state to forget botched batch attempt.
  m_num_waiting += check_cast<int>(
    std::distance(m_issuedrange.first, stop), "pipeline obtain_dummy()"sv);
  m_issuedrange.second = m_issuedrange.first;

  // Issue queries in failed batch one at a time.
  unregister_me();
  try
  {
    do {
      m_num_waiting--;
      auto const query{*m_issuedrange.first->second.query};
      auto &holder{m_issuedrange.first->second};
      holder.res = m_trans.exec(query);
      pqxx::internal::gate::result_creation{holder.res}.check_status();
      ++m_issuedrange.first;
    } while (m_issuedrange.first != stop);
  }
  catch (std::exception const &)
  {
    auto const thud{m_issuedrange.first->first};
    ++m_issuedrange.first;
    m_issuedrange.second = m_issuedrange.first;
    auto q{m_issuedrange.first};
    set_error_at((q == std::end(m_queries)) ? thud + 1 : q->first);
  }
}


std::pair<pqxx::pipeline::query_id, pqxx::result>
pqxx::pipeline::retrieve(pipeline::QueryMap::iterator q)
{
  if (q == std::end(m_queries))
    throw std::logic_error{"Attempt to retrieve result for unknown query."};

  if (q->first >= m_error)
    throw std::runtime_error{
      "Could not complete query in pipeline due to error in earlier query."};

  // If query hasn't issued yet, do it now.
  if (
    m_issuedrange.second != std::end(m_queries) and
    (q->first >= m_issuedrange.second->first))
  {
    if (have_pending())
      receive(m_issuedrange.second);
    if (m_error == qid_limit())
      issue();
  }

  // If result not in yet, get it; else get at least whatever's convenient.
  if (have_pending())
  {
    if (q->first >= m_issuedrange.first->first)
    {
      auto suc{q};
      ++suc;
      receive(suc);
    }
    else
    {
      receive_if_available();
    }
  }

  if (q->first >= m_error)
    throw std::runtime_error{
      "Could not complete query in pipeline due to error in earlier query."};

  // Don't leave the backend idle if there are queries waiting to be issued.
  if (m_num_waiting and not have_pending() and (m_error == qid_limit()))
    issue();

  result const R{q->second.res};
  auto const P{std::make_pair(q->first, R)};

  m_queries.erase(q);

  pqxx::internal::gate::result_creation{R}.check_status();
  return P;
}


void pqxx::pipeline::get_further_available_results()
{
  pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
  while (not gate.is_busy() and obtain_result())
    if (not gate.consume_input())
      throw broken_connection{};
}


void pqxx::pipeline::receive_if_available()
{
  pqxx::internal::gate::connection_pipeline gate{m_trans.conn()};
  if (not gate.consume_input())
    throw broken_connection{};
  if (gate.is_busy())
    return;

  if (m_dummy_pending)
    obtain_dummy();
  if (have_pending())
    get_further_available_results();
}


void pqxx::pipeline::receive(pipeline::QueryMap::const_iterator stop)
{
  if (m_dummy_pending)
    obtain_dummy();

  while (obtain_result() and
         QueryMap::const_iterator{m_issuedrange.first} != stop)
    ;

  // Also haul in any remaining "targets of opportunity".
  if (QueryMap::const_iterator{m_issuedrange.first} == stop)
    get_further_available_results();
}