/* * Copyright (C) 2022-present ScyllaDB * * Modified by ScyllaDB */ /* * SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0) */ #include "cql3/statements/strongly_consistent_select_statement.hh" #include #include #include "cql3/restrictions/statement_restrictions.hh" #include "cql3/expr/evaluate.hh" #include "cql3/query_processor.hh" #include "service/broadcast_tables/experimental/lang.hh" #include "db/system_keyspace.hh" namespace cql3 { namespace statements { static logging::logger logger("strongly_consistent_select_statement"); static expr::expression get_key(const cql3::expr::expression& partition_key_restrictions) { const auto* conjunction = cql3::expr::as_if(&partition_key_restrictions); if (!conjunction || conjunction->children.size() != 1) { throw service::broadcast_tables::unsupported_operation_error(fmt::format( "partition key restriction: {}", partition_key_restrictions)); } const auto* key_restriction = cql3::expr::as_if(&conjunction->children[0]); if (!key_restriction) { throw service::broadcast_tables::unsupported_operation_error(fmt::format("partition key restriction: {}", *conjunction)); } const auto* column = cql3::expr::as_if(&key_restriction->lhs); if (!column || column->col->kind != column_kind::partition_key || key_restriction->op != cql3::expr::oper_t::EQ) { throw service::broadcast_tables::unsupported_operation_error(fmt::format("key restriction: {}", *key_restriction)); } return key_restriction->rhs; } static bool is_selecting_only_value(const cql3::selection::selection& selection) { return selection.is_trivial() && selection.get_column_count() == 1 && selection.get_columns()[0]->name() == "value"; } strongly_consistent_select_statement::strongly_consistent_select_statement(schema_ptr schema, uint32_t bound_terms, lw_shared_ptr parameters, ::shared_ptr selection, ::shared_ptr restrictions, ::shared_ptr> group_by_cell_indices, bool is_reversed, ordering_comparator_type ordering_comparator, std::optional limit, std::optional per_partition_limit, cql_stats &stats, std::unique_ptr attrs) : select_statement{schema, bound_terms, parameters, selection, restrictions, group_by_cell_indices, is_reversed, ordering_comparator, std::move(limit), std::move(per_partition_limit), stats, std::move(attrs)}, _query{prepare_query()} { } broadcast_tables::prepared_select strongly_consistent_select_statement::prepare_query() const { if (!is_selecting_only_value(*_selection)) { throw service::broadcast_tables::unsupported_operation_error("only 'value' selector is allowed"); } return { .key = get_key(_restrictions->get_partition_key_restrictions()) }; } static service::broadcast_tables::select_query evaluate_prepared( const broadcast_tables::prepared_select& query, const query_options& options) { return service::broadcast_tables::select_query{ .key = expr::evaluate(query.key, options).to_bytes() }; } future<::shared_ptr> strongly_consistent_select_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional guard) const { if (this_shard_id() != 0) { co_return ::make_shared(0, cql3::computed_function_values{}); } auto result = co_await qp.execute_broadcast_table_query( { evaluate_prepared(_query, options) } ); auto query_result = std::get_if(&result); if (!query_result) { on_internal_error(logger, "incorrect query result "); } auto result_set = std::make_unique(std::vector{ make_lw_shared( db::system_keyspace::NAME, db::system_keyspace::BROADCAST_KV_STORE, ::make_shared("value", true), utf8_type ) }); if (query_result->value) { result_set->add_row({ managed_bytes_opt(query_result->value) }); } co_return ::make_shared(cql3::result{std::move(result_set)}); } } }