/*
 * Copyright (C) 2015-present ScyllaDB
 *
 * Modified by ScyllaDB
 */

/*
 * SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
 */

// NOTE(review): in this copy of the file several template argument lists
// appear to be missing (for example on std::vector, std::get, ::make_shared,
// expr::as_if, opts.set_if and std::numeric_limits), and the last #include
// below has no header name. Restore them from the upstream source before
// building; the comments in this file describe only what the visible code does.

#include "cql3/selection/selection.hh"
#include "cql3/selection/raw_selector.hh"
#include "cql3/result_set.hh"
#include "cql3/query_options.hh"
#include "cql3/restrictions/statement_restrictions.hh"
#include "cql3/expr/evaluate.hh"
#include "cql3/expr/expr-utils.hh"
#include "cql3/functions/first_function.hh"
#include "cql3/functions/aggregate_fcts.hh"
#include

namespace cql3 {

// Logger used below to report internal errors from simple_selectors.
logger cql_logger("cql_logger");

namespace selection {

// Builds a selection over `columns` of `schema`.
//
// `metadata_` is wrapped into the shared result metadata object.
// `collect_timestamps` / `collect_TTLs` record whether per-cell write
// timestamps / TTLs must be gathered while building results.
// Whether the selection contains a static column is computed once here from
// the column list that was just moved into `_columns`.
selection::selection(schema_ptr schema,
        std::vector columns,
        std::vector> metadata_,
        bool collect_timestamps,
        bool collect_TTLs,
        trivial is_trivial)
    : _schema(std::move(schema))
    , _columns(std::move(columns))
    , _metadata(::make_shared(std::move(metadata_)))
    , _collect_timestamps(collect_timestamps)
    , _collect_TTLs(collect_TTLs)
    , _contains_static_columns(std::any_of(_columns.begin(), _columns.end(), std::mem_fn(&column_definition::is_static)))
    , _is_trivial(is_trivial)
{ }

// Computes the partition_slice option set implied by this selection.
//
// Four conditions drive the four set_if() calls, in order: timestamps are
// collected, TTLs are collected, any selected column is a partition key
// column, any selected column is a clustering key column.
// NOTE(review): which option each set_if() call sets is presumably given by a
// template argument that is not visible here — confirm.
query::partition_slice::option_set selection::get_query_options() {
    query::partition_slice::option_set opts;

    opts.set_if(_collect_timestamps);
    opts.set_if(_collect_TTLs);

    opts.set_if(
        std::any_of(_columns.begin(), _columns.end(),
            std::mem_fn(&column_definition::is_partition_key)));

    opts.set_if(
        std::any_of(_columns.begin(), _columns.end(),
            std::mem_fn(&column_definition::is_clustering_key)));

    return opts;
}

// Returns true only when the selection has at least one static column, is not
// a wildcard selection, and every selected column is either a partition key
// column or a static column.
bool selection::contains_only_static_columns() const {
    if (!contains_static_columns()) {
        return false;
    }

    if (is_wildcard()) {
        return false;
    }

    for (auto&& def : _columns) {
        if (!def->is_partition_key() && !def->is_static()) {
            return false;
        }
    }

    return true;
}

// Returns the position of `def` in the selected column list, or -1 when it is
// not selected. Columns are matched by address (`&def`), not by value.
int32_t selection::index_of(const column_definition& def) const {
    auto i = std::find(_columns.begin(), _columns.end(), &def);
    if (i == _columns.end()) {
        return -1;
    }
    return std::distance(_columns.begin(), i);
}

// Returns true when `def` (matched by address) is among the selected columns.
bool selection::has_column(const column_definition& def) const {
    return std::find(_columns.begin(), _columns.end(), &def) != _columns.end();
}

// Returns true when at least one of the prepared selectors satisfies the
// single-selector overload cql3::selection::processes_selection().
bool selection::processes_selection(const std::vector& prepared_selectors) {
    return std::any_of(prepared_selectors.begin(), prepared_selectors.end(),
        [] (auto&& s) { return cql3::selection::processes_selection(s); });
}

// Special-cased selection for when no function is used (this saves some allocations).
class simple_selection : public selection {
private:
    const bool _is_wildcard;
public:
    // Creates a simple selection over `columns`; the result metadata is the
    // column_specification of each column, in the same order.
    static ::shared_ptr make(schema_ptr schema, std::vector columns, bool is_wildcard) {
        std::vector> metadata;
        metadata.reserve(columns.size());
        for (auto&& col : columns) {
            metadata.emplace_back(col->column_specification);
        }
        return ::make_shared(schema, std::move(columns), std::move(metadata), is_wildcard);
    }

