As requested in #22120, moved the files and fixed the other includes and the build system.

Moved files:
- query.cc
- query-request.hh
- query-result.hh
- query-result-reader.hh
- query-result-set.cc
- query-result-set.hh
- query-result-writer.hh
- query_id.hh
- query_result_merger.hh

Fixes: #22120

This is a cleanup, no need to backport.

Closes scylladb/scylladb#25105
397 lines
14 KiB
C++
397 lines
14 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*
|
|
* Modified by ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "utils/assert.hh"
|
|
#include "bytes.hh"
|
|
#include "schema/schema_fwd.hh"
|
|
#include "query/query-result-reader.hh"
|
|
#include "selector.hh"
|
|
#include "cql3/column_specification.hh"
|
|
#include "cql3/functions/function.hh"
|
|
#include "exceptions/exceptions.hh"
|
|
#include "unimplemented.hh"
|
|
#include <seastar/core/thread.hh>
|
|
|
|
namespace cql3 {
|
|
|
|
class result_set;
|
|
class metadata;
|
|
class query_options;
|
|
|
|
namespace restrictions {
|
|
class statement_restrictions;
|
|
}
|
|
|
|
namespace selection {
|
|
|
|
class raw_selector;
|
|
class result_set_builder;
|
|
|
|
class selectors {
|
|
public:
|
|
virtual ~selectors() {}
|
|
|
|
virtual bool requires_thread() const = 0;
|
|
|
|
virtual bool is_aggregate() const = 0;
|
|
|
|
/**
|
|
* Adds the current row of the specified <code>ResultSetBuilder</code>.
|
|
*
|
|
* @param rs the <code>ResultSetBuilder</code>
|
|
* @throws InvalidRequestException
|
|
*/
|
|
virtual void add_input_row(result_set_builder& rs) = 0;
|
|
|
|
virtual std::uint64_t get_input_row_count() const = 0;
|
|
|
|
virtual std::vector<managed_bytes_opt> get_output_row() = 0;
|
|
|
|
// When not aggregating, each input row becomes one output row.
|
|
virtual std::vector<managed_bytes_opt> transform_input_row(result_set_builder& rs) = 0;
|
|
|
|
virtual void reset() = 0;
|
|
};
|
|
|
|
class selection {
|
|
private:
|
|
schema_ptr _schema;
|
|
std::vector<const column_definition*> _columns;
|
|
::shared_ptr<metadata> _metadata;
|
|
const bool _collect_timestamps;
|
|
const bool _collect_TTLs;
|
|
const bool _contains_static_columns;
|
|
bool _is_trivial;
|
|
protected:
|
|
using trivial = bool_class<class trivial_tag>;
|
|
|
|
selection(schema_ptr schema,
|
|
std::vector<const column_definition*> columns,
|
|
std::vector<lw_shared_ptr<column_specification>> metadata_,
|
|
bool collect_timestamps,
|
|
bool collect_TTLs, trivial is_trivial = trivial::no);
|
|
|
|
virtual ~selection() {}
|
|
public:
|
|
// Overridden by SimpleSelection when appropriate.
|
|
virtual bool is_wildcard() const {
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* Checks if this selection contains static columns.
|
|
* @return <code>true</code> if this selection contains static columns, <code>false</code> otherwise;
|
|
*/
|
|
bool contains_static_columns() const {
|
|
return _contains_static_columns;
|
|
}
|
|
|
|
/**
|
|
* Checks if this selection contains only static columns.
|
|
* @return <code>true</code> if this selection contains only static columns, <code>false</code> otherwise;
|
|
*/
|
|
bool contains_only_static_columns() const;
|
|
|
|
/**
|
|
* Returns the index of the specified column, in the un-processed domain (before applying
|
|
* transformations to the input columns and aggregations)
|
|
*
|
|
* @param def the column definition
|
|
* @return the index of the specified column
|
|
*/
|
|
int32_t index_of(const column_definition& def) const;
|
|
|
|
bool has_column(const column_definition& def) const;
|
|
|
|
::shared_ptr<const metadata> get_result_metadata() const {
|
|
return _metadata;
|
|
}
|
|
|
|
::shared_ptr<metadata> get_result_metadata() {
|
|
return _metadata;
|
|
}
|
|
|
|
static ::shared_ptr<selection> wildcard(schema_ptr schema);
|
|
static std::vector<const column_definition*> wildcard_columns(schema_ptr schema);
|
|
static ::shared_ptr<selection> for_columns(schema_ptr schema, std::vector<const column_definition*> columns);
|
|
|
|
// Adds a column to the selection and result set. Returns an index within the result set row.
|
|
virtual uint32_t add_column_for_post_processing(const column_definition& c);
|
|
|
|
virtual std::vector<shared_ptr<functions::function>> used_functions() const { return {}; }
|
|
|
|
query::partition_slice::option_set get_query_options();
|
|
private:
|
|
static bool processes_selection(const std::vector<prepared_selector>& prepared_selectors);
|
|
|
|
static std::vector<lw_shared_ptr<column_specification>> collect_metadata(const schema& schema,
|
|
const std::vector<prepared_selector>& prepared_selectors);
|
|
public:
|
|
static ::shared_ptr<selection> from_selectors(data_dictionary::database db, schema_ptr schema, const sstring& ks, const std::vector<prepared_selector>& raw_selectors);
|
|
|
|
virtual std::unique_ptr<selectors> new_selectors() const = 0;
|
|
|
|
/**
|
|
* Returns a range of CQL3 columns this selection needs.
|
|
*/
|
|
auto const& get_columns() const {
|
|
return _columns;
|
|
}
|
|
|
|
uint32_t get_column_count() const {
|
|
return _columns.size();
|
|
}
|
|
|
|
virtual bool is_aggregate() const = 0;
|
|
|
|
virtual bool is_count() const {return false;}
|
|
|
|
virtual bool is_reducible() const {return false;}
|
|
|
|
virtual query::mapreduce_request::reductions_info get_reductions() const {return {{}, {}};}
|
|
|
|
/**
|
|
* Returns true if the selection is trivial, i.e. there are no function
|
|
* selectors (including casts or aggregates).
|
|
*/
|
|
bool is_trivial() const { return _is_trivial; }
|
|
|
|
friend class result_set_builder;
|
|
};
|
|
|
|
/// Returns a selection for \p schema obtained from the partition slice \p slice.
/// NOTE(review): only the declaration is visible in this header; how the slice's
/// columns and options map onto the selection should be confirmed against the definition.
shared_ptr<selection> selection_from_partition_slice(schema_ptr schema, const query::partition_slice& slice);
|
|
|
|
class result_set_builder {
|
|
private:
|
|
std::unique_ptr<result_set> _result_set;
|
|
std::unique_ptr<selectors> _selectors;
|
|
const std::vector<size_t> _group_by_cell_indices; ///< Indices in \c current of cells holding GROUP BY values.
|
|
const uint64_t _limit; ///< Maximum number of rows to return.
|
|
const uint64_t _per_partition_limit; ///< Maximum number of rows to return per partition.
|
|
uint64_t _per_partition_remaining; ///< Remaining rows to return for the current partition.
|
|
uint64_t _per_partition_remaining_previous_partition; ///< Value of _per_partition_remaining before the current partition.
|
|
///< Necessary because the next page might continue the same partition,
|
|
///< but accept_partition_end() and accept_new_partition() will be called anyway.
|
|
std::vector<managed_bytes_opt> _last_group; ///< Previous row's group: all of GROUP BY column values.
|
|
bool _group_began; ///< Whether a group began being formed.
|
|
public:
|
|
std::vector<managed_bytes_opt> current;
|
|
std::vector<bytes> current_partition_key;
|
|
std::vector<bytes> current_clustering_key;
|
|
std::vector<api::timestamp_type> _timestamps;
|
|
std::vector<int32_t> _ttls;
|
|
const query_options* _options;
|
|
private:
|
|
const gc_clock::time_point _now;
|
|
public:
|
|
template<typename Func>
|
|
auto with_thread_if_needed(Func&& func) {
|
|
if (_selectors->requires_thread()) {
|
|
return async(std::move(func));
|
|
} else {
|
|
return futurize_invoke(std::move(func));
|
|
}
|
|
}
|
|
|
|
class nop_filter {
|
|
public:
|
|
inline bool operator()(const selection&, const std::vector<bytes>&, const std::vector<bytes>&, const query::result_row_view&, const query::result_row_view*) const {
|
|
return true;
|
|
}
|
|
void reset(const partition_key* = nullptr) {
|
|
}
|
|
uint64_t get_rows_dropped() const {
|
|
return 0;
|
|
}
|
|
};
|
|
class restrictions_filter {
|
|
const ::shared_ptr<const restrictions::statement_restrictions> _restrictions;
|
|
const query_options& _options;
|
|
const expr::expression& _partition_level_filter;
|
|
const expr::expression& _clustering_row_level_filter;
|
|
mutable bool _current_partition_does_not_match = false;
|
|
mutable uint64_t _rows_dropped = 0;
|
|
mutable uint64_t _remaining;
|
|
schema_ptr _schema;
|
|
mutable uint64_t _per_partition_limit;
|
|
mutable uint64_t _per_partition_remaining;
|
|
mutable uint64_t _rows_fetched_for_last_partition;
|
|
mutable std::optional<partition_key> _last_pkey;
|
|
mutable bool _is_first_partition_on_page = true;
|
|
public:
|
|
explicit restrictions_filter(::shared_ptr<const restrictions::statement_restrictions> restrictions,
|
|
const query_options& options,
|
|
uint64_t remaining,
|
|
schema_ptr schema,
|
|
uint64_t per_partition_limit,
|
|
std::optional<partition_key> last_pkey = {},
|
|
uint64_t rows_fetched_for_last_partition = 0);
|
|
bool operator()(const selection& selection, const std::vector<bytes>& pk, const std::vector<bytes>& ck, const query::result_row_view& static_row, const query::result_row_view* row) const;
|
|
void reset(const partition_key* key = nullptr);
|
|
uint64_t get_rows_dropped() const {
|
|
return _rows_dropped;
|
|
}
|
|
private:
|
|
bool do_filter(const selection& selection, const std::vector<bytes>& pk, const std::vector<bytes>& ck, const query::result_row_view& static_row, const query::result_row_view* row) const;
|
|
};
|
|
|
|
result_set_builder(const selection& s, gc_clock::time_point now,
|
|
const query_options* options = nullptr,
|
|
std::vector<size_t> group_by_cell_indices = {},
|
|
uint64_t limit = std::numeric_limits<uint64_t>::max(),
|
|
uint64_t per_partition_limit = std::numeric_limits<uint64_t>::max());
|
|
void add_empty();
|
|
void add(bytes_opt value);
|
|
void add(const column_definition& def, const query::result_atomic_cell_view& c);
|
|
void add_collection(const column_definition& def, bytes_view c);
|
|
void start_new_row();
|
|
void complete_row();
|
|
void accept_new_partition(const std::vector<bytes>& key);
|
|
void accept_partition_end();
|
|
std::unique_ptr<result_set> build();
|
|
api::timestamp_type timestamp_of(size_t idx);
|
|
int32_t ttl_of(size_t idx);
|
|
size_t result_set_size() const;
|
|
|
|
// Implements ResultVisitor concept from query.hh
|
|
template<typename Filter = nop_filter>
|
|
class visitor {
|
|
protected:
|
|
result_set_builder& _builder;
|
|
const schema& _schema;
|
|
const selection& _selection;
|
|
uint64_t _row_count;
|
|
std::vector<bytes>& _partition_key;
|
|
std::vector<bytes>& _clustering_key;
|
|
Filter _filter;
|
|
public:
|
|
visitor(cql3::selection::result_set_builder& builder, const schema& s,
|
|
const selection& selection, Filter filter = Filter())
|
|
: _builder(builder)
|
|
, _schema(s)
|
|
, _selection(selection)
|
|
, _row_count(0)
|
|
, _partition_key(_builder.current_partition_key)
|
|
, _clustering_key(_builder.current_clustering_key)
|
|
, _filter(filter)
|
|
{}
|
|
visitor(visitor&&) = default;
|
|
|
|
void add_value(const column_definition& def, query::result_row_view::iterator_type& i) {
|
|
if (def.type->is_multi_cell()) {
|
|
auto cell = i.next_collection_cell();
|
|
if (!cell) {
|
|
_builder.add_empty();
|
|
return;
|
|
}
|
|
_builder.add_collection(def, cell->linearize());
|
|
} else {
|
|
auto cell = i.next_atomic_cell();
|
|
if (!cell) {
|
|
_builder.add_empty();
|
|
return;
|
|
}
|
|
_builder.add(def, *cell);
|
|
}
|
|
}
|
|
|
|
void accept_new_partition(const partition_key& key, uint64_t row_count) {
|
|
auto partition_key = key.explode(_schema);
|
|
_builder.accept_new_partition(partition_key);
|
|
_partition_key = std::move(partition_key);
|
|
_row_count = row_count;
|
|
_filter.reset(&key);
|
|
}
|
|
|
|
void accept_new_partition(uint64_t row_count) {
|
|
_row_count = row_count;
|
|
_filter.reset();
|
|
}
|
|
|
|
void accept_new_row(const clustering_key& key, const query::result_row_view& static_row, const query::result_row_view& row) {
|
|
_clustering_key = key.explode(_schema);
|
|
accept_new_row(static_row, row);
|
|
}
|
|
|
|
void accept_new_row(const query::result_row_view& static_row, const query::result_row_view& row) {
|
|
auto static_row_iterator = static_row.iterator();
|
|
auto row_iterator = row.iterator();
|
|
if (!_filter(_selection, _partition_key, _clustering_key, static_row, &row)) {
|
|
return;
|
|
}
|
|
_builder.start_new_row();
|
|
for (auto&& def : _selection.get_columns()) {
|
|
switch (def->kind) {
|
|
case column_kind::partition_key:
|
|
_builder.add(_partition_key[def->component_index()]);
|
|
break;
|
|
case column_kind::clustering_key:
|
|
if (_clustering_key.size() > def->component_index()) {
|
|
_builder.add(_clustering_key[def->component_index()]);
|
|
} else {
|
|
_builder.add({});
|
|
}
|
|
break;
|
|
case column_kind::regular_column:
|
|
add_value(*def, row_iterator);
|
|
break;
|
|
case column_kind::static_column:
|
|
add_value(*def, static_row_iterator);
|
|
break;
|
|
default:
|
|
SCYLLA_ASSERT(0);
|
|
}
|
|
}
|
|
_builder.complete_row();
|
|
}
|
|
|
|
uint64_t accept_partition_end(const query::result_row_view& static_row) {
|
|
if (_row_count == 0) {
|
|
if (!_filter(_selection, _partition_key, _clustering_key, static_row, nullptr)) {
|
|
return _filter.get_rows_dropped();
|
|
}
|
|
_builder.start_new_row();
|
|
auto static_row_iterator = static_row.iterator();
|
|
for (auto&& def : _selection.get_columns()) {
|
|
if (def->is_partition_key()) {
|
|
_builder.add(_partition_key[def->component_index()]);
|
|
} else if (def->is_static()) {
|
|
add_value(*def, static_row_iterator);
|
|
} else {
|
|
_builder.add_empty();
|
|
}
|
|
}
|
|
_builder.complete_row();
|
|
}
|
|
_builder.accept_partition_end();
|
|
return _filter.get_rows_dropped();
|
|
}
|
|
};
|
|
|
|
private:
|
|
bytes_opt get_value(data_type t, query::result_atomic_cell_view c);
|
|
|
|
/// True iff the \c current row ends a previously started group, either according to
|
|
/// _group_by_cell_indices or aggregation.
|
|
bool last_group_ended() const;
|
|
|
|
/// Gets output row from _selectors and resets them.
|
|
void flush_selectors();
|
|
|
|
/// Updates _last_group from the \c current row.
|
|
void update_last_group();
|
|
};
|
|
|
|
}
|
|
|
|
}
|