Files
scylladb/query/query-result.hh
Ernest Zaslavsky d624413ddd treewide: Move query related files to a new query directory
As requested in #22120, moved the files and fixed other includes and build system.

Moved files:
- query.cc
- query-request.hh
- query-result.hh
- query-result-reader.hh
- query-result-set.cc
- query-result-set.hh
- query-result-writer.hh
- query_id.hh
- query_result_merger.hh

Fixes: #22120

This is a cleanup, no need to backport

Closes scylladb/scylladb#25105
2025-09-16 23:40:47 +03:00

456 lines
17 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "bytes_ostream.hh"
#include "utils/digest_algorithm.hh"
#include "query-request.hh"
#include "keys/full_position.hh"
#include <optional>
#include <fmt/ostream.h>
#include <seastar/util/bool_class.hh>
#include "seastarx.hh"
namespace query {
// Tag type that makes short_read a distinct bool_class instantiation.
struct short_read_tag { };
// Strongly-typed boolean built on seastar's bool_class. In this file it is
// used for the "short read allowed" parameters of the memory accounters and
// for the flag that query::result reports through is_short_read().
using short_read = bool_class<short_read_tag>;
// result_memory_limiter, result_memory_accounter and result_memory_tracker
// form an infrastructure for limiting size of query results.
//
// result_memory_limiter is a shard-local object which ensures that all results
// combined do not use more than 10% of the shard memory.
//
// result_memory_accounter is used by result producers, updates the shard-local
// limits as well as keeps track of the individual maximum result size limit
// which is 1 MB.
//
// result_memory_tracker is just an object that makes sure the
// result_memory_limiter is notified when memory is released (but not sooner).
class result_memory_accounter;
// Shard-local budget for the memory held by query results. The budget is kept
// in a semaphore: bytes are consumed from it while results grow and signalled
// back once they are released.
class result_memory_limiter {
    const size_t _maximum_total_result_memory;
    semaphore _memory_limiter;

public:
    static constexpr size_t minimum_result_size = 4 * 1024;
    static constexpr size_t maximum_result_size = 1 * 1024 * 1024;
    static constexpr size_t unlimited_result_size = std::numeric_limits<size_t>::max();

    // The semaphore starts out holding the whole budget.
    explicit result_memory_limiter(size_t maximum_total_result_memory)
        : _maximum_total_result_memory(maximum_total_result_memory)
        , _memory_limiter(_maximum_total_result_memory)
    { }

    // Neither copyable nor movable.
    result_memory_limiter(const result_memory_limiter&) = delete;
    result_memory_limiter(result_memory_limiter&&) = delete;

    // Total budget minus whatever the semaphore still has available.
    ssize_t total_used_memory() const {
        const auto still_available = _memory_limiter.available_units();
        return _maximum_total_result_memory - still_available;
    }

    // Mutation query: reserves minimum_result_size up front and hands back
    // an accounter bound to the given maximum result size. Such a read may
    // be cut short before that maximum when the shard is under memory
    // pressure.
    future<result_memory_accounter> new_mutation_read(query::max_result_size max_result_size, short_read short_read_allowed);

    // Data query: reserves minimum_result_size up front and hands back an
    // accounter bound to the given maximum result size. Shard memory
    // pressure does *not* stop such a read, so that digest mismatches are
    // avoided.
    future<result_memory_accounter> new_data_read(query::max_result_size max_result_size, short_read short_read_allowed);

    // Digest read: the accounter does not count against the shard memory
    // usage, but the result still stops growing once its individual limit
    // has been reached.
    future<result_memory_accounter> new_digest_read(query::max_result_size max_result_size, short_read short_read_allowed);

    // Tells whether results must stop growing, looking only at the per-shard
    // limit.
    stop_iteration check() const {
        const bool exhausted = _memory_limiter.current() <= 0;
        return stop_iteration(exhausted);
    }

    // Takes n bytes out of the limiter, then reports whether results may
    // keep growing (per-shard limit only).
    stop_iteration update_and_check(size_t n) {
        _memory_limiter.consume(n);
        return check();
    }

