Files
scylladb/cql3/statements/broadcast_select_statement.cc
Wojciech Mitros e44820ba1f transport: generalize the bounce result message for bouncing to other nodes
In the following patches, we'll start allowing forwarding requests to strongly
consistent tables so that they'll get executed on the suitable tablet Raft group
members. For that we'll reuse the approach that we already have for bouncing
requests to other shards - we'll try to execute a request locally, and the
result of that will be a bounce message with another replica as the target.
In this patch we generalize the former bounce_to_shard result message so that
it will be able to specify the target of the bounce as another shard or specific
replica.
We also rename it to result_message::bounce so that it stops implying that only
another shard may be its target.
Aside from the host_id and the shard, the new message also includes the timeout,
because in the service handling the forwarding we won't have the access to it,
and it's needed for specifying how long we should wait for the forwarded
requests. It also includes an information whether this is a write request
to return correct timeout response in case the deadline is exceeded.
We will return other hosts in the new bounce message when executing requests to
strongly consistent tables when we can't handle the request because we aren't
a suitable replica. We can't handle this message yet, so we don't return it
anywhere and we still assume that every bounce message is a bounce to the same
host.
2026-03-12 17:48:57 +01:00

131 lines
5.2 KiB
C++

/*
* Copyright (C) 2022-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "cql3/statements/broadcast_select_statement.hh"
#include <seastar/core/future.hh>
#include <seastar/core/on_internal_error.hh>
#include "cql3/restrictions/statement_restrictions.hh"
#include "cql3/expr/evaluate.hh"
#include "cql3/query_processor.hh"
#include "service/broadcast_tables/experimental/lang.hh"
#include "db/system_keyspace.hh"
namespace cql3 {
namespace statements {
static logging::logger logger("broadcast_select_statement");
static
expr::expression get_key(const cql3::expr::expression& partition_key_restrictions) {
const auto* conjunction = cql3::expr::as_if<cql3::expr::conjunction>(&partition_key_restrictions);
if (!conjunction || conjunction->children.size() != 1) {
throw service::broadcast_tables::unsupported_operation_error(fmt::format(
"partition key restriction: {}", partition_key_restrictions));
}
const auto* key_restriction = cql3::expr::as_if<cql3::expr::binary_operator>(&conjunction->children[0]);
if (!key_restriction) {
throw service::broadcast_tables::unsupported_operation_error(fmt::format("partition key restriction: {}", *conjunction));
}
const auto* column = cql3::expr::as_if<cql3::expr::column_value>(&key_restriction->lhs);
if (!column || column->col->kind != column_kind::partition_key ||
key_restriction->op != cql3::expr::oper_t::EQ) {
throw service::broadcast_tables::unsupported_operation_error(fmt::format("key restriction: {}", *key_restriction));
}
return key_restriction->rhs;
}
static
bool is_selecting_only_value(const cql3::selection::selection& selection) {
return selection.is_trivial() &&
selection.get_column_count() == 1 &&
selection.get_columns()[0]->name() == "value";
}
broadcast_select_statement::broadcast_select_statement(schema_ptr schema, uint32_t bound_terms,
lw_shared_ptr<const parameters> parameters,
::shared_ptr<selection::selection> selection,
::shared_ptr<const restrictions::statement_restrictions> restrictions,
::shared_ptr<std::vector<size_t>> group_by_cell_indices,
bool is_reversed,
ordering_comparator_type ordering_comparator,
std::optional<expr::expression> limit,
std::optional<expr::expression> per_partition_limit,
cql_stats &stats,
std::unique_ptr<attributes> attrs)
: select_statement{schema, bound_terms, parameters, selection, restrictions, group_by_cell_indices, is_reversed, ordering_comparator, std::move(limit), std::move(per_partition_limit), stats, std::move(attrs)},
_query{prepare_query()}
{ }
broadcast_tables::prepared_select broadcast_select_statement::prepare_query() const {
if (!is_selecting_only_value(*_selection)) {
throw service::broadcast_tables::unsupported_operation_error("only 'value' selector is allowed");
}
return {
.key = get_key(_restrictions->get_partition_key_restrictions())
};
}
static
service::broadcast_tables::select_query
evaluate_prepared(
const broadcast_tables::prepared_select& query,
const query_options& options) {
return service::broadcast_tables::select_query{
.key = expr::evaluate(query.key, options).to_bytes()
};
}
future<::shared_ptr<cql_transport::messages::result_message>>
broadcast_select_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
if (this_shard_id() != 0) {
co_return qp.bounce_to_shard(0, cql3::computed_function_values{}, false);
}
auto result = co_await qp.execute_broadcast_table_query(
{ evaluate_prepared(_query, options) }
);
auto query_result = std::get_if<service::broadcast_tables::query_result_select>(&result);
if (!query_result) {
on_internal_error(logger, "incorrect query result ");
}
auto result_set = std::make_unique<cql3::result_set>(std::vector{
make_lw_shared<cql3::column_specification>(
db::system_keyspace::NAME,
db::system_keyspace::BROADCAST_KV_STORE,
::make_shared<cql3::column_identifier>("value", true),
utf8_type
)
});
if (query_result->value) {
result_set->add_row({ managed_bytes_opt(query_result->value) });
}
co_return ::make_shared<cql_transport::messages::result_message::rows>(cql3::result{std::move(result_set)});
}
}
}