    /*
     * In theory, even a simple selection could contain the same column multiple times, so we
     * could filter those duplicates out of columns. But since we're very unlikely to
     * get many duplicates in practice, it's more efficient not to bother.
     *
     * The base selection is constructed with timestamp and TTL collection
     * disabled and with trivial::yes.
     */
    simple_selection(schema_ptr schema, std::vector columns,
        std::vector> metadata, bool is_wildcard)
            : selection(schema, std::move(columns), std::move(metadata), false, false, trivial::yes)
            , _is_wildcard(is_wildcard)
    { }

    virtual bool is_wildcard() const override { return _is_wildcard; }
    virtual bool is_aggregate() const override { return false; }
protected:
    // Selectors for a simple selection: rows are passed through unchanged and
    // every aggregation-only entry point reports an internal error.
    class simple_selectors : public selectors {
    public:
        // Should not be reached, since this is called when aggregating
        virtual void reset() override {
            on_internal_error(cql_logger, "simple_selectors::reset() called, but we don't support aggregation");
        }

        virtual bool requires_thread() const override { return false; }

        // Should not be reached, since this is called when aggregating
        virtual std::vector get_output_row() override {
            on_internal_error(cql_logger, "simple_selectors::get_output_row() called, but we don't support aggregation");
        }

        // Should not be reached, since this is called when aggregating
        virtual void add_input_row(result_set_builder& rs) override {
            on_internal_error(cql_logger, "simple_selectors::add_input_row() called, but we don't support aggregation");
        }

        // Should not be reached, since this is called when aggregating
        virtual std::uint64_t get_input_row_count() const override {
            on_internal_error(cql_logger, "simple_selectors::get_input_row_count() called, but we don't support aggregation");
        }

        // The output row is the builder's current row, moved out of the
        // builder (rs.current is left in a moved-from state).
        virtual std::vector transform_input_row(result_set_builder& rs) override {
            return std::move(rs.current);
        }

        virtual bool is_aggregate() const override {
            return false;
        }
    };

    std::unique_ptr new_selectors() const override {
        return std::make_unique();
    }
};

// Builds a non-wildcard simple selection from a partition slice: the slice's
// static columns come first, followed by its regular columns.
shared_ptr selection_from_partition_slice(schema_ptr schema, const query::partition_slice& slice) {
    std::vector cdefs;
    cdefs.reserve(slice.static_columns.size() + slice.regular_columns.size());
    for (auto static_col : slice.static_columns) {
        cdefs.push_back(&schema->static_column_at(static_col));
    }
    for (auto regular_col : slice.regular_columns) {
        cdefs.push_back(&schema->regular_column_at(regular_col));
    }
    return simple_selection::make(std::move(schema), std::move(cdefs), false);
}

// Returns true when `e` contains a column_mutation_attribute sub-expression
// whose kind equals `kind`.
static
bool contains_column_mutation_attribute(expr::column_mutation_attribute::attribute_kind kind, const expr::expression& e) {
    return expr::find_in_expression(e, [kind] (const expr::column_mutation_attribute& cma) {
        return cma.kind == kind;
    });
}

// Returns true when `e` contains a writetime attribute.
static
bool contains_writetime(const expr::expression& e) {
    return contains_column_mutation_attribute(expr::column_mutation_attribute::attribute_kind::writetime, e);
}

// Returns true when `e` contains a ttl attribute.
static
bool contains_ttl(const expr::expression& e) {
    return contains_column_mutation_attribute(expr::column_mutation_attribute::attribute_kind::ttl, e);
}

// Selection whose output is produced by evaluating selector expressions.
//
// The selectors are split by expr::split_aggregation() into:
//  - _inner_loop: expressions evaluated once per input row; the result of the
//    i-th expression is stored into the i-th temporary,
//  - _outer_loop: expressions evaluated once per output row of an aggregation,
//  - _initial_values_for_temporaries: the values the temporaries start from.
// The selection is an aggregate exactly when _inner_loop is not empty.
class selection_with_processing : public selection {
private:
    std::vector _selectors;
    std::vector _inner_loop;
    std::vector _outer_loop;
    std::vector _initial_values_for_temporaries;
public:
    // Timestamp / TTL collection is enabled in the base class when any
    // selector contains a writetime / ttl attribute. `selectors` is inspected
    // (wrapped in a tuple_constructor) before it is moved into _selectors.
    selection_with_processing(schema_ptr schema, std::vector columns,
            std::vector> metadata,
            std::vector selectors)
        : selection(schema, std::move(columns), std::move(metadata),
            contains_writetime(expr::tuple_constructor{selectors}),
            contains_ttl(expr::tuple_constructor{selectors}))
        , _selectors(std::move(selectors))
    {
        auto agg_split = expr::split_aggregation(_selectors);
        _outer_loop = std::move(agg_split.outer_loop);
        _inner_loop = std::move(agg_split.inner_loop);
        _initial_values_for_temporaries = std::move(agg_split.initial_values_for_temporaries);
    }