    // Gives n bytes back to the limiter.
    void release(size_t n) noexcept {
        _memory_limiter.signal(n);
    }

    // Direct access to the underlying semaphore.
    semaphore& sem() noexcept {
        return _memory_limiter;
    }
};
// Keeps hold of the semaphore units blocked for a result, together with the
// number of bytes that result used.
class result_memory_tracker {
    // Semaphore that default-constructed trackers attach their (zero) units to.
    static thread_local semaphore _dummy;

    semaphore_units<> _units;
    size_t _used_memory;

public:
    // Empty tracker: no units held, no memory used.
    result_memory_tracker() noexcept
        : _units(_dummy, 0)
        , _used_memory(0)
    { }

    // Tracker holding `blocked` units of `sem` for a result that used
    // `used` bytes.
    result_memory_tracker(semaphore& sem, size_t blocked, size_t used) noexcept
        : _units(sem, blocked)
        , _used_memory(used)
    { }

    size_t used_memory() const {
        return _used_memory;
    }
};
class result_memory_accounter {
result_memory_limiter* _limiter = nullptr;
size_t _blocked_bytes = 0;
size_t _used_memory = 0;
size_t _total_used_memory = 0;
query::max_result_size _maximum_result_size;
stop_iteration _stop_on_global_limit;
short_read _short_read_allowed;
mutable bool _below_soft_limit = true;
private:
// Mutation query accounter. Uses provided individual result size limit and
// will stop when shard memory pressure grows too high.
struct mutation_query_tag { };
explicit result_memory_accounter(mutation_query_tag, result_memory_limiter& limiter, query::max_result_size max_size, short_read short_read_allowed) noexcept
: _limiter(&limiter)
, _blocked_bytes(result_memory_limiter::minimum_result_size)
, _maximum_result_size(max_size)
, _stop_on_global_limit(true)
, _short_read_allowed(short_read_allowed)
{ }
// Data query accounter. Uses provided individual result size limit and
// will *not* stop even though shard memory pressure grows too high.
struct data_query_tag { };
explicit result_memory_accounter(data_query_tag, result_memory_limiter& limiter, query::max_result_size max_size, short_read short_read_allowed) noexcept
: _limiter(&limiter)
, _blocked_bytes(result_memory_limiter::minimum_result_size)
, _maximum_result_size(max_size)
, _short_read_allowed(short_read_allowed)
{ }
// Digest query accounter. Uses provided individual result size limit and
// will *not* stop even though shard memory pressure grows too high. This
// accounter does not contribute to the shard memory limits.
struct digest_query_tag { };
explicit result_memory_accounter(digest_query_tag, result_memory_limiter&, query::max_result_size max_size, short_read short_read_allowed) noexcept
: _blocked_bytes(0)
, _maximum_result_size(max_size)
, _short_read_allowed(short_read_allowed)
{ }
stop_iteration check_local_limit() const;
friend class result_memory_limiter;
public:
explicit result_memory_accounter(size_t max_size) noexcept
: _blocked_bytes(0)
, _maximum_result_size(max_size) {
}
result_memory_accounter(result_memory_accounter&& other) noexcept
: _limiter(std::exchange(other._limiter, nullptr))
, _blocked_bytes(other._blocked_bytes)
, _used_memory(other._used_memory)
, _total_used_memory(other._total_used_memory)
, _maximum_result_size(other._maximum_result_size)
, _stop_on_global_limit(other._stop_on_global_limit)
, _short_read_allowed(other._short_read_allowed)
, _below_soft_limit(other._below_soft_limit)
{ }
result_memory_accounter& operator=(result_memory_accounter&& other) noexcept {
if (this != &other) {
this->~result_memory_accounter();
new (this) result_memory_accounter(std::move(other));
}
return *this;
}
~result_memory_accounter() {
if (_limiter) {
_limiter->release(_blocked_bytes);
}
}
size_t used_memory() const { return _used_memory; }
// Consume n more bytes for the result. Returns stop_iteration::yes if
// the result cannot grow any more (taking into account both individual
// and per-shard limits).
stop_iteration update_and_check(size_t n) {
_used_memory += n;
_total_used_memory += n;
auto stop = check_local_limit();
if (_limiter && _used_memory > _blocked_bytes) {
auto to_block = std::min(_used_memory - _blocked_bytes, n);
_blocked_bytes += to_block;
stop = (_limiter->update_and_check(to_block) && _stop_on_global_limit) || stop;
if (stop && !_short_read_allowed) {
// If we are here we stopped because of the global limit.
throw std::runtime_error("Maximum amount of memory for building query results is exhausted, unpaged query cannot be finished");
}
}
return stop;
}
// Checks whether the result can grow any more.
stop_iteration check() const {
auto stop = check_local_limit();
if (!stop && _used_memory >= _blocked_bytes && _limiter) {
return _limiter->check() && _stop_on_global_limit;
}
return stop;
}
// Consume n more bytes for the result.
void update(size_t n) {
update_and_check(n);
}
result_memory_tracker done() && {
if (!_limiter) {
return { };
}
auto& sem = std::exchange(_limiter, nullptr)->sem();
return result_memory_tracker(sem, _blocked_bytes, _used_memory);
}
};
// Waits until minimum_result_size bytes can be taken from the limiter, then
// yields an accounter built with the mutation-query tag.
inline future<result_memory_accounter> result_memory_limiter::new_mutation_read(query::max_result_size max_size, short_read short_read_allowed) {
    auto make_accounter = [this, max_size, short_read_allowed] {
        return result_memory_accounter(result_memory_accounter::mutation_query_tag(), *this, max_size, short_read_allowed);
    };
    return _memory_limiter.wait(minimum_result_size).then(std::move(make_accounter));
}
// Waits until minimum_result_size bytes can be taken from the limiter, then
// yields an accounter built with the data-query tag.
inline future<result_memory_accounter> result_memory_limiter::new_data_read(query::max_result_size max_size, short_read short_read_allowed) {
    auto make_accounter = [this, max_size, short_read_allowed] {
        return result_memory_accounter(result_memory_accounter::data_query_tag(), *this, max_size, short_read_allowed);
    };
    return _memory_limiter.wait(minimum_result_size).then(std::move(make_accounter));
}
// Digest reads take nothing from the limiter, so the accounter is available
// immediately as a ready future.
inline future<result_memory_accounter> result_memory_limiter::new_digest_read(query::max_result_size max_size, short_read short_read_allowed) {
    result_memory_accounter accounter(result_memory_accounter::digest_query_tag(), *this, max_size, short_read_allowed);
    return make_ready_future<result_memory_accounter>(std::move(accounter));
}
// Selects what a query is asked to produce; carried in
// result_options::request, where only_result is the default.
// NOTE(review): the per-enumerator meanings are taken from the names only
// (result data, digest, or both) — confirm against the result builder.
enum class result_request {
only_result,
only_digest,
result_and_digest,
};
struct result_options {
result_request request = result_request::only_result;
digest_algorithm digest_algo = query::digest_algorithm::none;
static result_options only_result() {
return result_options{};
}
static result_options only_digest(digest_algorithm da) {
return {result_request::only_digest, da};
}
};
// Fixed-size (16 byte) digest of a query result.
class result_digest {
public:
    using type = std::array<uint8_t, 16>;

