In the following patches, we'll start allowing forwarding requests to strongly consistent tables so that they'll get executed on the suitable tablet Raft group members. For that we'll reuse the approach that we already have for bouncing requests to other shards - we'll try to execute a request locally, and the result of that will be a bounce message with another replica as the target. In this patch we generalize the former bounce_to_shard result message so that it will be able to specify the target of the bounce as another shard or specific replica. We also rename it to result_message::bounce so that it stops implying that only another shard may be its target. Aside from the host_id and the shard, the new message also includes the timeout, because in the service handling the forwarding we won't have the access to it, and it's needed for specifying how long we should wait for the forwarded requests. It also includes an information whether this is a write request to return correct timeout response in case the deadline is exceeded. We will return other hosts in the new bounce message when executing requests to strongly consistent tables when we can't handle the request because we aren't a suitable replica. We can't handle this message yet, so we don't return it anywhere and we still assume that every bounce message is a bounce to the same host.
128 lines
4.9 KiB
C++
128 lines
4.9 KiB
C++
/*
|
|
* Copyright (C) 2022-present ScyllaDB
|
|
*
|
|
* Modified by ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
|
*/
|
|
|
|
|
|
#include "cql3/statements/broadcast_modification_statement.hh"
|
|
|
|
#include <optional>
|
|
|
|
#include <seastar/core/future.hh>
|
|
#include <seastar/util/variant_utils.hh>
|
|
|
|
#include "bytes.hh"
|
|
#include "cql3/attributes.hh"
|
|
#include "cql3/expr/expression.hh"
|
|
#include "cql3/expr/evaluate.hh"
|
|
#include "cql3/query_options.hh"
|
|
#include "cql3/query_processor.hh"
|
|
#include "cql3/values.hh"
|
|
#include "timeout_config.hh"
|
|
#include "service/broadcast_tables/experimental/lang.hh"
|
|
#include "db/system_keyspace.hh"
|
|
|
|
namespace cql3 {
|
|
|
|
static logging::logger logger("broadcast_modification_statement");
|
|
|
|
namespace statements {
|
|
|
|
broadcast_modification_statement::broadcast_modification_statement(
|
|
uint32_t bound_terms,
|
|
schema_ptr schema,
|
|
broadcast_tables::prepared_update query)
|
|
: cql_statement_opt_metadata{&timeout_config::write_timeout}
|
|
, _bound_terms{bound_terms}
|
|
, _schema{schema}
|
|
, _query{std::move(query)}
|
|
{ }
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
broadcast_modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
|
|
return execute_without_checking_exception_message(qp, qs, options, std::move(guard))
|
|
.then(cql_transport::messages::propagate_exception_as_future<shared_ptr<cql_transport::messages::result_message>>);
|
|
}
|
|
|
|
static
|
|
service::broadcast_tables::update_query
|
|
evaluate_prepared(
|
|
const broadcast_tables::prepared_update& query,
|
|
const query_options& options) {
|
|
return service::broadcast_tables::update_query{
|
|
.key = expr::evaluate(query.key, options).to_bytes(),
|
|
.new_value = expr::evaluate(query.new_value, options).to_bytes(),
|
|
.value_condition = query.value_condition
|
|
? std::optional<bytes_opt>{expr::evaluate(*query.value_condition, options).to_bytes_opt()}
|
|
: std::nullopt
|
|
};
|
|
}
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
broadcast_modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
|
|
if (this_shard_id() != 0) {
|
|
co_return qp.bounce_to_shard(0, cql3::computed_function_values{}, false);
|
|
}
|
|
|
|
auto result = co_await qp.execute_broadcast_table_query(
|
|
{ evaluate_prepared(_query, options) }
|
|
);
|
|
|
|
co_return co_await std::visit(make_visitor(
|
|
[] (service::broadcast_tables::query_result_conditional_update& qr) -> future<::shared_ptr<cql_transport::messages::result_message>> {
|
|
auto result_set = std::make_unique<cql3::result_set>(std::vector{
|
|
make_lw_shared<cql3::column_specification>(
|
|
db::system_keyspace::NAME,
|
|
db::system_keyspace::BROADCAST_KV_STORE,
|
|
::make_shared<cql3::column_identifier>("[applied]", false),
|
|
boolean_type
|
|
),
|
|
make_lw_shared<cql3::column_specification>(
|
|
db::system_keyspace::NAME,
|
|
db::system_keyspace::BROADCAST_KV_STORE,
|
|
::make_shared<cql3::column_identifier>("value", true),
|
|
utf8_type
|
|
)
|
|
});
|
|
|
|
result_set->add_row({ boolean_type->decompose(qr.is_applied), qr.previous_value });
|
|
|
|
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(
|
|
::make_shared<cql_transport::messages::result_message::rows>(cql3::result{std::move(result_set)}));
|
|
},
|
|
[] (service::broadcast_tables::query_result_none&) -> future<::shared_ptr<cql_transport::messages::result_message>> {
|
|
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
|
|
},
|
|
[] (service::broadcast_tables::query_result_select&) -> future<::shared_ptr<cql_transport::messages::result_message>> {
|
|
on_internal_error(logger, "incorrect query result ");
|
|
}
|
|
), result);
|
|
}
|
|
|
|
uint32_t broadcast_modification_statement::get_bound_terms() const {
|
|
return _bound_terms;
|
|
}
|
|
|
|
future<> broadcast_modification_statement::check_access(query_processor& qp, const service::client_state& state) const {
|
|
auto f = state.has_column_family_access(_schema->ks_name(), _schema->cf_name(), auth::permission::MODIFY);
|
|
if (_query.value_condition.has_value()) {
|
|
f = f.then([this, &state] {
|
|
return state.has_column_family_access(_schema->ks_name(), _schema->cf_name(), auth::permission::SELECT);
|
|
});
|
|
}
|
|
return f;
|
|
}
|
|
|
|
bool broadcast_modification_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
|
return _schema->ks_name() == ks_name && (!cf_name || _schema->cf_name() == *cf_name);
|
|
}
|
|
|
|
}
|
|
|
|
}
|