    // Makes column `c` available in the output row for post-processing and
    // returns its index in that row.
    //
    // When a top-level selector already refers directly to `c`, its index is
    // returned and nothing is added. Otherwise the column is added to the
    // selection, a non-serialized metadata entry is appended, and a new
    // selector for the column is appended.
    virtual uint32_t add_column_for_post_processing(const column_definition& c) override {
        auto it = std::find_if(_selectors.begin(), _selectors.end(), [&c](const expr::expression& e) {
            auto col = expr::as_if(&e);
            return col && col->col == &c;
        });
        if (it != _selectors.end()) {
            return std::distance(_selectors.begin(), it);
        }

        add_column(c);
        get_result_metadata()->add_non_serialized_column(c.column_specification);
        _selectors.push_back(expr::column_value(&c));
        if (_inner_loop.empty()) {
            // Simple case: no aggregation
            return _selectors.size() - 1;
        } else {
            // Complex case: aggregation, must pass through temporary.
            // The aggregate created by make_first_function() is used: a new
            // temporary holds its state, the inner loop feeds the column into
            // its aggregation function, and the outer loop converts the state
            // into the result.
            auto first_func = cql3::functions::aggregate_fcts::make_first_function(c.type);
            auto& agg = first_func->get_aggregate();
            auto temp_index = _initial_values_for_temporaries.size();
            auto temp = expr::temporary{
                .index = temp_index,
                .type = agg.argument_types[0],
            };
            _inner_loop.push_back(
                expr::function_call{
                    .func = agg.aggregation_function,
                    .args = {temp, expr::column_value(&c)},
                });
            _initial_values_for_temporaries.push_back(raw_value::make_value(agg.initial_state));
            _outer_loop.push_back(
                expr::function_call{
                    .func = agg.state_to_result_function,
                    .args = {temp},
                });
            // For aggregates the output row is built from _outer_loop, so the
            // index refers to that list.
            return _outer_loop.size() - 1;
        }
    }

    virtual bool is_aggregate() const override {
        return !_inner_loop.empty();
    }

    // Returns true when there is exactly one selector and it contains a call
    // to the native function named COUNT_ROWS_FUNCTION_NAME.
    virtual bool is_count() const override {
        return _selectors.size() == 1
            && expr::find_in_expression(_selectors[0], [] (const expr::function_call& fc) {
                auto& func = std::get>(fc.func);
                return func->name() == functions::function_name::native_function(functions::aggregate_fcts::COUNT_ROWS_FUNCTION_NAME);
            });
    }

    // Returns true when every selector is a call to an aggregate function that
    // has a state reduction function and whose arguments all satisfy the
    // expr::is predicate (per the comment below: direct column references).
    virtual bool is_reducible() const override {
        return std::ranges::all_of(
                _selectors,
                [] (const expr::expression& e) {
                    auto fc = expr::as_if(&e);
                    if (!fc) {
                        return false;
                    }
                    auto func = std::get>(fc->func);
                    if (!func->is_aggregate()) {
                        return false;
                    }
                    auto agg_func = dynamic_pointer_cast(std::move(func));
                    if (!agg_func->get_aggregate().state_reduction_function) {
                        return false;
                    }
                    // We only support transforming columns directly for parallel queries
                    if (!std::ranges::all_of(fc->args, expr::is)) {
                        return false;
                    }
                    return true;
                }
        );
    }

    // Describes each selector as a reduction for a mapreduce request.
    //
    // Returns parallel lists: one reduction type and one aggregation_info
    // (function name plus argument column names) per selector.
    // Throws std::runtime_error when a selector is not a function call, the
    // called function is not an aggregate, or an argument does not pass the
    // expr::as_if check.
    virtual query::mapreduce_request::reductions_info get_reductions() const override {
        std::vector types;
        std::vector infos;
        auto bad = [] {
            throw std::runtime_error("Selection doesn't have a reduction");
        };
        for (const auto& e : _selectors) {
            auto fc = expr::as_if(&e);
            if (!fc) {
                bad();
            }
            auto func = std::get>(fc->func);
            if (!func->is_aggregate()) {
                bad();
            }
            auto agg_func = dynamic_pointer_cast(std::move(func));

            // NOTE(review): is_count() above compares against
            // COUNT_ROWS_FUNCTION_NAME while this uses the literal
            // "countRows" — presumably the same value; confirm.
            auto type = (agg_func->name().name == "countRows") ? query::mapreduce_request::reduction_type::count : query::mapreduce_request::reduction_type::aggregate;

            std::vector column_names;
            for (auto& arg : fc->args) {
                auto col = expr::as_if(&arg);
                if (!col) {
                    bad();
                }
                column_names.push_back(col->col->name_as_text());
            }

            auto info = query::mapreduce_request::aggregation_info {
                .name = agg_func->name(),
                .column_names = std::move(column_names),
            };

            types.push_back(type);
            infos.push_back(std::move(info));
        }
        return {types, infos};
    }

