Files
scylladb/query/query.cc
Ernest Zaslavsky d624413ddd treewide: Move query related files to a new query directory
As requested in #22120, moved the files and fixed other includes and build system.

Moved files:
- query.cc
- query-request.hh
- query-result.hh
- query-result-reader.hh
- query-result-set.cc
- query-result-set.hh
- query-result-writer.hh
- query_id.hh
- query_result_merger.hh

Fixes: #22120

This is a cleanup, no need to backport

Closes scylladb/scylladb#25105
2025-09-16 23:40:47 +03:00

465 lines
17 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <limits>
#include <memory>
#include <stdexcept>
#include <fmt/ranges.h>
#include "query-request.hh"
#include "query-result.hh"
#include "query-result-writer.hh"
#include "query-result-set.hh"
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/thread.hh>
#include "bytes.hh"
#include "mutation/mutation_partition_serializer.hh"
#include "query-result-reader.hh"
#include "query_result_merger.hh"
#include "partition_slice_builder.hh"
#include "schema/schema_registry.hh"
#include "utils/assert.hh"
#include "utils/overloaded_functor.hh"
namespace query {
static logging::logger qlogger("query");

// Out-of-line definitions for the static constexpr members declared in
// result_memory_limiter's header (required when the constants are ODR-used).
constexpr size_t result_memory_limiter::minimum_result_size;
constexpr size_t result_memory_limiter::maximum_result_size;
constexpr size_t result_memory_limiter::unlimited_result_size;
// Per-shard zero-unit semaphore; presumably backs result_memory_tracker
// instances that do not actually track memory — confirm against the header.
thread_local semaphore result_memory_tracker::_dummy { 0 };
// Canonical "everything" ranges: open on both ends.
const dht::partition_range full_partition_range = dht::partition_range::make_open_ended_both_sides();
const clustering_range full_clustering_range = clustering_range::make_open_ended_both_sides();
// Forward declaration; the definition appears later in this file but is
// needed by operator<<(partition_slice) below.
std::ostream& operator<<(std::ostream& out, const specific_ranges& s);
// Renders a partition_slice for logging: selected column ids, clustering
// ranges, optional per-partition specific ranges, the raw option mask and
// the per-partition row limit.
std::ostream& operator<<(std::ostream& out, const partition_slice& ps) {
    fmt::print(out,
        "{{regular_cols=[{}], static_cols=[{}], rows=[{}]",
        fmt::join(ps.regular_columns, ", "),
        fmt::join(ps.static_columns, ", "),
        ps._row_ranges);
    if (ps._specific_ranges) {
        fmt::print(out, ", specific=[{}]", *ps._specific_ranges);
    }
    // FIXME: pretty print options
    // Note: fixed a doubled comma (", , ") that the original format string
    // emitted between the option mask and the row limit.
    fmt::print(out, ", options={:x}, partition_row_limit={}}}",
        ps.options.mask(), ps.partition_row_limit());
    return out;
}
// Renders every field of a read_command for logging/diagnostics.
std::ostream& operator<<(std::ostream& out, const read_command& r) {
    const auto raw_timestamp = r.timestamp.time_since_epoch().count();
    fmt::print(out, "read_command{{cf_id={}, version={}, slice={}, limit={}, timestamp={}, partition_limit={}, query_uuid={}, is_first_page={}, read_timestamp={}}}",
               r.cf_id, r.schema_version, r.slice, r.get_row_limit(),
               raw_timestamp, r.partition_limit, r.query_uuid,
               r.is_first_page, r.read_timestamp);
    return out;
}
// Rewrites a legacy-reversed read command in place to use the native
// reversed schema and slice representation, returning the same command.
lw_shared_ptr<query::read_command> reversed(lw_shared_ptr<query::read_command>&& cmd)
{
    // Fetch the reversed counterpart of the command's schema.
    auto reversed_schema = local_schema_registry().get(cmd->schema_version)->get_reversed();
    // Convert the slice first (it only needs the reversed schema), then
    // stamp the command with the reversed schema's version.
    cmd->slice = query::legacy_reverse_slice_to_native_reverse_slice(*reversed_schema, cmd->slice);
    cmd->schema_version = reversed_schema->version();
    return std::move(cmd);
}
// Prints the symbolic name of a mapreduce reduction type.
std::ostream& operator<<(std::ostream& out, const mapreduce_request::reduction_type& r) {
    const char* name = "";
    switch (r) {
    case mapreduce_request::reduction_type::count:
        name = "count";
        break;
    case mapreduce_request::reduction_type::aggregate:
        name = "aggregate";
        break;
    }
    return out << "reduction_type{" << name << "}";
}
// Prints an aggregation_info: the aggregate function name and the columns
// it operates on.
std::ostream& operator<<(std::ostream& out, const mapreduce_request::aggregation_info& a) {
    // Note: fixed a stray comma after the opening brace (the original
    // printed "aggregation_info{, name=...") and removed a doubled ';'.
    fmt::print(out, "aggregation_info{{name={}, column_names=[{}]}}",
        a.name, fmt::join(a.column_names, ","));
    return out;
}
// Renders a mapreduce_request: reduction list, optional aggregation infos,
// optional shard hint, then the embedded read command, partition range,
// consistency level and timeout (in milliseconds).
std::ostream& operator<<(std::ostream& out, const mapreduce_request& r) {
    fmt::print(out, "mapreduce_request{{reductions=[{}]",
        fmt::join(r.reduction_types, ","));
    if (r.aggregation_infos.has_value()) {
        fmt::print(out, ", aggregation_infos=[{}]", fmt::join(*r.aggregation_infos, ","));
    }
    if (r.shard_id_hint.has_value()) {
        fmt::print(out, ", shard_id_hint={}", *r.shard_id_hint);
    }
    const auto timeout_ms =
        std::chrono::time_point_cast<std::chrono::milliseconds>(r.timeout).time_since_epoch().count();
    fmt::print(out, ", cmd={}, pr={}, cl={}, timeout(ms)={}}}",
        r.cmd, r.pr, r.cl, timeout_ms);
    return out;
}
// Prints a specific_ranges entry as "{<partition key> : <ranges>}".
std::ostream& operator<<(std::ostream& out, const specific_ranges& s) {
    fmt::print(out, "{{{} : {}}}", s._pk, fmt::join(s._ranges, ", "));
    return out;
}
// Trims `ranges` so that no range contains a position at or before `pos`:
// ranges ending at or before `pos` are dropped entirely, and a range that
// straddles `pos` has its start bound advanced to just past `pos`.
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, position_in_partition pos) {
    auto cmp = position_in_partition::composite_tri_compare(s);
    auto it = ranges.begin();
    while (it != ranges.end()) {
        auto end_bound = position_in_partition_view::for_range_end(*it);
        if (cmp(end_bound, pos) <= 0) {
            // Range ends at or before pos: remove it. erase() returns the
            // next valid iterator, so skip the ++it below.
            it = ranges.erase(it);
            continue;
        } else if (auto start_bound = position_in_partition_view::for_range_start(*it); cmp(start_bound, pos) <= 0) {
            // Range straddles pos: replace its start bound. The new bound is
            // inclusive only when pos does not already cover the key
            // (i.e. its weight is not after_all_prefixed).
            SCYLLA_ASSERT(cmp(pos, end_bound) < 0);
            *it = clustering_range(clustering_range::bound(pos.key(), pos.get_bound_weight() != bound_weight::after_all_prefixed), it->end());
        }
        ++it;
    }
}
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, const clustering_key& key) {
return trim_clustering_row_ranges_to(s, ranges, position_in_partition::after_key(s, key));
}
// Returns `range` with its start and end bounds swapped. A singular range
// (single key) is its own reverse.
clustering_range reverse(const clustering_range& range) {
    return range.is_singular()
        ? range
        : clustering_range(range.end(), range.start());
}
// Swaps the start/end bounds of every range in place; does not reorder
// the ranges themselves.
static void reverse_clustering_ranges_bounds(clustering_row_ranges& ranges) {
    std::transform(ranges.begin(), ranges.end(), ranges.begin(),
            [] (const clustering_range& r) { return reverse(r); });
}
// Converts a slice in the legacy reversed format to the native reversed
// format by flipping the bounds of each clustering range (both the default
// ranges and any per-partition specific ranges). Range order is preserved.
partition_slice legacy_reverse_slice_to_native_reverse_slice(const schema& schema, partition_slice slice) {
    return partition_slice_builder(schema, std::move(slice))
        .mutate_ranges([] (clustering_row_ranges& ranges) { reverse_clustering_ranges_bounds(ranges); })
        .mutate_specific_ranges([] (specific_ranges& ranges) { reverse_clustering_ranges_bounds(ranges.ranges()); })
        .build();
}
// Converts a native-reversed slice back to the legacy reversed format.
// The transformation is its own inverse, so the legacy->native conversion
// is reused; the distinct name exists purely to express intent at call sites.
partition_slice native_reverse_slice_to_legacy_reverse_slice(const schema& schema, partition_slice slice) {
    // They are the same, we give them different names to express intent
    return legacy_reverse_slice_to_native_reverse_slice(schema, std::move(slice));
}
// Produces the fully reversed form of `slice`: reverses the order of the
// clustering ranges, flips each range's bounds, and toggles the `reversed`
// option flag. Applies to both default and per-partition specific ranges.
partition_slice reverse_slice(const schema& schema, partition_slice slice) {
    return partition_slice_builder(schema, std::move(slice))
        .mutate_ranges([] (clustering_row_ranges& ranges) {
            std::reverse(ranges.begin(), ranges.end());
            reverse_clustering_ranges_bounds(ranges);
        })
        .mutate_specific_ranges([] (specific_ranges& sranges) {
            auto& ranges = sranges.ranges();
            std::reverse(ranges.begin(), ranges.end());
            reverse_clustering_ranges_bounds(ranges);
        })
        .with_option_toggled<partition_slice::option::reversed>()
        .build();
}
// Primary constructor. Takes ownership of the clustering ranges, the
// selected static/regular column id vectors and the optional per-partition
// specific ranges. The per-partition row limit is passed pre-split into
// low/high 32-bit halves (see the delegating constructor below).
// `cql_format` is only validated, not stored.
partition_slice::partition_slice(clustering_row_ranges row_ranges,
    query::column_id_vector static_columns,
    query::column_id_vector regular_columns,
    option_set options,
    std::unique_ptr<specific_ranges> specific_ranges,
    cql_serialization_format cql_format,
    uint32_t partition_row_limit_low_bits,
    uint32_t partition_row_limit_high_bits)
    : _row_ranges(std::move(row_ranges))
    , static_columns(std::move(static_columns))
    , regular_columns(std::move(regular_columns))
    , options(options)
    , _specific_ranges(std::move(specific_ranges))
    , _partition_row_limit_low_bits(partition_row_limit_low_bits)
    , _partition_row_limit_high_bits(partition_row_limit_high_bits)
{
    // Rejects serialization formats this node no longer supports.
    cql_format.ensure_supported();
}
// Convenience constructor taking the per-partition row limit as a single
// 64-bit value; splits it into the low/high 32-bit halves the primary
// constructor expects and uses the latest CQL serialization format.
partition_slice::partition_slice(clustering_row_ranges row_ranges,
    query::column_id_vector static_columns,
    query::column_id_vector regular_columns,
    option_set options,
    std::unique_ptr<specific_ranges> specific_ranges,
    uint64_t partition_row_limit)
    : partition_slice(std::move(row_ranges), std::move(static_columns), std::move(regular_columns), options,
        std::move(specific_ranges), cql_serialization_format::latest(), static_cast<uint32_t>(partition_row_limit),
        static_cast<uint32_t>(partition_row_limit >> 32))
{}
// Constructs a slice selecting the columns in `columns`, partitioned into
// the static and regular column id vectors. Clustering and partition key
// columns in `columns` are ignored — their presence in results is
// controlled by `options`.
partition_slice::partition_slice(clustering_row_ranges ranges, const schema& s, const column_set& columns, option_set options)
    : partition_slice(std::move(ranges), query::column_id_vector{}, query::column_id_vector{}, options)
{
    // `ranges` was previously forwarded by copy despite being a by-value
    // sink parameter; moving avoids a needless deep copy of the ranges.
    // count() is an upper bound (it also counts static/key columns), so
    // this may slightly over-reserve.
    regular_columns.reserve(columns.count());
    for (ordinal_column_id id = columns.find_first(); id != column_set::npos; id = columns.find_next(id)) {
        const column_definition& def = s.column_at(id);
        if (def.is_static()) {
            static_columns.push_back(def.id);
        } else if (def.is_regular()) {
            regular_columns.push_back(def.id);
        } // else clustering or partition key column - skip, these are controlled by options
    }
}
// Moves are cheap and defaulted; members are all movable.
partition_slice::partition_slice(partition_slice&&) = default;
partition_slice& partition_slice::operator=(partition_slice&& other) noexcept = default;
// Only needed because selection_statement::execute does copies of its read_command
// in the map-reduce op.
// Deep-copies the specific_ranges (held via unique_ptr), so this copy can
// be expensive; everything else is member-wise copied.
partition_slice::partition_slice(const partition_slice& s)
    : _row_ranges(s._row_ranges)
    , static_columns(s.static_columns)
    , regular_columns(s.regular_columns)
    , options(s.options)
    , _specific_ranges(s._specific_ranges ? std::make_unique<specific_ranges>(*s._specific_ranges) : nullptr)
    , _partition_row_limit_low_bits(s._partition_row_limit_low_bits)
    , _partition_row_limit_high_bits(s._partition_row_limit_high_bits)
{}
// Out-of-line so the unique_ptr<specific_ranges> destructor instantiates
// where specific_ranges is complete.
partition_slice::~partition_slice()
{}
// Returns the clustering ranges to use for partition `k`: a per-partition
// override when one exists, otherwise the default ranges shared by all
// partitions.
const clustering_row_ranges& partition_slice::row_ranges(const schema& s, const partition_key& k) const {
    if (_specific_ranges) {
        if (const auto* specific = _specific_ranges->range_for(s, k)) {
            return *specific;
        }
    }
    return _row_ranges;
}
// Installs a per-partition clustering range override for `k`. The first
// call creates the specific_ranges container; subsequent calls add to it.
void partition_slice::set_range(const schema& s, const partition_key& k, clustering_row_ranges range) {
    if (_specific_ranges) {
        _specific_ranges->add(s, k, std::move(range));
    } else {
        _specific_ranges = std::make_unique<specific_ranges>(k, std::move(range));
    }
}
// Removes the per-partition override for `k`, if any.
void partition_slice::clear_range(const schema& s, const partition_key& k) {
    if (!_specific_ranges || !_specific_ranges->contains(s, k)) {
        return;
    }
    // just in case someone changes the impl above,
    // we should do actual remove if specific_ranges suddenly
    // becomes an actual map
    SCYLLA_ASSERT(_specific_ranges->size() == 1);
    _specific_ranges.reset();
}
// Returns the default clustering ranges followed by any per-partition
// specific ranges, concatenated into a single vector.
clustering_row_ranges partition_slice::get_all_ranges() const {
    auto combined = default_row_ranges();
    if (const auto& specific = get_specific_ranges(); specific) {
        for (const auto& r : specific->ranges()) {
            combined.push_back(r);
        }
    }
    return combined;
}
// Produces a human-readable dump of this raw result: the decoded result
// set, the digest bytes (if any) in hex, and the short-read flag.
sstring
result::pretty_print(schema_ptr s, const query::partition_slice& slice) const {
    std::ostringstream out;
    out << "{ result: " << result_set::from_raw_result(s, slice, *this);
    out << " digest: ";
    if (_digest) {
        // NOTE(review): setw(2) only affects the very next insertion and no
        // fill character is set, so digest bytes are not actually
        // zero-padded to two hex digits — confirm whether that is intended.
        out << std::hex << std::setw(2);
        for (auto&& c : _digest->get()) {
            out << unsigned(c) << " ";
        }
    } else {
        out << "{}";
    }
    out << ", short_read=" << is_short_read() << " }";
    return out.str();
}
// Returns a lazy printer wrapper; the actual formatting happens when the
// printer is streamed to an ostream.
query::result::printer
result::pretty_printer(schema_ptr s, const query::partition_slice& slice) const {
    query::result::printer p{s, slice, *this};
    return p;
}
// Streams a result printer by delegating to the eager pretty_print().
std::ostream& operator<<(std::ostream& os, const query::result::printer& p) {
    return os << p.res.pretty_print(p.s, p.slice);
}
// Lazily populates the cached partition/row counts by scanning the
// serialized result when they are missing.
void result::ensure_counts() {
    // NOTE(review): a cached row count of zero is treated the same as
    // "unknown" and triggers a recount — presumably 0 is used as the
    // not-yet-computed sentinel; confirm against the header.
    if (!_partition_count || !row_count()) {
        uint64_t row_count;
        std::tie(_partition_count, row_count) = result_view::do_with(*this, [] (auto&& view) {
            return view.count_partitions_and_rows();
        });
        set_row_count(row_count);
    }
}
// Returns the position of the last row in this result: the cached value
// when the producer recorded one, otherwise computed by scanning the
// serialized payload.
full_position result::get_or_calculate_last_position() const {
    if (!_last_position) {
        return result_view::do_with(*this, [] (const result_view& v) {
            return v.calculate_last_position();
        });
    }
    return *_last_position;
}
// Default constructor: builds an empty-but-valid serialized result (zero
// partitions) via an immediately-invoked lambda, with no short read, zero
// rows/partitions and no last position.
result::result()
    : result([] {
        bytes_ostream out;
        ser::writer_of_query_result<bytes_ostream>(out).skip_partitions().end_query_result();
        return out;
    }(), short_read::no, 0, 0, {})
{ }
// Serializes a truncated copy of partition `pv` into `pw`: the key (when
// present), the full static row, and at most `rows_to_include` clustering
// rows. Used by result_merger::get() when the row limit falls inside a
// partition.
static void write_partial_partition(ser::writer_of_qr_partition<bytes_ostream>&& pw, const ser::qr_partition_view& pv, uint64_t rows_to_include) {
    auto key = pv.key();
    // The serializer is a move-only state machine: each step consumes the
    // previous writer and yields the next one.
    auto static_cells_wr = (key ? std::move(pw).write_key(*key) : std::move(pw).skip_key())
        .start_static_row()
        .start_cells();
    for (auto&& cell : pv.static_row().cells()) {
        static_cells_wr.add(cell);
    }
    auto rows_wr = std::move(static_cells_wr)
        .end_cells()
        .end_static_row()
        .start_rows();
    auto rows = pv.rows();
    // rows.size() can be 0 if there's a single static row
    auto it = rows.begin();
    for (uint64_t i = 0; i < std::min(rows.size(), rows_to_include); ++i) {
        rows_wr.add(*it++);
    }
    std::move(rows_wr).end_rows().end_qr_partition();
}
// Merges the accumulated partial results into a single serialized result,
// enforcing the merger's row and partition limits. Partitions are copied
// whole while they fit; the partition that crosses the row limit is
// truncated via write_partial_partition(). A single partial is returned
// as-is without re-serialization.
foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
    if (_partial.size() == 1) {
        return std::move(_partial[0]);
    }
    bytes_ostream w;
    auto partitions = ser::writer_of_query_result<bytes_ostream>(w).start_partitions();
    uint64_t row_count = 0;
    // NOTE(review): default-constructed; presumably equivalent to
    // short_read::no — confirm against the bool_class definition.
    short_read is_short_read;
    uint32_t partition_count = 0;
    std::optional<full_position> last_position;
    for (auto&& r : _partial) {
        result_view::do_with(*r, [&] (result_view rv) {
            // Only the last fully-consumed partial's position is valid; a
            // truncated or early-exited partial invalidates it.
            last_position.reset();
            for (auto&& pv : rv._v.partitions()) {
                auto rows = pv.rows();
                // If rows.empty(), then there's a static row, or there wouldn't be a partition
                const uint64_t rows_in_partition = rows.size() ? : 1;
                const uint64_t rows_to_include = std::min(_max_rows - row_count, rows_in_partition);
                row_count += rows_to_include;
                if (rows_to_include >= rows_in_partition) {
                    // Whole partition fits under the row limit.
                    partitions.add(pv);
                    if (++partition_count >= _max_partitions) {
                        return;
                    }
                } else if (rows_to_include > 0) {
                    // Row limit falls inside this partition: emit a prefix.
                    ++partition_count;
                    write_partial_partition(partitions.add(), pv, rows_to_include);
                    return;
                } else {
                    // Row limit already reached before this partition.
                    return;
                }
            }
            // All partitions of this partial were consumed in full.
            last_position = r->last_position();
        });
        // A short-read partial ends the merge: later partials belong after
        // the point the replica stopped at.
        if (r->is_short_read()) {
            is_short_read = short_read::yes;
            break;
        }
        if (row_count >= _max_rows || partition_count >= _max_partitions) {
            break;
        }
    }
    std::move(partitions).end_partitions().end_query_result();
    return make_foreign(make_lw_shared<query::result>(std::move(w), is_short_read, row_count, partition_count, std::move(last_position)));
}
// Prints each mapreduce result decoded through its aggregate function's
// return type. Emits a diagnostic instead of indexing out of bounds when
// the result and aggregate counts disagree.
std::ostream& operator<<(std::ostream& out, const query::mapreduce_result::printer& p) {
    const size_t n_aggregates = p.functions.size();
    if (n_aggregates != p.res.query_results.size()) {
        return out << "[malformed mapreduce_result (" << p.res.query_results.size()
                   << " results, " << n_aggregates << " aggregates)]";
    }
    out << "[";
    for (size_t i = 0; i < n_aggregates; i++) {
        if (i > 0) {
            out << ", ";
        }
        out << p.functions[i]->return_type()->to_string(bytes_view(*p.res.query_results[i]));
    }
    return out << "]";
}
}
// Converts a position_range (left-closed, right-open, clustered region
// only) into the equivalent clustering_range, or nullopt when the range
// contains no keys. Bound weights determine inclusivity; see the mapping
// table in the comments below.
std::optional<query::clustering_range> position_range_to_clustering_range(const position_range& r, const schema& s) {
    SCYLLA_ASSERT(r.start().get_type() == partition_region::clustered);
    SCYLLA_ASSERT(r.end().get_type() == partition_region::clustered);
    // Special case: both bounds refer to the same key. Depending on the
    // weights this is either the singular range [x, x] or empty.
    if (r.start().has_key() && r.end().has_key()
            && clustering_key_prefix::equality(s)(r.start().key(), r.end().key())) {
        SCYLLA_ASSERT(r.start().get_bound_weight() != r.end().get_bound_weight());
        if (r.end().get_bound_weight() == bound_weight::after_all_prefixed
                && r.start().get_bound_weight() != bound_weight::after_all_prefixed) {
            // [before x, after x) and [for x, after x) get converted to [x, x].
            return query::clustering_range::make_singular(r.start().key());
        }
        // [before x, for x) does not contain any keys.
        return std::nullopt;
    }
    // position_range -> clustering_range
    // (recall that position_ranges are always left-closed, right opened):
    // [before x, ...), [for x, ...) -> [x, ...
    // [after x, ...) -> (x, ...
    // [..., before x), [..., for x) -> ..., x)
    // [..., after x) -> ..., x]
    auto to_bound = [&s] (const position_in_partition& p, bool left) -> std::optional<query::clustering_range::bound> {
        // Positions before/after all clustered rows map to an unbounded end.
        if (p.is_before_all_clustered_rows(s)) {
            SCYLLA_ASSERT(left);
            return {};
        }
        if (p.is_after_all_clustered_rows(s)) {
            SCYLLA_ASSERT(!left);
            return {};
        }
        SCYLLA_ASSERT(p.has_key());
        auto bw = p.get_bound_weight();
        // Left bounds are inclusive unless the position is past the key;
        // right bounds are inclusive only when it is.
        bool inclusive = left
            ? bw != bound_weight::after_all_prefixed
            : bw == bound_weight::after_all_prefixed;
        return query::clustering_range::bound{p.key(), inclusive};
    };
    return query::clustering_range{to_bound(r.start(), true), to_bound(r.end(), false)};
}