Files
scylladb/cql3/statements/broadcast_modification_statement.cc
Wojciech Mitros e44820ba1f transport: generalize the bounce result message for bouncing to other nodes
In the following patches, we'll start allowing forwarding requests to strongly
consistent tables so that they'll get executed on the suitable tablet Raft group
members. For that we'll reuse the approach that we already have for bouncing
requests to other shards - we'll try to execute a request locally, and the
result of that will be a bounce message with another replica as the target.
In this patch we generalize the former bounce_to_shard result message so that
it will be able to specify the target of the bounce as another shard or specific
replica.
We also rename it to result_message::bounce so that it stops implying that only
another shard may be its target.
Aside from the host_id and the shard, the new message also includes the timeout,
because the service handling the forwarding won't have access to it,
and it's needed to specify how long we should wait for the forwarded
requests. It also includes information on whether this is a write request,
so that the correct timeout response is returned if the deadline is exceeded.
We will return other hosts in the new bounce message when executing requests to
strongly consistent tables when we can't handle the request because we aren't
a suitable replica. We can't handle this message yet, so we don't return it
anywhere and we still assume that every bounce message is a bounce to the same
host.
2026-03-12 17:48:57 +01:00

128 lines
4.9 KiB
C++

/*
* Copyright (C) 2022-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "cql3/statements/broadcast_modification_statement.hh"
#include <optional>
#include <seastar/core/future.hh>
#include <seastar/util/variant_utils.hh>
#include "bytes.hh"
#include "cql3/attributes.hh"
#include "cql3/expr/expression.hh"
#include "cql3/expr/evaluate.hh"
#include "cql3/query_options.hh"
#include "cql3/query_processor.hh"
#include "cql3/values.hh"
#include "timeout_config.hh"
#include "service/broadcast_tables/experimental/lang.hh"
#include "db/system_keyspace.hh"
namespace cql3 {
// File-local logger; passed to on_internal_error() below when an unexpected
// query result type is encountered.
static logging::logger logger("broadcast_modification_statement");
namespace statements {
// Builds a modification statement targeting a broadcast table.
//
// bound_terms - value later reported by get_bound_terms().
// schema      - schema of the target table; its keyspace and table names are
//               used by check_access() and depends_on().
// query       - prepared update (key, new value and optional value condition)
//               that is evaluated against the query options at execution time.
//
// The base class is initialized with the write timeout selector
// (&timeout_config::write_timeout).
broadcast_modification_statement::broadcast_modification_statement(
        uint32_t bound_terms,
        schema_ptr schema,
        broadcast_tables::prepared_update query)
    : cql_statement_opt_metadata{&timeout_config::write_timeout}
    , _bound_terms{bound_terms}
    // `schema` is a by-value sink parameter: move it instead of copying,
    // consistently with how `query` is handled below.
    , _schema{std::move(schema)}
    , _query{std::move(query)}
{ }
// Executes the statement via execute_without_checking_exception_message() and
// passes the resulting message through propagate_exception_as_future.
// NOTE(review): presumably that helper converts exception-carrying result
// messages into failed futures - confirm against its definition.
future<::shared_ptr<cql_transport::messages::result_message>>
broadcast_modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
    auto unchecked_result = execute_without_checking_exception_message(qp, qs, options, std::move(guard));
    return unchecked_result.then(
            cql_transport::messages::propagate_exception_as_future<shared_ptr<cql_transport::messages::result_message>>);
}
// Evaluates the expressions of a prepared update against the supplied query
// options and packs the resulting values into an update_query.
//
// The expressions are evaluated in the order: key, new value, value condition.
// The condition of the result is disengaged when the prepared update has no
// value condition; otherwise it holds the (possibly empty) evaluated bytes.
static
service::broadcast_tables::update_query
evaluate_prepared(
        const broadcast_tables::prepared_update& query,
        const query_options& options) {
    auto key_bytes = expr::evaluate(query.key, options).to_bytes();
    auto new_value_bytes = expr::evaluate(query.new_value, options).to_bytes();

    std::optional<bytes_opt> condition_bytes;
    if (query.value_condition) {
        condition_bytes.emplace(expr::evaluate(*query.value_condition, options).to_bytes_opt());
    }

    return service::broadcast_tables::update_query{
        .key = std::move(key_bytes),
        .new_value = std::move(new_value_bytes),
        .value_condition = std::move(condition_bytes)
    };
}
// Runs the prepared update as a broadcast table query and converts the
// outcome into a CQL result message.
//
// When invoked on a shard other than 0, no query is executed; the returned
// message is whatever qp.bounce_to_shard(0, ...) produces.
//
// Result mapping:
//  - conditional update: a single-row result with the columns "[applied]"
//    (boolean) and "value" (utf8) holding the applied flag and previous value;
//  - no result: a null result message pointer;
//  - select result: reported as an internal error, as it is not expected for
//    a modification statement.
future<::shared_ptr<cql_transport::messages::result_message>>
broadcast_modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
    using message_ptr = ::shared_ptr<cql_transport::messages::result_message>;

    if (this_shard_id() != 0) {
        co_return qp.bounce_to_shard(0, cql3::computed_function_values{}, false);
    }

    auto outcome = co_await qp.execute_broadcast_table_query(
        { evaluate_prepared(_query, options) }
    );

    co_return co_await std::visit(make_visitor(
        [] (service::broadcast_tables::query_result_select&) -> future<message_ptr> {
            on_internal_error(logger, "incorrect query result ");
        },
        [] (service::broadcast_tables::query_result_none&) -> future<message_ptr> {
            // Default-constructed (null) message pointer.
            return make_ready_future<message_ptr>();
        },
        [] (service::broadcast_tables::query_result_conditional_update& update) -> future<message_ptr> {
            auto applied_column = make_lw_shared<cql3::column_specification>(
                db::system_keyspace::NAME,
                db::system_keyspace::BROADCAST_KV_STORE,
                ::make_shared<cql3::column_identifier>("[applied]", false),
                boolean_type
            );
            auto value_column = make_lw_shared<cql3::column_specification>(
                db::system_keyspace::NAME,
                db::system_keyspace::BROADCAST_KV_STORE,
                ::make_shared<cql3::column_identifier>("value", true),
                utf8_type
            );

            auto rows = std::make_unique<cql3::result_set>(std::vector{
                std::move(applied_column),
                std::move(value_column)
            });
            rows->add_row({ boolean_type->decompose(update.is_applied), update.previous_value });

            message_ptr message = ::make_shared<cql_transport::messages::result_message::rows>(
                    cql3::result{std::move(rows)});
            return make_ready_future<message_ptr>(std::move(message));
        }
    ), outcome);
}
// Returns the bound terms count that was passed to the constructor.
uint32_t broadcast_modification_statement::get_bound_terms() const {
    return _bound_terms;
}
// Verifies that the client may execute this statement on the target table.
//
// MODIFY permission is always checked. When the update carries a value
// condition, SELECT permission is checked as well, after the MODIFY check
// has completed. `qp` is not used.
future<> broadcast_modification_statement::check_access(query_processor& qp, const service::client_state& state) const {
    auto modify_check = state.has_column_family_access(_schema->ks_name(), _schema->cf_name(), auth::permission::MODIFY);
    if (!_query.value_condition.has_value()) {
        return modify_check;
    }
    // `state` is captured by reference, so the caller must keep it alive
    // until the returned future resolves.
    return modify_check.then([this, &state] {
        return state.has_column_family_access(_schema->ks_name(), _schema->cf_name(), auth::permission::SELECT);
    });
}
// Tells whether this statement refers to the given keyspace and, if a table
// name is supplied, to that table. With no table name, matching the keyspace
// alone is sufficient.
bool broadcast_modification_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
    if (!(_schema->ks_name() == ks_name)) {
        return false;
    }
    if (!cf_name) {
        return true;
    }
    return _schema->cf_name() == *cf_name;
}
}
}