    // Leaves the digest bytes default-initialized.
    result_digest() = default;

    // Takes over the given digest bytes.
    result_digest(type&& digest)
        : _digest(std::move(digest))
    { }

    const type& get() const {
        return _digest;
    }

    // Member-wise comparison of the digest bytes.
    bool operator==(const result_digest& rh) const = default;

private:
    type _digest;
};
//
// The query results are stored in a serialized form. This is in order to
// address the following problems, which a structured format has:
//
// - high level of indirection (vector of vectors of vectors of blobs), which
// is not CPU cache friendly
//
// - high allocation rate due to fine-grained object structure
//
// On replica side, the query results are probably going to be serialized in
// the transport layer anyway, so serializing the results up-front doesn't add
// net work. There is no processing of the query results on replica other than
// concatenation in case of range queries and checksum calculation. If query
// results are collected in serialized form from different cores, we can
// concatenate them without copying by simply appending the fragments into the
// packet.
//
// On coordinator side, the query results would have to be parsed from the
// transport layer buffers anyway, so the fact that iterators parse it also
// doesn't add net work, but again saves allocations and copying. The CQL
// server doesn't need complex data structures to process the results, it just
// goes over it linearly consuming it.
//
// The coordinator side could be optimized even further for CQL queries which
// do not need processing (eg. select * from cf where ...). We could make the
// replica send the query results in the format which is expected by the CQL
// binary protocol client. So in the typical case the coordinator would just
// pass the data using zero-copy to the client, prepending a header.
//
// Users which need more complex structure of query results can convert this
// to query::result_set.
//
// Related headers:
// - query-result-reader.hh
// - query-result-writer.hh
// Serialized query result together with its metadata: optional digest, row
// and partition counts, last-modified timestamp, short-read flag, the memory
// tracker covering the buffer, and the optional last position.
//
// The 64-bit row count is stored as two optional 32-bit halves
// (_row_count_low_bits / _row_count_high_bits); row_count() and
// set_row_count() convert between the halves and the combined value.
class result {
    bytes_ostream _w;
    std::optional<result_digest> _digest;
    std::optional<uint32_t> _row_count_low_bits;
    api::timestamp_type _last_modified = api::missing_timestamp;
    short_read _short_read;
    query::result_memory_tracker _memory_tracker;
    std::optional<uint32_t> _partition_count;
    std::optional<uint32_t> _row_count_high_bits;
    std::optional<full_position> _last_position;
public:
    class builder;
    class partition_writer;
    friend class result_merger;

