libpqxx  7.3.0
pipeline.hxx
1 /* Definition of the pqxx::pipeline class.
2  *
3  * Throughput-optimized query manager
4  *
5  * DO NOT INCLUDE THIS FILE DIRECTLY; include pqxx/pipeline instead.
6  *
7  * Copyright (c) 2000-2020, Jeroen T. Vermeulen.
8  *
9  * See COPYING for copyright license. If you did not receive a file called
10  * COPYING with this source code, please notify the distributor of this
11  * mistake, or contact the author.
12  */
13 #ifndef PQXX_H_PIPELINE
14 #define PQXX_H_PIPELINE
15 
16 #include "pqxx/compiler-public.hxx"
17 #include "pqxx/internal/compiler-internal-pre.hxx"
18 
19 #include <limits>
20 #include <map>
21 #include <string>
22 
23 #include "pqxx/transaction_base.hxx"
24 
25 
26 namespace pqxx
27 {
29 
42 class PQXX_LIBEXPORT pipeline : public internal::transactionfocus
43 {
44 public:
45  using query_id = long;
46 
47  pipeline(pipeline const &) = delete;
48  pipeline &operator=(pipeline const &) = delete;
49 
50  explicit pipeline(transaction_base &t) : transactionfocus{t, s_classname}
51  {
52  init();
53  }
54  pipeline(transaction_base &t, std::string_view tname) :
55  transactionfocus{t, s_classname, tname}
56  {
57  init();
58  }
59 
60  ~pipeline() noexcept;
61 
63 
69  query_id insert(std::string_view);
70 
72 
78  void complete();
79 
81 
90  void flush();
91 
93 
101  void cancel();
102 
104  [[nodiscard]] bool is_finished(query_id) const;
105 
107 
113  result retrieve(query_id qid)
114  {
115  return retrieve(m_queries.find(qid)).second;
116  }
117 
119 
120  std::pair<query_id, result> retrieve();
121 
122  [[nodiscard]] bool empty() const noexcept { return std::empty(m_queries); }
123 
126 
137  int retain(int retain_max = 2);
138 
139 
141  void resume();
142 
143 private:
144  struct PQXX_PRIVATE Query
145  {
146  explicit Query(std::string_view q) :
147  query{std::make_shared<std::string>(q)}
148  {}
149 
150  std::shared_ptr<std::string> query;
151  result res;
152  };
153 
154  using QueryMap = std::map<query_id, Query>;
155 
156  void init();
157  void attach();
158  void detach();
159 
161  static constexpr query_id qid_limit() noexcept
162  {
163  // Parenthesise this to work around an eternal Visual C++ problem:
164  // Without the extra parentheses, unless NOMINMAX is defined, the
165  // preprocessor will mistake this "max" for its annoying built-in macro
166  // of the same name.
167  return (std::numeric_limits<query_id>::max)();
168  }
169 
171  PQXX_PRIVATE query_id generate_id();
172 
173  bool have_pending() const noexcept
174  {
175  return m_issuedrange.second != m_issuedrange.first;
176  }
177 
178  PQXX_PRIVATE void issue();
179 
181  void set_error_at(query_id qid) noexcept
182  {
183  if (qid < m_error)
184  m_error = qid;
185  }
186 
188  [[noreturn]] PQXX_PRIVATE void internal_error(std::string const &err);
189 
190  PQXX_PRIVATE bool obtain_result(bool expect_none = false);
191 
192  PQXX_PRIVATE void obtain_dummy();
193  PQXX_PRIVATE void get_further_available_results();
194  PQXX_PRIVATE void check_end_results();
195 
197  PQXX_PRIVATE void receive_if_available();
198 
200  PQXX_PRIVATE void receive(pipeline::QueryMap::const_iterator stop);
201  std::pair<pipeline::query_id, result> retrieve(pipeline::QueryMap::iterator);
202 
203  QueryMap m_queries;
204  std::pair<QueryMap::iterator, QueryMap::iterator> m_issuedrange;
205  int m_retain = 0;
206  int m_num_waiting = 0;
207  query_id m_q_id = 0;
208 
210  bool m_dummy_pending = false;
211 
213  query_id m_error = qid_limit();
214 
216 
219  internal::encoding_group m_encoding;
220 
221  constexpr static std::string_view s_classname{"pipeline"};
222 };
223 } // namespace pqxx
224 
225 #include "pqxx/internal/compiler-internal-post.hxx"
226 #endif
pqxx::pipeline::pipeline
pipeline(transaction_base &t)
Definition: pipeline.hxx:50
pqxx
The home of all libpqxx classes, functions, templates, etc.
Definition: array.hxx:26
pqxx::pipeline::resume
void resume()
Resume retained query emission. Harmless when not needed.
Definition: pipeline.cxx:168
pqxx::internal::enc_group
encoding_group enc_group(int libpq_enc_id)
Definition: encodings.cxx:571
pqxx::pipeline::retrieve
std::pair< query_id, result > retrieve()
Retrieve oldest unretrieved result (possibly wait for one).
Definition: pipeline.cxx:144
pqxx::pipeline::pipeline
pipeline(transaction_base &t, std::string_view tname)
Definition: pipeline.hxx:54
pqxx::pipeline::insert
query_id insert(std::string_view)
Add query to the pipeline.
Definition: pipeline.cxx:67
pqxx::range_error
Something is out of range, similar to std::out_of_range.
Definition: except.hxx:193
pqxx::pipeline::pipeline
pipeline(pipeline const &)=delete
pqxx::internal_error
Internal error in libpqxx library.
Definition: except.hxx:158
pqxx::pipeline::~pipeline
~pipeline() noexcept
Definition: pipeline.cxx:41
pqxx::pipeline::query_id
long query_id
Definition: pipeline.hxx:45
pqxx::pipeline::is_finished
bool is_finished(query_id) const
Is result for given query available?
Definition: pipeline.cxx:133
pqxx::transaction_base
Interface definition (and common code) for "transaction" classes.
Definition: transaction_base.hxx:72
pqxx::pipeline::complete
void complete()
Wait for all ongoing or pending operations to complete, and detach.
Definition: pipeline.cxx:93
pqxx::pipeline::flush
void flush()
Forget all ongoing or pending operations and retrieved results.
Definition: pipeline.cxx:106
pqxx::pipeline::retain
int retain(int retain_max=2)
Definition: pipeline.cxx:152
pqxx::separated_list
std::string separated_list(std::string_view sep, ITER begin, ITER end, ACCESS access)
Represent sequence of values as a string, joined by a given separator.
Definition: separated_list.hxx:40
pqxx::pipeline
Processes several queries in FIFO manner, optimized for high throughput.
Definition: pipeline.hxx:43
pqxx::pipeline::operator=
pipeline & operator=(pipeline const &)=delete
pqxx::pipeline::empty
bool empty() const noexcept
Definition: pipeline.hxx:122
pqxx::result
Result set containing data returned by a query or command.
Definition: result.hxx:71
pqxx::pipeline::cancel
void cancel()
Cancel ongoing query, if any.
Definition: pipeline.cxx:121