In the following patches, we'll start allowing requests to strongly consistent tables to be forwarded, so that they get executed on suitable members of the tablet's Raft group. For that we'll reuse the approach that we already have for bouncing requests to other shards: we try to execute a request locally, and the result of that is a bounce message with another replica as the target. In this patch we generalize the former bounce_to_shard result message so that it can specify the target of the bounce as either another shard or a specific replica. We also rename it to result_message::bounce so that it no longer implies that only another shard may be its target. Aside from the host_id and the shard, the new message also includes the timeout, because the service handling the forwarding won't have access to it, and it's needed for specifying how long we should wait for the forwarded requests. It also includes information on whether this is a write request, so that the correct timeout response can be returned if the deadline is exceeded. We will return other hosts in the new bounce message when executing requests to strongly consistent tables that we can't handle because we aren't a suitable replica. We can't handle this message yet, so we don't return it anywhere, and we still assume that every bounce message is a bounce to the same host.
131 lines
5.2 KiB
C++
131 lines
5.2 KiB
C++
/*
|
|
* Copyright (C) 2022-present ScyllaDB
|
|
*
|
|
* Modified by ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
|
*/
|
|
|
|
|
|
#include "cql3/statements/broadcast_select_statement.hh"
|
|
|
|
#include <seastar/core/future.hh>
|
|
#include <seastar/core/on_internal_error.hh>
|
|
|
|
#include "cql3/restrictions/statement_restrictions.hh"
|
|
#include "cql3/expr/evaluate.hh"
|
|
#include "cql3/query_processor.hh"
|
|
#include "service/broadcast_tables/experimental/lang.hh"
|
|
#include "db/system_keyspace.hh"
|
|
|
|
namespace cql3 {
|
|
|
|
namespace statements {
|
|
|
|
// File-local logger; passed to on_internal_error() when a broadcast-table
// query yields a result that is not a select result.
static logging::logger logger("broadcast_select_statement");
|
|
|
|
static
|
|
expr::expression get_key(const cql3::expr::expression& partition_key_restrictions) {
|
|
const auto* conjunction = cql3::expr::as_if<cql3::expr::conjunction>(&partition_key_restrictions);
|
|
|
|
if (!conjunction || conjunction->children.size() != 1) {
|
|
throw service::broadcast_tables::unsupported_operation_error(fmt::format(
|
|
"partition key restriction: {}", partition_key_restrictions));
|
|
}
|
|
|
|
const auto* key_restriction = cql3::expr::as_if<cql3::expr::binary_operator>(&conjunction->children[0]);
|
|
|
|
if (!key_restriction) {
|
|
throw service::broadcast_tables::unsupported_operation_error(fmt::format("partition key restriction: {}", *conjunction));
|
|
}
|
|
|
|
const auto* column = cql3::expr::as_if<cql3::expr::column_value>(&key_restriction->lhs);
|
|
|
|
if (!column || column->col->kind != column_kind::partition_key ||
|
|
key_restriction->op != cql3::expr::oper_t::EQ) {
|
|
throw service::broadcast_tables::unsupported_operation_error(fmt::format("key restriction: {}", *key_restriction));
|
|
}
|
|
|
|
return key_restriction->rhs;
|
|
}
|
|
|
|
static
|
|
bool is_selecting_only_value(const cql3::selection::selection& selection) {
|
|
return selection.is_trivial() &&
|
|
selection.get_column_count() == 1 &&
|
|
selection.get_columns()[0]->name() == "value";
|
|
}
|
|
|
|
broadcast_select_statement::broadcast_select_statement(schema_ptr schema, uint32_t bound_terms,
|
|
lw_shared_ptr<const parameters> parameters,
|
|
::shared_ptr<selection::selection> selection,
|
|
::shared_ptr<const restrictions::statement_restrictions> restrictions,
|
|
::shared_ptr<std::vector<size_t>> group_by_cell_indices,
|
|
bool is_reversed,
|
|
ordering_comparator_type ordering_comparator,
|
|
std::optional<expr::expression> limit,
|
|
std::optional<expr::expression> per_partition_limit,
|
|
cql_stats &stats,
|
|
std::unique_ptr<attributes> attrs)
|
|
: select_statement{schema, bound_terms, parameters, selection, restrictions, group_by_cell_indices, is_reversed, ordering_comparator, std::move(limit), std::move(per_partition_limit), stats, std::move(attrs)},
|
|
_query{prepare_query()}
|
|
{ }
|
|
|
|
// Builds the prepared form of this select for the broadcast-table path.
//
// Only a selection of the single "value" column is accepted; the key is taken
// from the statement's partition key restrictions via get_key().
//
// Throws service::broadcast_tables::unsupported_operation_error when the
// selection is not just "value" or when get_key() rejects the restrictions.
broadcast_tables::prepared_select broadcast_select_statement::prepare_query() const {
    const bool selects_only_value = is_selecting_only_value(*_selection);
    if (!selects_only_value) {
        throw service::broadcast_tables::unsupported_operation_error("only 'value' selector is allowed");
    }

    const auto& pk_restrictions = _restrictions->get_partition_key_restrictions();
    return broadcast_tables::prepared_select{
        .key = get_key(pk_restrictions),
    };
}
|
|
|
|
static
|
|
service::broadcast_tables::select_query
|
|
evaluate_prepared(
|
|
const broadcast_tables::prepared_select& query,
|
|
const query_options& options) {
|
|
return service::broadcast_tables::select_query{
|
|
.key = expr::evaluate(query.key, options).to_bytes()
|
|
};
|
|
}
|
|
|
|
// Executes the broadcast-table select.
//
// On any shard other than 0 the statement is not executed; the result of
// qp.bounce_to_shard(0, ...) is returned instead. On shard 0 the prepared
// query is evaluated against `options`, run through
// qp.execute_broadcast_table_query(), and the outcome is returned as a `rows`
// result message with a single "value" column. The result set has one row if
// the query produced a value and no rows otherwise.
//
// `qs` and `guard` are not used by this implementation.
future<::shared_ptr<cql_transport::messages::result_message>>
broadcast_select_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
    const bool on_shard_zero = this_shard_id() == 0;
    if (!on_shard_zero) {
        co_return qp.bounce_to_shard(0, cql3::computed_function_values{}, false);
    }

    auto outcome = co_await qp.execute_broadcast_table_query({ evaluate_prepared(_query, options) });

    // A select query is expected to produce the select alternative of the result.
    auto* select_outcome = std::get_if<service::broadcast_tables::query_result_select>(&outcome);
    if (select_outcome == nullptr) {
        on_internal_error(logger, "incorrect query result ");
    }

    // Metadata: one utf8 column named "value".
    auto rows = std::make_unique<cql3::result_set>(std::vector{
        make_lw_shared<cql3::column_specification>(
            db::system_keyspace::NAME,
            db::system_keyspace::BROADCAST_KV_STORE,
            ::make_shared<cql3::column_identifier>("value", true),
            utf8_type)
    });

    // No value found means an empty result set rather than a row with a null.
    if (select_outcome->value) {
        rows->add_row({ managed_bytes_opt(select_outcome->value) });
    }

    using rows_message = cql_transport::messages::result_message::rows;
    co_return ::make_shared<rows_message>(cql3::result{std::move(rows)});
}
|
|
|
|
}
|
|
|
|
}
|