    result();

    // Result without digest, row count given as separate low/high halves.
    //
    // In every constructor below the buffer is moved into _w by the
    // initializer list, so reduce_chunk_count() has to be invoked on _w:
    // the parameter `w` is already moved-from at that point and calling it
    // there would leave the stored buffer untouched.
    result(bytes_ostream&& w, short_read sr, std::optional<uint32_t> c_low_bits, std::optional<uint32_t> pc,
           std::optional<uint32_t> c_high_bits, std::optional<full_position> last_position, result_memory_tracker memory_tracker = { })
        : _w(std::move(w))
        , _row_count_low_bits(c_low_bits)
        , _short_read(sr)
        , _memory_tracker(std::move(memory_tracker))
        , _partition_count(pc)
        , _row_count_high_bits(c_high_bits)
        , _last_position(std::move(last_position))
    {
        _w.reduce_chunk_count();
    }

    // Result with digest and last-modified timestamp, row count given as
    // separate low/high halves.
    result(bytes_ostream&& w, std::optional<result_digest> d, api::timestamp_type last_modified,
           short_read sr, std::optional<uint32_t> c_low_bits, std::optional<uint32_t> pc, std::optional<uint32_t> c_high_bits,
           std::optional<full_position> last_position, result_memory_tracker memory_tracker = { })
        : _w(std::move(w))
        , _digest(d)
        , _row_count_low_bits(c_low_bits)
        , _last_modified(last_modified)
        , _short_read(sr)
        , _memory_tracker(std::move(memory_tracker))
        , _partition_count(pc)
        , _row_count_high_bits(c_high_bits)
        , _last_position(std::move(last_position))
    {
        _w.reduce_chunk_count();
    }