    // Collects the functions called by the selectors. For a function that
    // casts successfully in the dynamic_pointer_cast below, its aggregation
    // function and state-to-result function are also collected when set.
    virtual std::vector> used_functions() const override {
        auto ret = std::vector>();
        expr::recurse_until(expr::tuple_constructor{_selectors}, [&] (const expr::expression& e) {
            if (auto fc = expr::as_if(&e)) {
                auto func = std::get>(fc->func);
                ret.push_back(func);
                if (auto agg_func = dynamic_pointer_cast(std::move(func))) {
                    auto& agg = agg_func->get_aggregate();
                    if (agg.aggregation_function) {
                        ret.push_back(agg.aggregation_function);
                    }
                    if (agg.state_to_result_function) {
                        ret.push_back(agg.state_to_result_function);
                    }
                }
            }
            // Always false — presumably so that recurse_until() keeps
            // visiting the remaining sub-expressions; confirm.
            return false;
        });
        return ret;
    }

protected:
    // Per-query evaluation state for a selection_with_processing: the current
    // values of the temporaries and the number of rows accumulated so far.
    class selectors_with_processing : public selectors {
    private:
        const selection_with_processing& _sel;
        std::vector _temporaries;
        bool _requires_thread;
        std::uint64_t _input_row_count;
    public:
        // The temporaries start as a copy of the selection's initial values.
        // _requires_thread is computed once: true when any selector contains
        // a call to a function that reports requires_thread().
        explicit selectors_with_processing(const selection_with_processing& sel)
            : _sel(sel)
            , _temporaries(_sel._initial_values_for_temporaries)
            , _requires_thread(std::ranges::any_of(sel._selectors, [] (const expr::expression& e) {
                return expr::find_in_expression(e, [] (const expr::function_call& fc) {
                    return std::get>(fc.func)->requires_thread();
                });
            }))
            , _input_row_count(0)
        { }

        virtual bool requires_thread() const override {
            return _requires_thread;
        }

        // Restores the temporaries to their initial values and clears the
        // accumulated row count.
        virtual void reset() override {
            _temporaries = _sel._initial_values_for_temporaries;
            _input_row_count = 0;
        }

        virtual bool is_aggregate() const override {
            return !_sel._inner_loop.empty();
        }

        // Non-aggregating path: evaluates every selector against the
        // builder's current row and returns the results as the output row.
        // No temporaries are supplied to the evaluation.
        virtual std::vector transform_input_row(result_set_builder& rs) override {
            std::vector output_row;
            output_row.reserve(_sel._selectors.size());
            auto inputs = expr::evaluation_inputs{
                .partition_key = rs.current_partition_key,
                .clustering_key = rs.current_clustering_key,
                .static_and_regular_columns = rs.current,
                .selection = &_sel,
                .options = rs._options,
                .static_and_regular_timestamps = rs._timestamps,
                .static_and_regular_ttls = rs._ttls,
                .temporaries = {},
            };
            for (auto&& e : _sel._selectors) {
                auto out = expr::evaluate(e, inputs);
                output_row.emplace_back(std::move(out).to_managed_bytes_opt());
            }
            return output_row;
        }

        // Aggregating path: evaluates the outer-loop expressions with only
        // the temporaries as input (no row data and no query options).
        virtual std::vector get_output_row() override {
            std::vector output_row;
            output_row.reserve(_sel._outer_loop.size());
            auto inputs = expr::evaluation_inputs{
                .partition_key = {},
                .clustering_key = {},
                .static_and_regular_columns = {},
                .selection = &_sel,
                .options = nullptr,
                .static_and_regular_timestamps = {},
                .static_and_regular_ttls = {},
                .temporaries = _temporaries,
            };
            for (auto&& e : _sel._outer_loop) {
                auto out = expr::evaluate(e, inputs);
                output_row.emplace_back(std::move(out).to_managed_bytes_opt());
            }
            return output_row;
        }

