Files
scylladb/cql3/statements/modification_statement.cc
Petr Gusev c428645d16 storage_proxy: cas: take cas_request by raw reference
In the next commit we want to add an optimization that relies on
precise control over the lifetime of cas_request. In particular, we
want the implementation of this interface in Alternator to operate on
raw references that are guaranteed to remain valid only until the
cas() future is resolved. We already depend on the same lifetime
assumptions in cas_request when used by modification_statement.
However, these assumptions are not clearly expressed in the current
interface: cas_request is taken by shared_ptr, and nothing prevents
cas() from storing that pointer inside paxos_response_handler, which
may outlive the cas() future.

This commit fixes that by taking cas_request by raw reference. This
makes it explicit that cas() does not assume ownership of the object.
Callers must ensure that the referenced object remains valid until
the returned future is resolved.
2025-12-07 16:14:56 +01:00

853 lines
38 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "utils/assert.hh"
#include "cql3/cql_statement.hh"
#include "cql3/statements/modification_statement.hh"
#include "cql3/statements/strongly_consistent_modification_statement.hh"
#include "cql3/statements/raw/modification_statement.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/expr/expr-utils.hh"
#include "cql3/expr/evaluate.hh"
#include "cql3/util.hh"
#include "validation.hh"
#include "db/consistency_level_validations.hh"
#include <optional>
#include <seastar/core/shared_ptr.hh>
#include "transport/messages/result_message.hh"
#include "data_dictionary/data_dictionary.hh"
#include "replica/database.hh"
#include <seastar/core/execution_stage.hh>
#include "cas_request.hh"
#include "cql3/query_processor.hh"
#include "service/storage_proxy.hh"
#include "service/broadcast_tables/experimental/lang.hh"
#include <boost/lexical_cast.hpp>
// File-local shorthand for exceptions::coordinator_result<T>; T defaults to void.
template<typename T = void>
using coordinator_result = exceptions::coordinator_result<T>;
// Declared here and defined elsewhere. Used by the modification_statement
// constructor below to pick the keyspace selector (_ks_sel) for statistics.
bool is_internal_keyspace(std::string_view name);
namespace cql3 {
namespace statements {
// Chooses which timeout_config member applies to writes on the given table:
// counter tables get counter_write_timeout, every other table write_timeout.
timeout_config_selector
modification_statement_timeout(const schema& s) {
    if (!s.is_counter()) {
        return &timeout_config::write_timeout;
    }
    return &timeout_config::counter_write_timeout;
}
// Resolves the timeout for this statement.
// A timeout set through the statement attributes wins; otherwise the value is
// read from the client's timeout_config via this statement's selector.
db::timeout_clock::duration modification_statement::get_timeout(const service::client_state& state, const query_options& options) const {
    if (attrs->is_timeout_set()) {
        return attrs->get_timeout(options);
    }
    const timeout_config& client_timeouts = state.get_timeout_config();
    return client_timeouts.*get_timeout_config_selector();
}
// Constructs a statement of the given type for the table described by schema_.
//
// - The base class receives the timeout selector chosen for the schema
//   (see modification_statement_timeout()).
// - _columns_to_read and _columns_of_cas_result_set are both sized to the
//   schema's total column count.
// - _column_operations starts empty; operations are appended later by
//   add_operation().
// - _ks_sel records whether ::is_internal_keyspace() reports the keyspace as
//   internal (SYSTEM) or not (NONSYSTEM); it is used when bumping statistics.
modification_statement::modification_statement(statement_type type_, uint32_t bound_terms, schema_ptr schema_, std::unique_ptr<attributes> attrs_, cql_stats& stats_)
: cql_statement_opt_metadata(modification_statement_timeout(*schema_))
, type{type_}
, _bound_terms{bound_terms}
, _columns_to_read(schema_->all_columns_count())
, _columns_of_cas_result_set(schema_->all_columns_count())
, s{schema_}
, attrs{std::move(attrs_)}
, _column_operations{}
, _stats(stats_)
, _ks_sel(::is_internal_keyspace(schema_->ks_name()) ? ks_selector::SYSTEM : ks_selector::NONSYSTEM)
{ }
// Destructor is defaulted out of line, in this translation unit.
modification_statement::~modification_statement() = default;
// Returns the bound-term count that was passed to the constructor.
uint32_t modification_statement::get_bound_terms() const {
return _bound_terms;
}
// Keyspace name of the table this statement modifies, as reported by the schema.
const sstring& modification_statement::keyspace() const {
return s->ks_name();
}
// Table (column family) name this statement modifies, as reported by the schema.
const sstring& modification_statement::column_family() const {
return s->cf_name();
}
// True when the schema of the target table reports itself as a counter table.
bool modification_statement::is_counter() const {
return s->is_counter();
}
// True when the schema of the target table reports itself as a view.
bool modification_statement::is_view() const {
return s->is_view();
}
// Resolves the write timestamp by delegating to the statement attributes,
// forwarding both `now` and the query options.
int64_t modification_statement::get_timestamp(int64_t now, const query_options& options) const {
return attrs->get_timestamp(now, options);
}
// Reports whether the statement attributes carry a timestamp.
bool modification_statement::is_timestamp_set() const {
return attrs->is_timestamp_set();
}
std::optional<gc_clock::duration> modification_statement::get_time_to_live(const query_options& options) const {
std::optional<int32_t> ttl = attrs->get_time_to_live(options);
return ttl ? std::make_optional<gc_clock::duration>(*ttl) : std::nullopt;
}
// Checks that the client may run this statement against the target table.
// MODIFY permission is always required; a conditional statement additionally
// requires SELECT, checked only after the MODIFY check has completed.
future<> modification_statement::check_access(query_processor& qp, const service::client_state& state) const {
    auto modify_check = state.has_column_family_access(keyspace(), column_family(), auth::permission::MODIFY);
    if (!has_conditions()) {
        return modify_check;
    }
    return modify_check.then([this, &state] {
        return state.has_column_family_access(keyspace(), column_family(), auth::permission::SELECT);
    });
}
// Produces the mutations this statement generates for the given partition keys.
//
// The consistency level is validated for a (counter or regular) write first.
// When the statement requires a read, the existing rows are queried through
// the storage proxy and turned into prefetch data; otherwise an empty
// prefetch_data built from the schema is used. The updates are then applied
// on top of that data.
//
// Note: json_cache is moved from into the continuation, and `options` is
// captured by reference.
future<utils::chunked_vector<mutation>>
modification_statement::get_mutations(query_processor& qp, const query_options& options, db::timeout_clock::time_point timeout, bool local, int64_t now, service::query_state& qs, json_cache_opt& json_cache, std::vector<dht::partition_range> keys) const {
    auto consistency = options.get_consistency();
    auto clustering_ranges = create_clustering_ranges(options, json_cache);
    auto prefetched = make_ready_future<update_parameters::prefetch_data>(s);

    if (!is_counter()) {
        db::validate_for_write(consistency);
    } else {
        db::validate_counter_for_write(*s, consistency);
    }

    if (requires_read()) {
        lw_shared_ptr<query::read_command> read_cmd = read_command(qp, clustering_ranges, consistency);
        // FIXME: ignoring "local"
        prefetched = qp.proxy().query(s, read_cmd, dht::partition_range_vector(keys), consistency,
                {timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()}).then(
                [this, read_cmd] (auto query_outcome) {
            return update_parameters::build_prefetch_data(s, *query_outcome.query_result, read_cmd->slice);
        });
    }

    return prefetched.then([this, keys = std::move(keys), clustering_ranges = std::move(clustering_ranges), json_cache = std::move(json_cache), &options, now]
            (auto prefetched_rows) {
        update_parameters params(s, options, this->get_timestamp(now, options),
                this->get_time_to_live(options), std::move(prefetched_rows));
        return make_ready_future<utils::chunked_vector<mutation>>(
                apply_updates(keys, clustering_ranges, params, json_cache));
    });
}
// Decides whether this conditional statement applies, given the row that was
// read for it (nullptr when no row was found).
//
// IF EXISTS / IF NOT EXISTS are answered from the row's presence alone; any
// other condition is evaluated as an expression against the row's cells.
bool modification_statement::applies_to(const selection::selection* selection,
        const update_parameters::prefetch_data::row* row,
        const query_options& options) const {
    // A statement that only cares about static columns must treat a row that
    // carries no static columns as missing, otherwise EXISTS checks go wrong.
    // Example where the conditional INSERT has to apply, because the partition
    // has a regular row (returned by the read) but no static row:
    //   CREATE TABLE t(p int, c int, s int static, PRIMARY KEY(p, c));
    //   INSERT INTO t(p, c) VALUES(1, 1);
    //   INSERT INTO t(p, s) VALUES(1, 1) IF NOT EXISTS;
    const bool static_only_and_no_static_row =
            has_only_static_column_conditions() && row && !row->has_static_columns(*s);
    if (static_only_and_no_static_row) {
        row = nullptr;
    }

    if (_if_exists) {
        return row != nullptr;
    }
    if (_if_not_exists) {
        return row == nullptr;
    }

    // Without a row, evaluate the condition against an all-null set of cells.
    std::vector<managed_bytes_opt> all_null_cells;
    const std::vector<managed_bytes_opt>* cells = nullptr;
    if (row) {
        cells = &row->cells;
    } else {
        all_null_cells.resize(selection->get_column_count());
        cells = &all_null_cells;
    }

    auto inputs = expr::evaluation_inputs{
        .static_and_regular_columns = *cells,
        .selection = selection,
        .options = &options,
    };
    static auto true_value = raw_value::make_value(data_value(true).serialize());
    return expr::evaluate(_condition, inputs) == true_value;
}
// Creates one mutation per partition key and applies this statement's update
// to it once for every clustering range.
utils::chunked_vector<mutation> modification_statement::apply_updates(
        const std::vector<dht::partition_range>& keys,
        const std::vector<query::clustering_range>& ranges,
        const update_parameters& params,
        const json_cache_opt& json_cache) const {
    utils::chunked_vector<mutation> result;
    result.reserve(keys.size());
    // Iterate by value on purpose: the key is moved out of each copied range.
    for (auto partition_range : keys) {
        // key.start() must be defined: only EQ relations are allowed on the partition key.
        result.emplace_back(s, std::move(*partition_range.start()->value().key()));
        auto& current = result.back();
        for (const auto& clustering_range : ranges) {
            this->add_update_for_key(current, clustering_range, params, json_cache);
        }
    }
    return result;
}
// Builds the read command used to fetch existing rows before the write.
// Throws invalid_request_exception when the consistency level is rejected by
// validate_for_read(); the original validation error is replaced by a
// write-specific message.
lw_shared_ptr<query::read_command>
modification_statement::read_command(query_processor& qp, query::clustering_row_ranges ranges, db::consistency_level cl) const {
    try {
        validate_for_read(cl);
    } catch (const exceptions::invalid_request_exception&) {
        throw exceptions::invalid_request_exception(format("Write operation require a read but consistency {} is not supported on reads", cl));
    }
    query::partition_slice slice(std::move(ranges), *s, columns_to_read(), update_parameters::options);
    const auto result_size_limit = qp.proxy().get_max_result_size(slice);
    return make_lw_shared<query::read_command>(
            s->id(),
            s->version(),
            std::move(slice),
            query::max_result_size(result_size_limit),
            query::tombstone_limit::max);
}
// Returns the clustering bounds computed by the statement restrictions for
// the given options. json_cache is not used by this implementation.
std::vector<query::clustering_range>
modification_statement::create_clustering_ranges(const query_options& options, const json_cache_opt& json_cache) const {
return _restrictions->get_clustering_bounds(options);
}
// Computes the partition key ranges selected by the statement restrictions
// and validates the start key of each one with validate_cql_key().
// json_cache is not used by this implementation.
dht::partition_range_vector
modification_statement::build_partition_keys(const query_options& options, const json_cache_opt& json_cache) const {
    auto partition_ranges = _restrictions->get_partition_key_ranges(options);
    for (const auto& range : partition_ranges) {
        validation::validate_cql_key(*s, *range.start()->value().key());
    }
    return partition_ranges;
}
// Helper that hands out a pointer to modification_statement::do_execute so it
// can be registered as the function of the execution stage defined below.
struct modification_statement_executor {
static auto get() { return &modification_statement::do_execute; }
};
// Thread-local execution stage named "cql3_modification". Calls made through
// it invoke modification_statement::do_execute with the statement pointer,
// query processor, query state and query options.
static thread_local inheriting_concrete_execution_stage<
future<::shared_ptr<cql_transport::messages::result_message>>,
const modification_statement*,
query_processor&,
service::query_state&,
const query_options&> modify_stage{"cql3_modification", modification_statement_executor::get()};
// Executes the statement and post-processes the resulting message with
// propagate_exception_as_future, so that a result carrying an exception is
// surfaced through the returned future.
future<::shared_ptr<cql_transport::messages::result_message>>
modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
    auto unchecked_result = execute_without_checking_exception_message(qp, qs, options, std::move(guard));
    return unchecked_result.then(
            cql_transport::messages::propagate_exception_as_future<shared_ptr<cql_transport::messages::result_message>>);
}
// Validates the statement's timestamp against the database configuration and
// then runs do_execute() through the "cql3_modification" execution stage.
// The returned message is passed through as-is; the caller is expected to
// inspect it for an exception (see execute()). `guard` is not used here.
future<::shared_ptr<cql_transport::messages::result_message>>
modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
cql3::util::validate_timestamp(qp.db().get_config(), options, attrs);
return modify_stage(this, seastar::ref(qp), seastar::ref(qs), seastar::cref(options));
}
// Body of the statement execution, run through the execution stage.
//
// Validates the table and the primary key restrictions, records tracing and
// statistics, then either takes the conditional path (execute_with_condition)
// or applies the mutations directly. A failed unconditional write is returned
// as an exception result message. For a write to exactly one partition key,
// tablet routing info may be attached to the (void) result.
future<::shared_ptr<cql_transport::messages::result_message>>
modification_statement::do_execute(query_processor& qp, service::query_state& qs, const query_options& options) const {
    (void)validation::validate_column_family(qp.db(), keyspace(), column_family());
    tracing::add_table_name(qs.get_trace_state(), keyspace(), column_family());
    inc_cql_stats(qs.get_client_state().is_internal());
    _restrictions->validate_primary_key(options);

    if (has_conditions()) {
        co_return co_await execute_with_condition(qp, qs, options);
    }

    json_cache_opt json_cache = maybe_prepare_json_cache(options);
    std::vector<dht::partition_range> partition_keys = build_partition_keys(options, json_cache);

    // Extract what is needed from the keys now; they are moved away below.
    const bool single_partition = partition_keys.size() == 1;
    auto token = dht::token();
    if (single_partition) {
        token = partition_keys[0].start()->value().token();
    }

    auto write_outcome = co_await execute_without_condition(qp, qs, options, json_cache, std::move(partition_keys));
    if (!write_outcome) {
        co_return seastar::make_shared<cql_transport::messages::result_message::exception>(std::move(write_outcome).assume_error());
    }

    auto reply = seastar::make_shared<cql_transport::messages::result_message::void_message>();
    if (single_partition) {
        auto&& table = s->table();
        const bool attach_routing_info = _may_use_token_aware_routing
                && table.uses_tablets()
                && qs.get_client_state().is_protocol_extension_set(cql_transport::cql_protocol_extension::TABLETS_ROUTING_V1);
        if (attach_routing_info) {
            auto erm = table.get_effective_replication_map();
            if (auto tablet_info = erm->check_locality(token); tablet_info.has_value()) {
                reply->add_tablet_info(tablet_info->tablet_replicas, tablet_info->token_range);
            }
        }
    }
    co_return std::move(reply);
}
// Applies the statement without any IF condition: computes the mutations and
// hands them to the storage proxy. Succeeds immediately when there is nothing
// to write.
//
// qp, qs and options are captured by reference in the continuation.
future<coordinator_result<>>
modification_statement::execute_without_condition(query_processor& qp, service::query_state& qs, const query_options& options, json_cache_opt& json_cache, std::vector<dht::partition_range> keys) const {
    auto consistency = options.get_consistency();
    auto deadline = db::timeout_clock::now() + get_timeout(qs.get_client_state(), options);
    auto pending_mutations = get_mutations(qp, options, deadline, false, options.get_timestamp(qs), qs, json_cache, std::move(keys));
    return pending_mutations.then([this, consistency, deadline, &qp, &qs, &options] (auto mutations) {
        if (!mutations.empty()) {
            return qp.proxy().mutate_with_triggers(std::move(mutations), consistency, deadline, false, qs.get_trace_state(), qs.get_permit(), db::allow_per_partition_rate_limit::yes, this->is_raw_counter_shard_write(), {
                .node_local_only = options.get_specific_options().node_local_only
            });
        }
        return make_ready_future<coordinator_result<>>(bo::success());
    });
}
namespace {
// Error-injection helper used by execute_with_condition() when the
// "forced_bounce_to_shard_counter" injection is enabled.
//
// Instead of running the statement, it answers with a bounce-to-shard message.
// The injection parameter gives the number of bounces to force: while more
// than one bounce remains, a shard other than `shard` is picked; on the last
// one the injection is disabled on all shards and the request is bounced to
// `shard` (the shard the caller computed for the CAS).
//
// shard:   the shard the request should finally be bounced to.
// options: cached partition-key function call values are taken out of it
//          (through a const_cast) and shipped with the bounce message.
//
// NOTE(review): `counter` is a plain function-local static, not thread_local,
// so it is shared by every thread that runs this function - confirm this is
// intended for the injection tests.
future<::shared_ptr<cql_transport::messages::result_message>>
process_forced_rebounce(unsigned shard, query_processor& qp, const query_options& options) {
    static int64_t counter = {0};
    static logging::logger logger("modification_statement");

    // (Re)load the counter from the injection parameter when it has run out.
    if (counter <= 0) {
        const auto counter_opt = utils::get_local_injector().inject_parameter<decltype(counter)>("forced_bounce_to_shard_counter");
        decltype(counter) counter_value = 0;
        if (!counter_opt) {
            logger.warn("forced_bounce_to_shard_counter is not set. Using default value 1.");
        } else {
            try {
                counter_value = boost::lexical_cast<decltype(counter_value)>(*counter_opt);
            } catch (const boost::bad_lexical_cast& e) {
                logger.warn("Incorrect forced_bounce_to_shard_counter value: [{}]. Using default value 1.", *counter_opt);
            }
        }
        // Missing, unparsable or non-positive values all fall back to 1.
        if (counter_value <= 0) {
            counter_value = 1;
        }
        counter = counter_value;
    }

    const auto prev_counter_value = counter;
    if (prev_counter_value <= 1) {
        logger.info("Disabling forced_bounce_to_shard_counter.");
        co_await utils::error_injection_type::disable_on_all("forced_bounce_to_shard_counter");
        counter = 0;
    } else {
        --counter;
    }

    // While counter > 1 select a different shard to re-bounce to.
    // On the last iteration, re-bounce to the correct shard.
    if (counter != 0) {
        const auto shard_num = smp::count;
        // Use the project assertion macro, like the rest of this file, rather
        // than the C assert (which can be compiled out).
        SCYLLA_ASSERT(shard_num > 0);
        const auto local_shard = this_shard_id();
        auto target_shard = local_shard + 1;
        if (target_shard == shard) {
            ++target_shard;
        }
        // NOTE(review): after wrapping to 0 the target may coincide with
        // `shard` or with the local shard; confirm that is acceptable.
        if (target_shard > shard_num - 1) {
            target_shard = 0;
        }
        shard = target_shard;
    }

    logger.info("Applying forced_bounce_to_shard_counter, re-bouncing to shard {}.", shard);
    co_return co_await make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
        qp.bounce_to_shard(shard, std::move(const_cast<cql3::query_options&>(options).take_cached_pk_function_calls())));
}
} // namespace
// Executes a conditional (IF ...) statement through storage_proxy::cas().
//
// Steps: validate the serial consistency level, compute the timeouts, build a
// single-statement cas_request, bounce to the shard that owns the token when
// this is not it, and finally run cas() and turn its outcome into a CAS result
// set, attaching tablet routing info when available.
//
// Throws invalid_request_exception when the partition key or the clustering
// key is unrestricted. An invalid serial consistency level is reported through
// the returned future.
//
// Lifetime: cas() takes the request by raw reference. The request is owned by
// a unique_ptr that is moved into the continuation attached to the cas()
// future, so it stays alive until that future resolves.
future<::shared_ptr<cql_transport::messages::result_message>>
modification_statement::execute_with_condition(query_processor& qp, service::query_state& qs, const query_options& options) const {
    auto cl_for_learn = options.get_consistency();
    utils::result_with_exception_ptr<db::consistency_level> cl_for_paxos = options.check_serial_consistency();
    if (!cl_for_paxos) [[unlikely]] {
        return make_exception_future<shared_ptr<cql_transport::messages::result_message>>(std::move(cl_for_paxos).assume_error());
    }

    db::timeout_clock::time_point now = db::timeout_clock::now();
    const timeout_config& cfg = qs.get_client_state().get_timeout_config();
    auto statement_timeout = now + cfg.write_timeout; // All CAS networking operations run with write timeout.
    auto cas_timeout = now + cfg.cas_timeout;         // When to give up due to contention.
    auto read_timeout = now + cfg.read_timeout;       // When to give up on query.

    json_cache_opt json_cache = maybe_prepare_json_cache(options);
    std::vector<dht::partition_range> keys = build_partition_keys(options, json_cache);
    std::vector<query::clustering_range> ranges = create_clustering_ranges(options, json_cache);

    if (keys.empty()) {
        throw exceptions::invalid_request_exception(format("Unrestricted partition key in a conditional {}",
                type.is_update() ? "update" : "deletion"));
    }
    if (ranges.empty()) {
        throw exceptions::invalid_request_exception(format("Unrestricted clustering key in a conditional {}",
                type.is_update() ? "update" : "deletion"));
    }

    auto request = std::make_unique<cas_request>(s, std::move(keys));
    // Raw pointer taken before `request` is moved into the continuation below.
    auto* request_ptr = request.get();
    // cas_request can be used for batches as well as single statements; here we have just a
    // single modification in the list of CAS commands, since we're handling single-statement
    // execution.
    request->add_row_update(*this, std::move(ranges), std::move(json_cache), options);

    auto token = request->key()[0].start()->value().as_decorated_key().token();
    auto cas_shard = service::cas_shard(*s, token);

    if (utils::get_local_injector().is_enabled("forced_bounce_to_shard_counter")) {
        return process_forced_rebounce(cas_shard.shard(), qp, options);
    }

    if (!cas_shard.this_shard()) {
        return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
                qp.bounce_to_shard(cas_shard.shard(), std::move(const_cast<cql3::query_options&>(options).take_cached_pk_function_calls()))
            );
    }

    // Start from empty routing info, so that there is always a value to pass
    // to add_tablet_info() below.
    std::optional<locator::tablet_routing_info> tablet_info = locator::tablet_routing_info{locator::tablet_replica_set(), std::pair<dht::token, dht::token>()};

    auto&& table = s->table();
    if (_may_use_token_aware_routing && table.uses_tablets() && qs.get_client_state().is_protocol_extension_set(cql_transport::cql_protocol_extension::TABLETS_ROUTING_V1)) {
        auto erm = table.get_effective_replication_map();
        // check_locality() returns an optional that may be empty (do_execute()
        // checks has_value() on it). Only replace the empty default when a
        // value is present: tablet_info is dereferenced unconditionally below,
        // and dereferencing an empty optional is undefined behaviour.
        if (auto locality = erm->check_locality(token); locality.has_value()) {
            tablet_info = std::move(locality);
        }
    }

    return qp.proxy().cas(s, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
            {read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
            std::move(cl_for_paxos).assume_value(), cl_for_learn, statement_timeout, cas_timeout).then([this, request = std::move(request), tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) {
        auto result = request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
        result->add_tablet_info(tablet_replicas, token_range);
        return result;
    });
}
void modification_statement::build_cas_result_set_metadata() {
std::vector<lw_shared_ptr<column_specification>> columns;
// Add the mandatory [applied] column to result set metadata
auto applied = make_lw_shared<cql3::column_specification>(s->ks_name(), s->cf_name(),
make_shared<cql3::column_identifier>("[applied]", false), boolean_type);
columns.push_back(applied);
const auto& all_columns = s->all_columns();
if (_if_exists || _if_not_exists) {
// If all our conditions are columns conditions (IF x = ?), then it's enough to query
// the columns from the conditions. If we have a IF EXISTS or IF NOT EXISTS however,
// we need to query all columns for the row since if the condition fails, we want to
// return everything to the user.
// XXX Static columns make this a bit more complex, in that if an insert only static
// columns, then the existence condition applies only to the static columns themselves, and
// so we don't want to include regular columns in that case.
for (const auto& def : all_columns) {
_columns_of_cas_result_set.set(def.ordinal_id);
}
} else {
expr::for_each_expression<expr::column_value>(_condition, [&] (const expr::column_value& col) {
_columns_of_cas_result_set.set(col.col->ordinal_id);
});
}
columns.reserve(columns.size() + all_columns.size());
// We must filter conditions using the _columns_of_cas_result_set, since
// the same column can be used twice in the condition list:
// if a > 0 and a < 3.
for (const auto& def : all_columns) {
if (_columns_of_cas_result_set.test(def.ordinal_id)) {
columns.emplace_back(def.column_specification);
}
}
// Ensure we prefetch all of the columns of the result set. This is also
// necessary to check conditions.
_columns_to_read.union_with(_columns_of_cas_result_set);
_metadata = seastar::make_shared<cql3::metadata>(std::move(columns));
}
// Analyzes the WHERE clause into _restrictions and validates that it is
// acceptable for a modification statement.
//
// Rejected with invalid_request_exception, in this order:
//  - token() restrictions;
//  - restrictions on non PRIMARY KEY columns;
//  - clustering key slices, unless allow_clustering_key_slices();
//  - unrestricted clustering columns, when the statement requires the full
//    clustering key or modifies a non-static column without a slice;
//  - unrestricted partition key components.
// For conditional statements validate_where_clause_for_conditions() is run last.
//
// For IF EXISTS / IF NOT EXISTS this is also where it is decided whether the
// condition counts as a static-column or a regular-column condition.
void
modification_statement::process_where_clause(data_dictionary::database db, expr::expression where_clause, prepare_context& ctx) {
_restrictions = restrictions::analyze_statement_restrictions(db, s, type, where_clause, ctx,
applies_only_to_static_columns(), _selects_a_collection, false /* allow_filtering */, restrictions::check_indexes::no);
/*
* If there's no clustering columns restriction, we may assume that EXISTS
* check only selects static columns and hence we can use any row from the
* partition to check conditions.
*/
if (_if_exists || _if_not_exists) {
// Neither flag may have been set yet: the setters postpone that decision to this point.
SCYLLA_ASSERT(!_has_static_column_conditions && !_has_regular_column_conditions);
if (s->has_static_columns() && !_restrictions->has_clustering_columns_restriction()) {
_has_static_column_conditions = true;
} else {
_has_regular_column_conditions = true;
}
}
if (_restrictions->has_token_restrictions()) {
throw exceptions::invalid_request_exception(format("The token function cannot be used in WHERE clauses for UPDATE and DELETE statements: {}",
to_string(_restrictions->get_partition_key_restrictions())));
}
if (!_restrictions->get_non_pk_restriction().empty()) {
throw exceptions::invalid_request_exception(seastar::format("Invalid where clause contains non PRIMARY KEY columns: {}",
fmt::join(_restrictions->get_non_pk_restriction()
| std::views::keys
| std::views::transform([](const column_definition* c) {
return c->name_as_text();
}), ", ")));
}
const expr::expression& ck_restrictions = _restrictions->get_clustering_columns_restrictions();
if (has_slice(ck_restrictions) && !allow_clustering_key_slices()) {
throw exceptions::invalid_request_exception(
format("Invalid operator in where clause {}", to_string(ck_restrictions)));
}
if (_restrictions->has_unrestricted_clustering_columns() && !applies_only_to_static_columns() && !s->is_dense()) {
// Tomek: Origin had "&& s->comparator->is_composite()" in the condition below.
// Comparator is a thrift concept, not CQL concept, and we want to avoid
// using thrift concepts here. I think it's safe to drop this here because the only
// case in which we would get a non-composite comparator here would be if the cell
// name type is SimpleSparse, which means:
// (a) CQL compact table without clustering columns
// (b) thrift static CF with non-composite comparator
// Those tables don't have clustering columns so we wouldn't reach this code, thus
// the check seems redundant.
if (require_full_clustering_key()) {
throw exceptions::invalid_request_exception(format("Missing mandatory PRIMARY KEY part {}",
_restrictions->unrestricted_column(column_kind::clustering_key).name_as_text()));
}
// In general, we can't modify specific columns if not all clustering columns have been specified.
// However, if we modify only static columns, it's fine since we won't really use the prefix anyway.
if (!has_slice(ck_restrictions)) {
for (auto&& op : _column_operations) {
if (!op->column.is_static()) {
throw exceptions::invalid_request_exception(format("Primary key column '{}' must be specified in order to modify column '{}'",
_restrictions->unrestricted_column(column_kind::clustering_key).name_as_text(), op->column.name_as_text()));
}
}
}
}
if (_restrictions->has_partition_key_unrestricted_components()) {
throw exceptions::invalid_request_exception(format("Missing mandatory PRIMARY KEY part {}",
_restrictions->unrestricted_column(column_kind::partition_key).name_as_text()));
}
if (has_conditions()) {
validate_where_clause_for_conditions();
}
}
// Base implementation: always throws
// service::broadcast_tables::unsupported_operation_error.
::shared_ptr<strongly_consistent_modification_statement>
modification_statement::prepare_for_broadcast_tables() const {
// FIXME: implement for every type of `modification_statement`.
throw service::broadcast_tables::unsupported_operation_error{};
}
namespace raw {
// Prepares the statement and returns it, or - when the target table is a
// broadcast table - its broadcast-tables counterpart.
::shared_ptr<cql_statement_opt_metadata>
modification_statement::prepare_statement(data_dictionary::database db, prepare_context& ctx, cql_stats& stats) {
    ::shared_ptr<cql3::statements::modification_statement> prepared = prepare(db, ctx, stats);
    const bool targets_broadcast_table =
            service::broadcast_tables::is_broadcast_table_statement(keyspace(), column_family());
    if (targets_broadcast_table) {
        return prepared->prepare_for_broadcast_tables();
    }
    return prepared;
}
// Prepares this raw statement into a prepared_statement.
// The table is validated first; the resulting prepared_statement is built from
// the audit info, the prepared statement, the prepare context and the
// partition key bind indexes.
std::unique_ptr<prepared_statement>
modification_statement::prepare(data_dictionary::database db, cql_stats& stats) {
    schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family());
    auto prepare_ctx = get_prepare_context();
    auto prepared = prepare_statement(db, prepare_ctx, stats);
    auto pk_bind_indices = prepare_ctx.get_partition_key_bind_indexes(*schema);
    return std::make_unique<prepared_statement>(
            audit_info(),
            std::move(prepared),
            prepare_ctx,
            std::move(pk_bind_indices));
}
// Prepares this raw statement against the given prepare context.
// Validates the table, prepares the attributes, builds the statement through
// prepare_internal() and records whether token-aware routing may be used
// (i.e. whether there are any partition key bind indexes).
::shared_ptr<cql3::statements::modification_statement>
modification_statement::prepare(data_dictionary::database db, prepare_context& ctx, cql_stats& stats) const {
    schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family());

    auto prepared_attrs = _attrs->prepare(db, keyspace(), column_family());
    prepared_attrs->fill_prepare_context(ctx);

    auto stmt = prepare_internal(db, schema, ctx, std::move(prepared_attrs), stats);

    // By now the prepare context holds the `function_call` AST nodes of the
    // non-pure functions that evaluate partition key constraints.
    //
    // Such calls influence the computed partition key ranges and therefore the
    // target shard of an LWT statement. When the execution is redirected to
    // another shard, the already computed function results have to be forwarded
    // with it; otherwise a non-deterministic function evaluated anew on every
    // shard could make the query bounce between shards indefinitely.
    //
    // The prepare context tracks those nodes and assigns each an id used for
    // caching the results. Whether the statement is an LWT is only known once
    // the prepared statement object exists, which is why this is decided here.
    //
    // The cache only matters for LWT queries, so for a non-conditional
    // statement the ids are cleared and the nodes stay out of the caching
    // mechanism.
    const bool drop_pk_function_cache = !stmt->has_conditions() && stmt->_restrictions.has_value();
    if (drop_pk_function_cache) {
        ctx.clear_pk_function_calls_cache();
    }

    const auto& pk_bind_indexes = ctx.get_partition_key_bind_indexes(*schema);
    stmt->_may_use_token_aware_routing = pk_bind_indexes.size() != 0;
    return stmt;
}
// Returns a copy of the expression in which every binary operator has its
// null handling switched to the LWT style (null_handling_style::lwt_nulls).
static
expr::expression
update_for_lwt_null_equality_rules(const expr::expression& e) {
    using namespace expr;
    return search_and_replace(e, [] (const expression& candidate) -> std::optional<expression> {
        auto* binop = as_if<binary_operator>(&candidate);
        if (!binop) {
            // Not a binary operator: leave the node as it is.
            return std::nullopt;
        }
        auto lwt_binop = *binop;
        lwt_binop.null_handling = expr::null_handling_style::lwt_nulls;
        return lwt_binop;
    });
}
// Prepares the expression of an IF clause.
// The expression is prepared as a boolean, must not contain aggregate
// functions and must not reference PRIMARY KEY columns (an
// invalid_request_exception is thrown for the latter). It is then adjusted for
// collections returned as maps, LIKE-optimized and switched to LWT null
// handling.
static
expr::expression
column_condition_prepare(const expr::expression& expr, data_dictionary::database db, const sstring& keyspace, const schema& schema){
    auto condition = expr::prepare_expression(expr, db, keyspace, &schema,
            make_lw_shared<column_specification>("", "", make_shared<column_identifier>("IF condition", true), boolean_type));
    expr::verify_no_aggregate_functions(condition, "IF clause");

    expr::for_each_expression<expr::column_value>(condition, [] (const expr::column_value& referenced) {
        auto column = referenced.col;
        if (!column->is_primary_key()) {
            return;
        }
        throw exceptions::invalid_request_exception(format("PRIMARY KEY column '{}' cannot have IF conditions", column->name_as_text()));
    });

    // A multi-cell (non-frozen) collection is returned as a map even when its
    // data type is "set" or "list". That is governed by the
    // partition_slice::collections_as_maps enum, set while the read command
    // object is prepared. A list has to be represented as map<timeuuid, listval>
    // to identify the list field being updated, e.g. for
    // UPDATE t SET list[3] = null WHERE a = 1 IF list[3] = 'key'
    //
    // The map representation is not needed here, so the returned value is
    // reinterpreted as a list.
    condition = expr::adjust_for_collection_as_maps(condition);
    condition = expr::optimize_like(condition);
    condition = update_for_lwt_null_equality_rules(condition);
    return condition;
}
// Transfers the IF clause of this raw statement onto the prepared statement.
// Does nothing for an unconditional statement. Conditional updates are
// rejected (invalid_request_exception) on counter tables and when a custom
// timestamp is given.
void
modification_statement::prepare_conditions(data_dictionary::database db, const schema& schema, prepare_context& ctx,
        cql3::statements::modification_statement& stmt) const
{
    if (!_if_not_exists && !_if_exists && !_conditions) {
        return;
    }

    if (stmt.is_counter()) {
        throw exceptions::invalid_request_exception("Conditional updates are not supported on counter tables");
    }
    if (_attrs->timestamp) {
        throw exceptions::invalid_request_exception("Cannot provide custom timestamp for conditional updates");
    }

    if (_if_not_exists) {
        // Combining 'IF NOT EXISTS' with other conditions makes no sense. The
        // parser enforces that today; assert it in case the parser ever changes.
        SCYLLA_ASSERT(!_conditions);
        SCYLLA_ASSERT(!_if_exists);
        stmt.set_if_not_exist_condition();
    } else if (_if_exists) {
        SCYLLA_ASSERT(!_conditions);
        SCYLLA_ASSERT(!_if_not_exists);
        stmt.set_if_exist_condition();
    } else {
        stmt._condition = column_condition_prepare(*_conditions, db, keyspace(), schema);
        expr::fill_prepare_context(stmt._condition, ctx);
        stmt.analyze_condition(stmt._condition);
    }

    stmt.build_cas_result_set_metadata();
}
// Audit category of modification statements: always DML.
audit::statement_category modification_statement::category() const {
return audit::statement_category::DML;
}
} // namespace raw
// Rejects attribute combinations that are not allowed, throwing
// invalid_request_exception:
//  - a custom timestamp on a conditional update;
//  - a custom timestamp on a counter update (unless it is a raw counter
//    shard write);
//  - a custom TTL on a counter update;
//  - any direct modification of a materialized view.
void
modification_statement::validate(query_processor&, const service::client_state& state) const {
    if (has_conditions()) {
        if (attrs->is_timestamp_set()) {
            throw exceptions::invalid_request_exception("Cannot provide custom timestamp for conditional updates");
        }
    }

    if (is_counter()) {
        if (attrs->is_timestamp_set() && !is_raw_counter_shard_write()) {
            throw exceptions::invalid_request_exception("Cannot provide custom timestamp for counter updates");
        }
        if (attrs->is_time_to_live_set()) {
            throw exceptions::invalid_request_exception("Cannot provide custom TTL for counter updates");
        }
    }

    if (is_view()) {
        throw exceptions::invalid_request_exception("Cannot directly modify a materialized view");
    }
}
// Tells whether this statement targets the given keyspace and, when a table
// name is supplied, that table as well.
bool modification_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
    if (!(keyspace() == ks_name)) {
        return false;
    }
    if (!cf_name) {
        // Only the keyspace was asked about.
        return true;
    }
    return column_family() == *cf_name;
}
// Registers a column operation with the statement and updates the flags
// derived from it (static/regular columns set, collection selected, read
// required, raw counter shard write).
//
// Throws std::logic_error when the operation needs to prefetch a frozen
// collection, and invalid_request_exception when regular and raw counter
// updates are mixed.
void modification_statement::add_operation(::shared_ptr<operation> op) {
    const auto& column = op->column;

    if (!column.is_static()) {
        _sets_regular_columns = true;
        _selects_a_collection |= column.type->is_collection();
    } else {
        _sets_static_columns = true;
    }

    if (op->requires_read()) {
        _requires_read = true;
        _columns_to_read.set(column.ordinal_id);
        if (column.type->is_collection()) {
            auto collection_type = static_pointer_cast<const collection_type_impl>(column.type);
            if (!collection_type->is_multi_cell()) {
                throw std::logic_error(format("cannot prefetch frozen collection: {}", column.name_as_text()));
            }
        }
    }

    if (column.is_counter()) {
        auto raw_shard_write = op->is_raw_counter_shard_write();
        // Every counter operation of one statement must agree on this.
        const bool conflicts = _is_raw_counter_shard_write && _is_raw_counter_shard_write != raw_shard_write;
        if (conflicts) {
            throw exceptions::invalid_request_exception("Cannot mix regular and raw counter updates");
        }
        _is_raw_counter_shard_write = raw_shard_write;
    }

    _column_operations.push_back(std::move(op));
}
void modification_statement::inc_cql_stats(bool is_internal) const {
const source_selector src_sel = is_internal
? source_selector::INTERNAL : source_selector::USER;
const cond_selector cond_sel = has_conditions()
? cond_selector::WITH_CONDITIONS : cond_selector::NO_CONDITIONS;
++_stats.query_cnt(src_sel, _ks_sel, cond_sel, type);
}
// A statement is conditional exactly when has_conditions() says so.
bool modification_statement::is_conditional() const {
return has_conditions();
}
// Walks every column referenced by the condition and records whether the
// condition involves static columns, regular columns, and collections among
// the regular ones.
void modification_statement::analyze_condition(expr::expression cond) {
    expr::for_each_expression<expr::column_value>(cond, [this] (const expr::column_value& referenced) {
        const auto& column = *referenced.col;
        if (column.is_static()) {
            _has_static_column_conditions = true;
            return;
        }
        _has_regular_column_conditions = true;
        _selects_a_collection |= column.type->is_collection();
    });
}
// Marks the statement as carrying an IF NOT EXISTS condition.
void modification_statement::set_if_not_exist_condition() {
// We don't know yet if we need to select only static columns to check this
// condition or we need regular columns as well. So we postpone setting
// _has_regular_column_conditions/_has_static_column_conditions flag until
// we process WHERE clause, see process_where_clause().
_if_not_exists = true;
}
// True when the statement carries an IF NOT EXISTS condition.
bool modification_statement::has_if_not_exist_condition() const {
return _if_not_exists;
}
// Marks the statement as carrying an IF EXISTS condition.
void modification_statement::set_if_exist_condition() {
// See a comment in set_if_not_exist_condition().
_if_exists = true;
}
// True when the statement carries an IF EXISTS condition.
bool modification_statement::has_if_exist_condition() const {
return _if_exists;
}
// Extra WHERE clause checks that apply to conditional statements only.
// Throws invalid_request_exception when:
//  - the partition key or the clustering key is restricted with IN;
//  - a DELETE that does not restrict the whole clustering key with equality
//    relations deletes non-static columns or has a condition on non-static
//    columns.
void modification_statement::validate_where_clause_for_conditions() const {
    // IN is not supported for CAS operations so far.
    if (_restrictions->key_is_in_relation()) {
        throw exceptions::invalid_request_exception(
                format("IN on the partition key is not supported with conditional {}",
                    type.is_update() ? "updates" : "deletions"));
    }
    if (_restrictions->clustering_key_restrictions_has_IN()) {
        throw exceptions::invalid_request_exception(
                format("IN on the clustering key columns is not supported with conditional {}",
                    type.is_update() ? "updates" : "deletions"));
    }

    // The remaining checks concern DELETE only.
    if (!type.is_delete()) {
        return;
    }
    const bool full_eq_clustering_key = !_restrictions->has_unrestricted_clustering_columns()
            && _restrictions->clustering_key_restrictions_has_only_eq();
    if (full_eq_clustering_key) {
        return;
    }

    // With no column operations at all the statement counts as deleting
    // regular columns.
    bool deletes_regular_columns = _column_operations.empty();
    for (const auto& op : _column_operations) {
        if (!op->column.is_static()) {
            deletes_regular_columns = true;
            break;
        }
    }

    // For example, primary key is (a, b, c), only a and b are restricted
    if (deletes_regular_columns) {
        throw exceptions::invalid_request_exception(
                "DELETE statements must restrict all PRIMARY KEY columns with equality relations"
                " in order to delete non static columns");
    }
    // All primary key parts must be specified, unless this statement has only static column conditions
    if (_has_regular_column_conditions) {
        throw exceptions::invalid_request_exception(
                "DELETE statements must restrict all PRIMARY KEY columns with equality relations"
                " in order to use IF condition on non static columns");
    }
}
// Base implementation: returns a default-constructed json_cache_opt and
// ignores the options.
modification_statement::json_cache_opt modification_statement::maybe_prepare_json_cache(const query_options& options) const {
return {};
}
// Out-of-line definitions of the statement_type constants, each wrapping the
// corresponding statement_type::type enumerator.
const statement_type statement_type::INSERT = statement_type(statement_type::type::insert);
const statement_type statement_type::UPDATE = statement_type(statement_type::type::update);
const statement_type statement_type::DELETE = statement_type(statement_type::type::del);
const statement_type statement_type::SELECT = statement_type(statement_type::type::select);
namespace raw {
// Constructs the raw (unprepared) modification statement for table `name`,
// storing the raw attributes, the optional IF condition expression and the
// IF NOT EXISTS / IF EXISTS flags as given.
modification_statement::modification_statement(cf_name name, std::unique_ptr<attributes::raw> attrs, std::optional<expr::expression> conditions, bool if_not_exists, bool if_exists)
: cf_statement{std::move(name)}
, _attrs{std::move(attrs)}
, _conditions{std::move(conditions)}
, _if_not_exists{if_not_exists}
, _if_exists{if_exists}
{ }
}
}
}