    // Result without digest, row count given as one 64-bit value `c` that is
    // split into its low and high 32 bits.
    result(bytes_ostream&& w, short_read sr, uint64_t c, std::optional<uint32_t> pc,
           std::optional<full_position> last_position, result_memory_tracker memory_tracker = { })
        : _w(std::move(w))
        , _row_count_low_bits(static_cast<uint32_t>(c))
        , _short_read(sr)
        , _memory_tracker(std::move(memory_tracker))
        , _partition_count(pc)
        , _row_count_high_bits(static_cast<uint32_t>(c >> 32))
        , _last_position(std::move(last_position))
    {
        _w.reduce_chunk_count();
    }

    // Result with digest and last-modified timestamp, row count given as one
    // 64-bit value `c` that is split into its low and high 32 bits.
    result(bytes_ostream&& w, std::optional<result_digest> d, api::timestamp_type last_modified,
           short_read sr, uint64_t c, std::optional<uint32_t> pc, std::optional<full_position> last_position, result_memory_tracker memory_tracker = { })
        : _w(std::move(w))
        , _digest(d)
        , _row_count_low_bits(static_cast<uint32_t>(c))
        , _last_modified(last_modified)
        , _short_read(sr)
        , _memory_tracker(std::move(memory_tracker))
        , _partition_count(pc)
        , _row_count_high_bits(static_cast<uint32_t>(c >> 32))
        , _last_position(std::move(last_position))
    {
        _w.reduce_chunk_count();
    }

    // Move-only: move operations are defaulted and no copy operations are
    // declared.
    result(result&&) = default;
    result& operator=(result&&) = default;

    // The serialized result data.
    const bytes_ostream& buf() const {
        return _w;
    }

    const std::optional<result_digest>& digest() const {
        return _digest;
    }

    const std::optional<uint32_t> row_count_low_bits() const {
        return _row_count_low_bits;
    }

    const std::optional<uint32_t> row_count_high_bits() const {
        return _row_count_high_bits;
    }

    // Full 64-bit row count. Empty when the low half is not set; a missing
    // high half is treated as zero.
    const std::optional<uint64_t> row_count() const {
        if (!_row_count_low_bits) {
            return std::nullopt;
        }
        return (static_cast<uint64_t>(_row_count_high_bits.value_or(0)) << 32) | _row_count_low_bits.value();
    }

    // Stores the row count as two 32-bit halves, or clears both halves when
    // `row_count` is empty.
    void set_row_count(std::optional<uint64_t> row_count) {
        if (!row_count) {
            _row_count_low_bits = std::nullopt;
            _row_count_high_bits = std::nullopt;
        } else {
            _row_count_low_bits = std::make_optional(static_cast<uint32_t>(row_count.value()));
            _row_count_high_bits = std::make_optional(static_cast<uint32_t>(row_count.value() >> 32));
        }
    }

    api::timestamp_type last_modified() const {
        return _last_modified;
    }

    short_read is_short_read() const {
        return _short_read;
    }

    const std::optional<uint32_t>& partition_count() const {
        return _partition_count;
    }

    // Defined out of line.
    void ensure_counts();

    const std::optional<full_position>& last_position() const {
        return _last_position;
    }

    void set_last_position(std::optional<full_position> last_position) {
        _last_position = std::move(last_position);
    }

    // Return _last_position if replica filled it, otherwise calculate it based
    // on the content (by looking up the last row in the last partition).
    full_position get_or_calculate_last_position() const;

    // Bundles a result with the schema and slice it is printed with. `slice`
    // and `res` are held by reference, so the referenced objects have to
    // outlive the printer.
    struct printer {
        schema_ptr s;
        const query::partition_slice& slice;
        const query::result& res;
    };

    sstring pretty_print(schema_ptr, const query::partition_slice&) const;
    printer pretty_printer(schema_ptr, const query::partition_slice&) const;
};
// Stream-insertion operator for query::result::printer; only declared here,
// the definition lives elsewhere.
std::ostream& operator<<(std::ostream& os, const query::result::printer&);
}
// Makes query::result::printer usable with fmt by inheriting
// fmt::ostream_formatter, i.e. formatting goes through the type's
// std::ostream operator<<.
template <> struct fmt::formatter<query::result::printer> : fmt::ostream_formatter {};