        // Aggregating path: evaluates each inner-loop expression against the
        // builder's current row, stores the result into the temporary with
        // the same index, and counts the row.
        // NOTE(review): query options are passed as nullptr here, unlike in
        // transform_input_row() — confirm that inner-loop expressions never
        // need them.
        virtual void add_input_row(result_set_builder& rs) override {
            auto inputs = expr::evaluation_inputs{
                .partition_key = rs.current_partition_key,
                .clustering_key = rs.current_clustering_key,
                .static_and_regular_columns = rs.current,
                .selection = &_sel,
                .options = nullptr,
                .static_and_regular_timestamps = rs._timestamps,
                .static_and_regular_ttls = rs._ttls,
                .temporaries = _temporaries,
            };
            for (size_t i = 0; i != _sel._inner_loop.size(); ++i) {
                _temporaries[i] = expr::evaluate(_sel._inner_loop[i], inputs);
            }
            ++_input_row_count;
        }

        virtual std::uint64_t get_input_row_count() const override {
            return _input_row_count;
        }

        // Forwards to the owning selection.
        std::vector> used_functions() const {
            return _sel.used_functions();
        }
    };

    std::unique_ptr new_selectors() const override {
        return std::make_unique(*this);
    }
};

// Return a list of columns that "SELECT *" should show - these are all
// columns except potentially some that are is_hidden_from_cql() (currently,
// those can be the "virtual columns" used in materialized views).
// The list points to column_definition objects in the given schema_ptr,
// which can be used only as long as the caller keeps the schema_ptr alive.
std::vector selection::wildcard_columns(schema_ptr schema) {
    auto columns = schema->all_columns_in_select_order();
    // filter out hidden columns, which should not be seen by the
    // user when doing "SELECT *". We also disallow selecting them
    // individually (see column_identifier::new_selector_factory()).
    return columns |
        std::views::filter([](const column_definition& c) {
            return !c.is_hidden_from_cql();
        }) |
        std::views::transform([](const column_definition& c) {
            return &c;
        }) |
        std::ranges::to();
}

// Creates the wildcard ("SELECT *") selection for `schema`.
::shared_ptr selection::wildcard(schema_ptr schema) {
    return simple_selection::make(schema, wildcard_columns(schema), true);
}

// Creates a non-wildcard simple selection over exactly `columns`.
::shared_ptr selection::for_columns(schema_ptr schema, std::vector columns) {
    return simple_selection::make(schema, std::move(columns), false);
}

// Ensures `c` is part of the selected columns.
// Returns {existing index, false} when it was already selected, otherwise
// appends it and returns {new index, true}.
selection::add_column_result selection::add_column(const column_definition& c) {
    auto index = index_of(c);
    if (index != -1) {
        return {index, false};
    }
    _columns.push_back(&c);
    return {_columns.size() - 1, true};
}

// Makes `c` available for post-processing and returns its index. A
// non-serialized metadata entry is appended only when the column was newly
// added to the selection.
uint32_t selection::add_column_for_post_processing(const column_definition& c) {
    auto col = add_column(c);
    if (col.added) {
        _metadata->add_non_serialized_column(c.column_specification);
    }
    return col.index;
}

// Builds a selection from prepared selectors.
//
// The selected columns are the distinct columns referenced by the selector
// expressions, in order of first appearance. A selection_with_processing is
// created when any selector processes its input or when the number of
// selectors differs from the number of distinct columns; otherwise a simple
// non-wildcard selection is created.
// `db` and `ks` are not used by the visible body.
::shared_ptr selection::from_selectors(data_dictionary::database db, schema_ptr schema, const sstring& ks, const std::vector& prepared_selectors) {
    std::vector defs;

    for (auto&& [sel, alias] : prepared_selectors) {
        expr::for_each_expression(sel, [&] (const expr::column_value& cv) {
            if (std::find(defs.begin(), defs.end(), cv.col) == defs.end()) {
                defs.push_back(cv.col);
            }
        });
    }

    auto metadata = collect_metadata(*schema, prepared_selectors);
    if (processes_selection(prepared_selectors) || prepared_selectors.size() != defs.size()) {
        return ::make_shared(schema, std::move(defs), std::move(metadata),
            prepared_selectors | std::views::transform(std::mem_fn(&prepared_selector::expr)) | std::ranges::to());
    } else {
        return ::make_shared(schema, std::move(defs), std::move(metadata), false);
    }
}

// Produces one column specification per prepared selector.
//
// The column name is the selector expression formatted with the
// "result_set_metadata" format spec (case is kept), the type is the
// expression's type, and the selector's alias is applied when it has one.
std::vector>
selection::collect_metadata(const schema& schema, const std::vector& prepared_selectors) {
    std::vector> r;
    r.reserve(prepared_selectors.size());
    for (auto&& selector : prepared_selectors) {
        auto name = fmt::format("{:result_set_metadata}", selector.expr);
        auto col_id = ::make_shared(name, /* keep_case */ true);
        lw_shared_ptr col_spec = make_lw_shared(
                schema.ks_name(), schema.cf_name(), std::move(col_id), expr::type_of(selector.expr));
        ::shared_ptr alias = selector.alias;
        r.push_back(alias ? col_spec->with_alias(alias) : col_spec);
    }
    return r;
}

// Creates a builder that accumulates rows for selection `s`.
//
// `now` is the reference time used to compute remaining TTLs, `limit` caps the
// number of aggregated result rows and `per_partition_limit` caps the number
// of aggregated rows per partition. `_last_group` gets one slot per GROUP BY
// cell index. The timestamp / TTL arrays are sized to the number of selected
// columns only when the selection collects them; otherwise they stay empty,
// which the add*() functions use as "not tracked".
result_set_builder::result_set_builder(const selection& s, gc_clock::time_point now, const query_options* options,
                                       std::vector group_by_cell_indices, uint64_t limit, uint64_t per_partition_limit)
    : _result_set(std::make_unique(::make_shared(*(s.get_result_metadata()))))
    , _selectors(s.new_selectors())
    , _group_by_cell_indices(std::move(group_by_cell_indices))
    , _limit(limit)
    , _per_partition_limit(per_partition_limit)
    , _per_partition_remaining(per_partition_limit)
    , _per_partition_remaining_previous_partition(per_partition_limit)
    , _last_group(_group_by_cell_indices.size())
    , _group_began(false)
    , _options(options)
    , _now(now)
{
    if (s._collect_timestamps) {
        _timestamps.resize(s._columns.size(), 0);
    }
    if (s._collect_TTLs) {
        _ttls.resize(s._columns.size(), 0);
    }
}

// Appends an empty cell to the current row. When tracked, its timestamp is
// set to api::missing_timestamp and its TTL to -1.
void result_set_builder::add_empty() {
    current.emplace_back();
    if (!_timestamps.empty()) {
        _timestamps[current.size() - 1] = api::missing_timestamp;
    }
    if (!_ttls.empty()) {
        _ttls[current.size() - 1] = -1;
    }
}

// Appends a raw value to the current row; timestamps and TTLs are not touched.
void result_set_builder::add(bytes_opt value) {
    current.emplace_back(std::move(value));
}

// Appends the value of atomic cell `c` to the current row. When tracked, the
// cell's timestamp is recorded, and its TTL is recorded as the time left until
// expiry relative to `_now`, or -1 when the cell has no expiry.
void result_set_builder::add(const column_definition& def, const query::result_atomic_cell_view& c) {
    current.emplace_back(get_value(def.type, c));
    if (!_timestamps.empty()) {
        _timestamps[current.size() - 1] = c.timestamp();
    }
    if (!_ttls.empty()) {
        gc_clock::duration ttl_left(-1);
        expiry_opt e = c.expiry();
        if (e) {
            ttl_left = *e - _now;
        }
        _ttls[current.size() - 1] = ttl_left.count();
    }
}

// Appends a serialized collection to the current row.
void result_set_builder::add_collection(const column_definition& def, bytes_view c) {
    current.emplace_back(to_bytes(c));
    // timestamps, ttls meaningless for collections
}

// Remembers the GROUP BY cells of the current row as the last seen group and
// marks that a group has begun.
void result_set_builder::update_last_group() {
    _group_began = true;
    std::ranges::transform(_group_by_cell_indices, _last_group.begin(), [this](size_t i) { return current[i]; });
}

// Returns true when the current row belongs to a different group than the
// last row recorded by update_last_group().
//
// Before any group has begun the answer is false. When _last_group is empty,
// each row ends the group unless the selectors aggregate. Otherwise the
// GROUP BY cells of the current row are compared with the remembered ones,
// starting from the last GROUP BY cell.
bool result_set_builder::last_group_ended() const {
    if (!_group_began) {
        return false;
    }
    if (_last_group.empty()) {
        return !_selectors->is_aggregate();
    }
    return !std::ranges::equal(
            _last_group | std::views::reverse,
            _group_by_cell_indices | std::views::reverse | std::views::transform([this](size_t i) { return current[i]; }));
}

// Emits the aggregated row for the group accumulated so far, if any.
//
// Nothing happens for non-aggregating selectors, when no input row has been
// accumulated, or when the result set already holds `_limit` rows. Otherwise
// the output row is added only while the per-partition budget is not
// exhausted, and the selectors are reset either way.
void result_set_builder::flush_selectors() {
    if (!_selectors->is_aggregate()) {
        // Non-aggregating rows are added directly by complete_row()
        return;
    }

    if (_selectors->get_input_row_count() == 0) {
        return;
    }

    if (_result_set->size() < _limit) {
        if (_per_partition_remaining > 0) {
            _result_set->add_row(_selectors->get_output_row());
            --_per_partition_remaining;
        }
        _selectors->reset();
    }
}

// Finishes the current row: non-aggregating selections emit it immediately;
// aggregating selections first flush the previous group if the row starts a
// new one, then accumulate the row.
void result_set_builder::complete_row() {
    if (!_selectors->is_aggregate()) {
        // Fast path when not aggregating
        _result_set->add_row(_selectors->transform_input_row(*this));
        return;
    }
    if (last_group_ended()) {
        flush_selectors();
    }
    update_last_group();
    _selectors->add_input_row(*this);
}

// Notifies the builder that rows of the partition identified by `key` follow.
// Only relevant when aggregating with GROUP BY.
void result_set_builder::accept_new_partition(const std::vector& key) {
    if (!_selectors->is_aggregate() || _group_by_cell_indices.empty()) {
        // No need to do anything if we're not aggregating. PER PARTITION LIMIT
        // for non-aggregating queries is handled earlier in the process.
        // If we're aggregating, but not grouping, we don't need to do anything either
        return;
    }
    if (key == current_partition_key) {
        // We're still in the same partition, which means that the query_pager
        // has called us with a new page of results. We need to reset the
        // per_partition_remaining to its previous value.
        _per_partition_remaining = _per_partition_remaining_previous_partition;
        return;
    }
    if (_per_partition_remaining_previous_partition > 0) {
        // We're on a new partition, and we have not exhausted the previous
        // partition's limit. We need to flush the selectors if there are any
        // rows left to process.
        _per_partition_remaining = _per_partition_remaining_previous_partition;
        flush_selectors();
        // We need to reset the limit here, because we're starting a new
        // partition.
        _per_partition_remaining_previous_partition = _per_partition_remaining;
        _per_partition_remaining = _per_partition_limit;
    } else {
        // The previous partition already used up its budget, so the rows
        // accumulated for its pending group are discarded.
        _selectors->reset();
    }
}

// Notifies the builder that the current partition (or page) has ended.
// Only relevant when aggregating with GROUP BY.
void result_set_builder::accept_partition_end() {
    if (!_selectors->is_aggregate() || _group_by_cell_indices.empty()) {
        // No need to do anything if we're not aggregating. PER PARTITION LIMIT
        // for non-aggregating queries is handled earlier in the process.
        // If we're aggregating, but not grouping, we don't need to do anything either
        return;
    }
    // We're at the end of a partition OR at the end of the page. We cannot
    // flush the selectors here, because we might have more rows to process in
    // the same partition.
    // _per_partition_remaining_previous_partition is the variable we use to
    // keep track of the remaining rows in the previous partition, should we
    // encounter the same partition in the next page.
    _per_partition_remaining_previous_partition = _per_partition_remaining;
    _per_partition_remaining = _per_partition_limit;
}

// Starts a new row by discarding the cells of the previous one.
void result_set_builder::start_new_row() {
    current.clear();
}

// Finalizes and returns the result set. The builder gives up ownership of the
// result set, so this is meant to be the last call on the builder.
std::unique_ptr result_set_builder::build() {
    if (_selectors->is_aggregate() && _per_partition_remaining_previous_partition > 0) {
        // We verify _per_partition_remaining_previous_partition here, because
        // we have finished the last page which means accept_partition_end() has
        // been called. So the value to check is _per_partition_remaining_previous_partition.
        if (_group_began || (!_group_by_cell_indices.empty() && last_group_ended())) {
            flush_selectors();
        }
    }
    // An aggregation without GROUP BY that produced no row so far still
    // yields one row, computed from the current state of the selectors.
    if (_result_set->empty() && _selectors->is_aggregate() && _group_by_cell_indices.empty()) {
        _result_set->add_row(_selectors->get_output_row());
    }
    return std::move(_result_set);
}

// Creates a row filter for the given restrictions.
//
// `remaining` is the number of rows that may still be accepted overall and
// `per_partition_limit` the number that may be accepted per partition.
// `last_pkey` and `rows_fetched_for_last_partition` describe the partition
// that was being read by the previous query (see reset()).
// The partition-level and clustering-row-level filter expressions are
// obtained from the restrictions once, here.
result_set_builder::restrictions_filter::restrictions_filter(::shared_ptr restrictions,
        const query_options& options,
        uint64_t remaining,
        schema_ptr schema,
        uint64_t per_partition_limit,
        std::optional last_pkey,
        uint64_t rows_fetched_for_last_partition)
    : _restrictions(restrictions)
    , _options(options)
    , _partition_level_filter(_restrictions->get_partition_level_filter())
    , _clustering_row_level_filter(_restrictions->get_clustering_row_level_filter())
    , _remaining(remaining)
    , _schema(schema)
    , _per_partition_limit(per_partition_limit)
    , _per_partition_remaining(_per_partition_limit)
    , _rows_fetched_for_last_partition(rows_fetched_for_last_partition)
    , _last_pkey(std::move(last_pkey))
{ }

// Decides whether a row passes the restrictions.
//
// Returns false immediately when the current partition is already known not
// to match, or when the overall or per-partition budget is exhausted. A
// failed partition-level filter is remembered so that the remaining rows of
// the partition are rejected without evaluating the filters again.
// NOTE(review): this const function assigns _current_partition_does_not_match,
// which is presumably declared mutable — confirm in the header. `rlogger` is
// not used by the visible body.
bool result_set_builder::restrictions_filter::do_filter(const selection& selection,
                                                         const std::vector& partition_key,
                                                         const std::vector& clustering_key,
                                                         const query::result_row_view& static_row,
                                                         const query::result_row_view* row) const {
    static logging::logger rlogger("restrictions_filter");

    if (_current_partition_does_not_match || _remaining == 0 || _per_partition_remaining == 0) {
        return false;
    }

    auto static_and_regular_columns = expr::get_non_pk_values(selection, static_row, row);

    if (!expr::is_satisfied_by(
                _partition_level_filter,
                expr::evaluation_inputs{
                    .partition_key = partition_key,
                    .clustering_key = clustering_key,
                    .static_and_regular_columns = static_and_regular_columns,
                    .selection = &selection,
                    .options = &_options,
                })) {
        _current_partition_does_not_match = true;
        return false;
    }

    if (!expr::is_satisfied_by(
                _clustering_row_level_filter,
                expr::evaluation_inputs{
                    .partition_key = partition_key,
                    .clustering_key = clustering_key,
                    .static_and_regular_columns = static_and_regular_columns,
                    .selection = &selection,
                    .options = &_options,
                })) {
        return false;
    }

    return true;
}

// Filters one row and updates the bookkeeping: a rejected row increments the
// dropped-row counter, an accepted row consumes one unit of the overall and
// of the per-partition budget (neither goes below zero).
bool result_set_builder::restrictions_filter::operator()(const selection& selection,
                                                         const std::vector& partition_key,
                                                         const std::vector& clustering_key,
                                                         const query::result_row_view& static_row,
                                                         const query::result_row_view* row) const {
    const bool accepted = do_filter(selection, partition_key, clustering_key, static_row, row);
    if (!accepted) {
        ++_rows_dropped;
    } else {
        if (_remaining > 0) {
            --_remaining;
        }
        if (_per_partition_remaining > 0) {
            --_per_partition_remaining;
        }
    }
    return accepted;
}

// Resets the per-partition state before a new partition is filtered.
// `key` is the key of the partition about to be processed; it may be null.
void result_set_builder::restrictions_filter::reset(const partition_key* key) {
    _current_partition_does_not_match = false;
    _rows_dropped = 0;
    _per_partition_remaining = _per_partition_limit;
    if (_is_first_partition_on_page && _per_partition_limit < std::numeric_limits::max()) {
        // If any rows related to this key were also present in the previous query,
        // we need to take it into account as well.
        // NOTE(review): if _rows_fetched_for_last_partition could exceed
        // _per_partition_limit, this subtraction would wrap for an unsigned
        // counter — confirm that callers guarantee it cannot.
        if (key && _last_pkey && _last_pkey->equal(*_schema, *key)) {
            _per_partition_remaining -= _rows_fetched_for_last_partition;
        }
        _is_first_partition_on_page = false;
    }
}

// Returns the timestamp recorded for cell `idx` of the current row.
api::timestamp_type result_set_builder::timestamp_of(size_t idx) {
    return _timestamps[idx];
}

// Returns the TTL recorded for cell `idx` of the current row.
int32_t result_set_builder::ttl_of(size_t idx) {
    return _ttls[idx];
}

// Returns the number of rows accumulated in the result set so far.
size_t result_set_builder::result_set_size() const {
    return _result_set->size();
}

// Returns the linearized value of atomic cell `c`. The type `t` is not used
// by the visible body.
bytes_opt result_set_builder::get_value(data_type t, query::result_atomic_cell_view c) {
    return {c.value().linearize()};
}

}

}