diff --git a/cql3/cql_statement.hh b/cql3/cql_statement.hh index fa7425e583..2ff7f5758f 100644 --- a/cql3/cql_statement.hh +++ b/cql3/cql_statement.hh @@ -11,6 +11,7 @@ #pragma once #include "timeout_config.hh" +#include "service/raft/raft_group0_client.hh" namespace service { @@ -48,6 +49,9 @@ public: // CQL statement text seastar::sstring raw_cql_statement; + // True for statements that needs guard to be taken before the execution + bool needs_guard = false; + explicit cql_statement(timeout_config_selector timeout_selector) : _timeout_config_selector(timeout_selector) {} virtual ~cql_statement() @@ -82,7 +86,7 @@ public: * @param options options for this query (consistency, variables, pageSize, ...) */ virtual seastar::future> - execute(query_processor& qp, service::query_state& state, const query_options& options) const = 0; + execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const = 0; /** * Execute the statement and return the resulting result or null if there is no result. @@ -94,8 +98,8 @@ public: * @param options options for this query (consistency, variables, pageSize, ...) */ virtual seastar::future> - execute_without_checking_exception_message(query_processor& qp, service::query_state& state, const query_options& options) const { - return execute(qp, state, options); + execute_without_checking_exception_message(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const { + return execute(qp, state, options, std::move(guard)); } virtual bool depends_on(std::string_view ks_name, std::optional cf_name) const = 0; diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index cee46d83c3..afc5bc34be 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -29,6 +29,7 @@ #include "data_dictionary/data_dictionary.hh" #include "utils/hashers.hh" #include "utils/error_injection.hh" +#include "service/migration_manager.hh" namespace cql3 { @@ -484,16 +485,55 @@ future<> query_processor::stop() { }); } +future<::shared_ptr> query_processor::execute_with_guard( + std::function>(service::query_state&, ::shared_ptr, const query_options&, std::optional)> fn, + ::shared_ptr statement, service::query_state& query_state, const query_options& options) { + // execute all statements that need group0 guard on shard0 + if (this_shard_id() != 0) { + co_return ::make_shared(0, + std::move(const_cast(options).take_cached_pk_function_calls())); + } + + auto [remote_, holder] = remote(); + size_t retries = remote_.get().mm.get_concurrent_ddl_retries(); + while (true) { + try { + auto guard = co_await remote_.get().mm.start_group0_operation(); + co_return co_await fn(query_state, statement, options, std::move(guard)); + } catch (const service::group0_concurrent_modification& ex) { + log.warn("Failed to execute statement \"{}\" due to guard conflict.{}.", + statement->raw_cql_statement, retries ? " Retrying" : " Number of retries exceeded, giving up"); + if (retries--) { + continue; + } + throw; + } + } +} + +template +future<::shared_ptr> +query_processor::execute_maybe_with_guard(service::query_state& query_state, ::shared_ptr statement, const query_options& options, + future<::shared_ptr>(query_processor::*fn)(service::query_state&, ::shared_ptr, const query_options&, std::optional, Args...), Args... args) { + if (!statement->needs_guard) { + return (this->*fn)(query_state, std::move(statement), options, std::nullopt, std::forward(args)...); + } + static auto exec = [fn] (query_processor& qp, Args... args, service::query_state& query_state, ::shared_ptr statement, const query_options& options, std::optional guard) { + return (qp.*fn)(query_state, std::move(statement), options, std::move(guard), std::forward(args)...); + }; + return execute_with_guard(std::bind_front(exec, std::ref(*this), std::forward(args)...), std::move(statement), query_state, options); +} + future<::shared_ptr> query_processor::execute_direct_without_checking_exception_message(const sstring_view& query_string, service::query_state& query_state, query_options& options) { log.trace("execute_direct: \"{}\"", query_string); tracing::trace(query_state.get_trace_state(), "Parsing a statement"); auto p = get_statement(query_string, query_state.get_client_state()); - auto cql_statement = p->statement; + auto statement = p->statement; const auto warnings = std::move(p->warnings); - if (cql_statement->get_bound_terms() != options.get_values_count()) { + if (statement->get_bound_terms() != options.get_values_count()) { const auto msg = format("Invalid amount of bind variables: expected {:d} received {:d}", - cql_statement->get_bound_terms(), + statement->get_bound_terms(), options.get_values_count()); throw exceptions::invalid_request_exception(msg); } @@ -505,8 +545,18 @@ query_processor::execute_direct_without_checking_exception_message(const sstring metrics.regularStatementsExecuted.inc(); #endif tracing::trace(query_state.get_trace_state(), "Processing a statement"); - co_await cql_statement->check_access(*this, query_state.get_client_state()); - auto m = co_await process_authorized_statement(std::move(cql_statement), query_state, options); + return execute_maybe_with_guard(query_state, std::move(statement), options, &query_processor::do_execute_direct, std::move(warnings)); +} + +future<::shared_ptr> +query_processor::do_execute_direct( + service::query_state& query_state, + shared_ptr statement, + const query_options& options, + std::optional guard, + cql3::cql_warnings_vec warnings) { + co_await statement->check_access(*this, query_state.get_client_state()); + auto m = co_await process_authorized_statement(statement, query_state, options, std::move(guard)); for (const auto& w : warnings) { m->add_warning(w); } @@ -515,14 +565,24 @@ query_processor::execute_direct_without_checking_exception_message(const sstring future<::shared_ptr> query_processor::execute_prepared_without_checking_exception_message( + service::query_state& query_state, + shared_ptr statement, + const query_options& options, statements::prepared_statement::checked_weak_ptr prepared, cql3::prepared_cache_key_type cache_key, - service::query_state& query_state, - const query_options& options, bool needs_authorization) { + return execute_maybe_with_guard(query_state, std::move(statement), options, &query_processor::do_execute_prepared, std::move(prepared), std::move(cache_key), needs_authorization); +} - ::shared_ptr statement = prepared->statement; - +future<::shared_ptr> +query_processor::do_execute_prepared( + service::query_state& query_state, + shared_ptr statement, + const query_options& options, + std::optional guard, + statements::prepared_statement::checked_weak_ptr prepared, + cql3::prepared_cache_key_type cache_key, + bool needs_authorization) { if (needs_authorization) { co_await statement->check_access(*this, query_state.get_client_state()); try { @@ -531,19 +591,18 @@ query_processor::execute_prepared_without_checking_exception_message( log.error("failed to cache the entry: {}", std::current_exception()); } } - - co_return co_await process_authorized_statement(std::move(statement), query_state, options); + co_return co_await process_authorized_statement(std::move(statement), query_state, options, std::move(guard)); } future<::shared_ptr> -query_processor::process_authorized_statement(const ::shared_ptr statement, service::query_state& query_state, const query_options& options) { +query_processor::process_authorized_statement(const ::shared_ptr statement, service::query_state& query_state, const query_options& options, std::optional guard) { auto& client_state = query_state.get_client_state(); ++_stats.queries_by_cl[size_t(options.get_consistency())]; statement->validate(*this, client_state); - auto msg = co_await statement->execute_without_checking_exception_message(*this, query_state, options); + auto msg = co_await statement->execute_without_checking_exception_message(*this, query_state, options, std::move(guard)); if (msg) { co_return std::move(msg); @@ -749,7 +808,7 @@ query_processor::execute_paged_internal(internal_query_state& state) { state.p->statement->validate(*this, service::client_state::for_internal_calls()); auto qs = query_state_for_internal_call(); ::shared_ptr msg = - co_await state.p->statement->execute(*this, qs, *state.opts); + co_await state.p->statement->execute(*this, qs, *state.opts, std::nullopt); class visitor : public result_message::visitor_base { internal_query_state& _state; @@ -823,11 +882,22 @@ query_processor::execute_with_params( service::query_state& query_state, const std::initializer_list& values) { auto opts = make_internal_options(p, values, cl); - p->statement->validate(*this, service::client_state::for_internal_calls()); - auto msg = co_await p->statement->execute(*this, query_state, opts); + auto statement = p->statement; + + auto msg = co_await execute_maybe_with_guard(query_state, std::move(statement), opts, &query_processor::do_execute_with_params); co_return ::make_shared(msg); } +future<::shared_ptr> +query_processor::do_execute_with_params( + service::query_state& query_state, + shared_ptr statement, + const query_options& options, std::optional guard) { + statement->validate(*this, service::client_state::for_internal_calls()); + co_return co_await statement->execute(*this, query_state, options, std::move(guard)); +} + + future<::shared_ptr> query_processor::execute_batch_without_checking_exception_message( ::shared_ptr batch, @@ -852,7 +922,7 @@ query_processor::execute_batch_without_checking_exception_message( } log.trace("execute_batch({}): {}", batch->get_statements().size(), oss.str()); } - co_return co_await batch->execute(*this, query_state, options); + co_return co_await batch->execute(*this, query_state, options, std::nullopt); } future @@ -868,44 +938,31 @@ query_processor::forward(query::forward_request req, tracing::trace_state_ptr tr } future<::shared_ptr> -query_processor::execute_schema_statement(const statements::schema_altering_statement& stmt, service::query_state& state, const query_options& options) { +query_processor::execute_schema_statement(const statements::schema_altering_statement& stmt, service::query_state& state, const query_options& options, std::optional guard) { ::shared_ptr ce; if (this_shard_id() != 0) { - // execute all schema altering statements on a shard zero since this is where raft group 0 is - co_return ::make_shared(0, - std::move(const_cast(options).take_cached_pk_function_calls())); + on_internal_error(log, "DDL must be executed on shard 0"); } + if (!guard) { + on_internal_error(log, "Guard must be present when executing DDL"); + } + + auto [remote_, holder] = remote(); + cql3::cql_warnings_vec warnings; - auto [remote_, holder] = remote(); - auto& mm = remote_.get().mm; - auto retries = mm.get_concurrent_ddl_retries(); - while (true) { - try { - auto group0_guard = co_await mm.start_group0_operation(); + auto [ret, m, cql_warnings] = co_await stmt.prepare_schema_mutations(*this, guard->write_timestamp()); + warnings = std::move(cql_warnings); - auto [ret, m, cql_warnings] = co_await stmt.prepare_schema_mutations(*this, group0_guard.write_timestamp()); - warnings = std::move(cql_warnings); - - if (!m.empty()) { - auto description = format("CQL DDL statement: \"{}\"", stmt.raw_cql_statement); - co_await mm.announce(std::move(m), std::move(group0_guard), description); - } - - ce = std::move(ret); - } catch (const service::group0_concurrent_modification&) { - log.warn("Failed to execute DDL statement \"{}\" due to concurrent group 0 modification.{}.", - stmt.raw_cql_statement, retries ? " Retrying" : " Number of retries exceeded, giving up"); - if (retries--) { - continue; - } - throw; - } - break; + if (!m.empty()) { + auto description = format("CQL DDL statement: \"{}\"", stmt.raw_cql_statement); + co_await remote_.get().mm.announce(std::move(m), std::move(*guard), description); } + ce = std::move(ret); + // If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change. To avoid doing // extra work in the drivers to handle schema changes, we return an empty message in this case. (CASSANDRA-7600) ::shared_ptr result; diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index 754896be14..15757af8b0 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -205,11 +205,13 @@ public: service::query_state& query_state, const query_options& options, bool needs_authorization) { + auto cql_statement = statement->statement; return execute_prepared_without_checking_exception_message( + query_state, + std::move(cql_statement), + options, std::move(statement), std::move(cache_key), - query_state, - options, needs_authorization) .then(cql_transport::messages::propagate_exception_as_future<::shared_ptr>); } @@ -218,11 +220,22 @@ public: // The result_message::exception must be explicitly handled. future<::shared_ptr> execute_prepared_without_checking_exception_message( - statements::prepared_statement::checked_weak_ptr statement, - cql3::prepared_cache_key_type cache_key, - service::query_state& query_state, - const query_options& options, - bool needs_authorization); + service::query_state& query_state, + shared_ptr statement, + const query_options& options, + statements::prepared_statement::checked_weak_ptr prepared, + cql3::prepared_cache_key_type cache_key, + bool needs_authorization); + + future<::shared_ptr> + do_execute_prepared( + service::query_state& query_state, + shared_ptr statement, + const query_options& options, + std::optional guard, + statements::prepared_statement::checked_weak_ptr prepared, + cql3::prepared_cache_key_type cache_key, + bool needs_authorization); /// Execute a client statement that was not prepared. inline @@ -246,6 +259,14 @@ public: service::query_state& query_state, query_options& options); + future<::shared_ptr> + do_execute_direct( + service::query_state& query_state, + shared_ptr statement, + const query_options& options, + std::optional guard, + cql3::cql_warnings_vec warnings); + statements::prepared_statement::checked_weak_ptr prepare_internal(const sstring& query); /*! @@ -350,6 +371,13 @@ public: service::query_state& query_state, const std::initializer_list& = { }); + future<::shared_ptr> do_execute_with_params( + service::query_state& query_state, + shared_ptr statement, + const query_options& options, + std::optional guard); + + future<::shared_ptr> prepare(sstring query_string, service::query_state& query_state); @@ -389,8 +417,10 @@ public: future forward(query::forward_request, tracing::trace_state_ptr); + struct retry_statement_execution_error : public std::exception {}; + future<::shared_ptr> - execute_schema_statement(const statements::schema_altering_statement&, service::query_state& state, const query_options& options); + execute_schema_statement(const statements::schema_altering_statement&, service::query_state& state, const query_options& options, std::optional guard); future execute_thrift_schema_command( @@ -420,7 +450,7 @@ private: int32_t page_size = -1) const; future<::shared_ptr> - process_authorized_statement(const ::shared_ptr statement, service::query_state& query_state, const query_options& options); + process_authorized_statement(const ::shared_ptr statement, service::query_state& query_state, const query_options& options, std::optional guard); /*! * \brief created a state object for paging @@ -455,6 +485,15 @@ private: */ bool has_more_results(cql3::internal_query_state& state) const; + template + future<::shared_ptr> + execute_maybe_with_guard(service::query_state& query_state, ::shared_ptr statement, const query_options& options, + future<::shared_ptr>(query_processor::*fn)(service::query_state&, ::shared_ptr, const query_options&, std::optional, Args...), Args... args); + + future<::shared_ptr> execute_with_guard( + std::function>(service::query_state&, ::shared_ptr, const query_options&, std::optional)> fn, + ::shared_ptr statement, service::query_state& query_state, const query_options& options); + /// /// \tparam ResultMsgType type of the returned result message (CQL or Thrift) /// \tparam PreparedKeyGenerator a function that generates the prepared statement cache key for given query and diff --git a/cql3/statements/alter_keyspace_statement.cc b/cql3/statements/alter_keyspace_statement.cc index 1328ea6e21..404678a04a 100644 --- a/cql3/statements/alter_keyspace_statement.cc +++ b/cql3/statements/alter_keyspace_statement.cc @@ -102,9 +102,9 @@ cql3::statements::alter_keyspace_statement::prepare(data_dictionary::database db static logging::logger mylogger("alter_keyspace"); future<::shared_ptr> -cql3::statements::alter_keyspace_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const { +cql3::statements::alter_keyspace_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const { std::optional warning = check_restricted_replication_strategy(qp, keyspace(), *_attrs); - return schema_altering_statement::execute(qp, state, options).then([warning = std::move(warning)] (::shared_ptr msg) { + return schema_altering_statement::execute(qp, state, options, std::move(guard)).then([warning = std::move(warning)] (::shared_ptr msg) { if (warning) { msg->add_warning(*warning); mylogger.warn("{}", *warning); diff --git a/cql3/statements/alter_keyspace_statement.hh b/cql3/statements/alter_keyspace_statement.hh index 4abddd960d..261fc0f1d3 100644 --- a/cql3/statements/alter_keyspace_statement.hh +++ b/cql3/statements/alter_keyspace_statement.hh @@ -35,7 +35,7 @@ public: void validate(query_processor& qp, const service::client_state& state) const override; future, std::vector, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; - virtual future<::shared_ptr> execute(query_processor& qp, service::query_state& state, const query_options& options) const override; + virtual future<::shared_ptr> execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const override; }; } diff --git a/cql3/statements/alter_role_statement.hh b/cql3/statements/alter_role_statement.hh index 3fa0cca9e0..d4eea0ccc5 100644 --- a/cql3/statements/alter_role_statement.hh +++ b/cql3/statements/alter_role_statement.hh @@ -38,7 +38,7 @@ public: virtual future<> check_access(query_processor& qp, const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor&, service::query_state&, const query_options&) const override; + execute(query_processor&, service::query_state&, const query_options&, std::optional guard) const override; }; } diff --git a/cql3/statements/alter_service_level_statement.cc b/cql3/statements/alter_service_level_statement.cc index 150c989955..b4832b73e2 100644 --- a/cql3/statements/alter_service_level_statement.cc +++ b/cql3/statements/alter_service_level_statement.cc @@ -36,7 +36,7 @@ future<> alter_service_level_statement::check_access(query_processor& qp, const future<::shared_ptr> alter_service_level_statement::execute(query_processor& qp, service::query_state &state, - const query_options &) const { + const query_options &, std::optional guard) const { qos::service_level& sl = state.get_service_level_controller().get_service_level(_service_level); qos::service_level_options slo = _slo.replace_defaults(sl.slo); return state.get_service_level_controller().alter_distributed_service_level(_service_level, slo).then([] { diff --git a/cql3/statements/alter_service_level_statement.hh b/cql3/statements/alter_service_level_statement.hh index be1a0ba7a0..9b483e469d 100644 --- a/cql3/statements/alter_service_level_statement.hh +++ b/cql3/statements/alter_service_level_statement.hh @@ -26,7 +26,7 @@ public: std::unique_ptr prepare(data_dictionary::database db, cql_stats &stats) override; virtual future<> check_access(query_processor& qp, const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor&, service::query_state&, const query_options&) const override; + execute(query_processor&, service::query_state&, const query_options&, std::optional guard) const override; }; } diff --git a/cql3/statements/alter_table_statement.cc b/cql3/statements/alter_table_statement.cc index cb9243fe63..3fc90987c7 100644 --- a/cql3/statements/alter_table_statement.cc +++ b/cql3/statements/alter_table_statement.cc @@ -402,10 +402,10 @@ cql3::statements::alter_table_statement::prepare(data_dictionary::database db, c } future<::shared_ptr> -alter_table_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const { +alter_table_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const { auto s = validation::validate_column_family(qp.db(), keyspace(), column_family()); std::optional warning = check_restricted_table_properties(qp, s, keyspace(), column_family(), *_properties); - return schema_altering_statement::execute(qp, state, options).then([warning = std::move(warning)] (::shared_ptr msg) { + return schema_altering_statement::execute(qp, state, options, std::move(guard)).then([warning = std::move(warning)] (::shared_ptr msg) { if (warning) { msg->add_warning(*warning); mylogger.warn("{}", *warning); diff --git a/cql3/statements/alter_table_statement.hh b/cql3/statements/alter_table_statement.hh index 991e44fb4f..fed7825c32 100644 --- a/cql3/statements/alter_table_statement.hh +++ b/cql3/statements/alter_table_statement.hh @@ -52,7 +52,7 @@ public: virtual future<> check_access(query_processor& qp, const service::client_state& state) const override; virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; - virtual future<::shared_ptr> execute(query_processor& qp, service::query_state& state, const query_options& options) const override; + virtual future<::shared_ptr> execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const override; future, std::vector, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; private: diff --git a/cql3/statements/attach_service_level_statement.cc b/cql3/statements/attach_service_level_statement.cc index f0f809cea8..03748341e3 100644 --- a/cql3/statements/attach_service_level_statement.cc +++ b/cql3/statements/attach_service_level_statement.cc @@ -35,7 +35,8 @@ future<> attach_service_level_statement::check_access(query_processor& qp, const future<::shared_ptr> attach_service_level_statement::execute(query_processor& qp, service::query_state &state, - const query_options &) const { + const query_options &, + std::optional guard) const { return state.get_service_level_controller().get_distributed_service_level(_service_level).then([this] (qos::service_levels_info sli) { if (sli.empty()) { throw qos::nonexistant_service_level_exception(_service_level); diff --git a/cql3/statements/attach_service_level_statement.hh b/cql3/statements/attach_service_level_statement.hh index 6b6edf0773..f0742b82c4 100644 --- a/cql3/statements/attach_service_level_statement.hh +++ b/cql3/statements/attach_service_level_statement.hh @@ -25,7 +25,7 @@ public: std::unique_ptr prepare(data_dictionary::database db, cql_stats &stats) override; virtual future<> check_access(query_processor& qp, const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor&, service::query_state&, const query_options&) const override; + execute(query_processor&, service::query_state&, const query_options&, std::optional guard) const override; }; } diff --git a/cql3/statements/batch_statement.cc b/cql3/statements/batch_statement.cc index 4ff1446fe4..125f056d04 100644 --- a/cql3/statements/batch_statement.cc +++ b/cql3/statements/batch_statement.cc @@ -234,13 +234,13 @@ static thread_local inheriting_concrete_execution_stage< api::timestamp_type> batch_stage{"cql3_batch", batch_statement_executor::get()}; future> batch_statement::execute( - query_processor& qp, service::query_state& state, const query_options& options) const { - return execute_without_checking_exception_message(qp, state, options) + query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const { + return execute_without_checking_exception_message(qp, state, options, std::move(guard)) .then(cql_transport::messages::propagate_exception_as_future>); } future> batch_statement::execute_without_checking_exception_message( - query_processor& qp, service::query_state& state, const query_options& options) const { + query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const { cql3::util::validate_timestamp(qp.db().get_config(), options, _attrs); return batch_stage(this, seastar::ref(qp), seastar::ref(state), seastar::cref(options), false, options.get_timestamp(state)); diff --git a/cql3/statements/batch_statement.hh b/cql3/statements/batch_statement.hh index dbe45afa64..1d2c1b6d99 100644 --- a/cql3/statements/batch_statement.hh +++ b/cql3/statements/batch_statement.hh @@ -118,10 +118,10 @@ public: static void verify_batch_size(query_processor& qp, const std::vector& mutations); virtual future> execute( - query_processor& qp, service::query_state& state, const query_options& options) const override; + query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const override; virtual future> execute_without_checking_exception_message( - query_processor& qp, service::query_state& state, const query_options& options) const override; + query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const override; private: friend class batch_statement_executor; future> do_execute( diff --git a/cql3/statements/create_keyspace_statement.cc b/cql3/statements/create_keyspace_statement.cc index 062caf189a..59eb167bc9 100644 --- a/cql3/statements/create_keyspace_statement.cc +++ b/cql3/statements/create_keyspace_statement.cc @@ -204,9 +204,9 @@ std::optional check_restricted_replication_strategy( } future<::shared_ptr> -create_keyspace_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const { +create_keyspace_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const { std::optional warning = check_restricted_replication_strategy(qp, keyspace(), *_attrs); - return schema_altering_statement::execute(qp, state, options).then([warning = std::move(warning)] (::shared_ptr msg) { + return schema_altering_statement::execute(qp, state, options, std::move(guard)).then([warning = std::move(warning)] (::shared_ptr msg) { if (warning) { msg->add_warning(*warning); mylogger.warn("{}", *warning); diff --git a/cql3/statements/create_keyspace_statement.hh b/cql3/statements/create_keyspace_statement.hh index 55d14ae361..b5e6f0176b 100644 --- a/cql3/statements/create_keyspace_statement.hh +++ b/cql3/statements/create_keyspace_statement.hh @@ -71,7 +71,7 @@ public: virtual future<> grant_permissions_to_creator(const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor& qp, service::query_state& state, const query_options& options) const override; + execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const override; lw_shared_ptr get_keyspace_metadata(const locator::token_metadata& tm); }; diff --git a/cql3/statements/create_role_statement.hh b/cql3/statements/create_role_statement.hh index 680557d8f6..48441742cf 100644 --- a/cql3/statements/create_role_statement.hh +++ b/cql3/statements/create_role_statement.hh @@ -44,7 +44,7 @@ public: virtual future<> check_access(query_processor& qp, const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor&, service::query_state&, const query_options&) const override; + execute(query_processor&, service::query_state&, const query_options&, std::optional guard) const override; }; } diff --git a/cql3/statements/create_service_level_statement.cc b/cql3/statements/create_service_level_statement.cc index 33eb820f57..8e6a5793fa 100644 --- a/cql3/statements/create_service_level_statement.cc +++ b/cql3/statements/create_service_level_statement.cc @@ -36,7 +36,8 @@ future<> create_service_level_statement::check_access(query_processor& qp, const future<::shared_ptr> create_service_level_statement::execute(query_processor& qp, service::query_state &state, - const query_options &) const { + const query_options &, + std::optional guard) const { qos::service_level_options slo = _slo.replace_defaults(qos::service_level_options{}); return state.get_service_level_controller().add_distributed_service_level(_service_level, slo, _if_not_exists).then([] { using void_result_msg = cql_transport::messages::result_message::void_message; diff --git a/cql3/statements/create_service_level_statement.hh b/cql3/statements/create_service_level_statement.hh index fed7650cd0..635b7f2355 100644 --- a/cql3/statements/create_service_level_statement.hh +++ b/cql3/statements/create_service_level_statement.hh @@ -27,7 +27,7 @@ public: std::unique_ptr prepare(data_dictionary::database db, cql_stats &stats) override; virtual future<> check_access(query_processor& qp, const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor&, service::query_state&, const query_options&) const override; + execute(query_processor&, service::query_state&, const query_options&, std::optional guard) const override; }; } diff --git a/cql3/statements/create_table_statement.cc b/cql3/statements/create_table_statement.cc index cef65944a5..4cbb0c5420 100644 --- a/cql3/statements/create_table_statement.cc +++ b/cql3/statements/create_table_statement.cc @@ -489,9 +489,9 @@ std::optional check_restricted_table_properties( } future<::shared_ptr> -create_table_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const { +create_table_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const { std::optional warning = check_restricted_table_properties(qp, std::nullopt, keyspace(), column_family(), *_properties); - return schema_altering_statement::execute(qp, state, options).then([warning = std::move(warning)] (::shared_ptr msg) { + return schema_altering_statement::execute(qp, state, options, std::move(guard)).then([warning = std::move(warning)] (::shared_ptr msg) { if (warning) { msg->add_warning(*warning); mylogger.warn("{}", *warning); diff --git a/cql3/statements/create_table_statement.hh b/cql3/statements/create_table_statement.hh index 6836b6145c..4f4ade0dc7 100644 --- a/cql3/statements/create_table_statement.hh +++ b/cql3/statements/create_table_statement.hh @@ -78,7 +78,7 @@ public: virtual future<> grant_permissions_to_creator(const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor& qp, service::query_state& state, const query_options& options) const override; + execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const override; schema_ptr get_cf_meta_data(const data_dictionary::database) const; diff --git a/cql3/statements/describe_statement.cc b/cql3/statements/describe_statement.cc index 7324834222..db54c840d8 100644 --- a/cql3/statements/describe_statement.cc +++ b/cql3/statements/describe_statement.cc @@ -441,7 +441,7 @@ seastar::shared_ptr describe_statement::get_result_metadata() co } seastar::future> -describe_statement::execute(cql3::query_processor& qp, service::query_state& state, const query_options& options) const { +describe_statement::execute(cql3::query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const { auto& client_state = state.get_client_state(); auto descriptions = co_await describe(qp, client_state); diff --git a/cql3/statements/describe_statement.hh b/cql3/statements/describe_statement.hh index 3f0b31d141..24ed5b362e 100644 --- a/cql3/statements/describe_statement.hh +++ b/cql3/statements/describe_statement.hh @@ -51,7 +51,7 @@ public: virtual seastar::shared_ptr get_result_metadata() const override; virtual seastar::future> - execute(cql3::query_processor& qp, service::query_state& state, const query_options& options) const override; + execute(cql3::query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const override; }; class cluster_describe_statement : public describe_statement { diff --git a/cql3/statements/detach_service_level_statement.cc b/cql3/statements/detach_service_level_statement.cc index e5d75be91c..fefd34eb9d 100644 --- a/cql3/statements/detach_service_level_statement.cc +++ b/cql3/statements/detach_service_level_statement.cc @@ -34,7 +34,8 @@ future<> detach_service_level_statement::check_access(query_processor& qp, const future<::shared_ptr> detach_service_level_statement::execute(query_processor& qp, service::query_state &state, - const query_options &) const { + const query_options &, + std::optional guard) const { return state.get_client_state().get_auth_service()->underlying_role_manager().remove_attribute(_role_name, "service_level").then([] { using void_result_msg = cql_transport::messages::result_message::void_message; using result_msg = cql_transport::messages::result_message; diff --git a/cql3/statements/detach_service_level_statement.hh b/cql3/statements/detach_service_level_statement.hh index a07477d752..3b2cc5ed43 100644 --- a/cql3/statements/detach_service_level_statement.hh +++ b/cql3/statements/detach_service_level_statement.hh @@ -23,7 +23,7 @@ public: std::unique_ptr prepare(data_dictionary::database db, cql_stats &stats) override; virtual future<> check_access(query_processor& qp, const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor&, service::query_state&, const query_options&) const override; + execute(query_processor&, service::query_state&, const query_options&, std::optional guard) const override; }; } diff --git a/cql3/statements/drop_role_statement.hh b/cql3/statements/drop_role_statement.hh index 4724c16f2b..a43a525e4e 100644 --- a/cql3/statements/drop_role_statement.hh +++ b/cql3/statements/drop_role_statement.hh @@ -38,7 +38,7 @@ public: virtual future<> check_access(query_processor& qp, const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor&, service::query_state&, const query_options&) const override; + execute(query_processor&, service::query_state&, const query_options&, std::optional guard) const override; }; } diff --git a/cql3/statements/drop_service_level_statement.cc b/cql3/statements/drop_service_level_statement.cc index 8b27a7534f..4f3d16faea 100644 --- a/cql3/statements/drop_service_level_statement.cc +++ b/cql3/statements/drop_service_level_statement.cc @@ -34,7 +34,8 @@ future<> drop_service_level_statement::check_access(query_processor& qp, const s future<::shared_ptr> drop_service_level_statement::execute(query_processor& qp, service::query_state &state, - const query_options &) const { + const query_options &, + std::optional guard) const { return state.get_service_level_controller().drop_distributed_service_level(_service_level, _if_exists).then([] { using void_result_msg = cql_transport::messages::result_message::void_message; using result_msg = cql_transport::messages::result_message; diff --git a/cql3/statements/drop_service_level_statement.hh b/cql3/statements/drop_service_level_statement.hh index d839294cc5..1d749e0f01 100644 --- a/cql3/statements/drop_service_level_statement.hh +++ b/cql3/statements/drop_service_level_statement.hh @@ -24,7 +24,7 @@ public: std::unique_ptr prepare(data_dictionary::database db, cql_stats &stats) override; virtual future<> check_access(query_processor& qp, const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor&, service::query_state&, const query_options&) const override; + execute(query_processor&, service::query_state&, const query_options&, std::optional guard) const override; }; } diff --git a/cql3/statements/grant_role_statement.hh b/cql3/statements/grant_role_statement.hh index f4a1f3b73a..c72742f9ee 100644 --- a/cql3/statements/grant_role_statement.hh +++ b/cql3/statements/grant_role_statement.hh @@ -37,7 +37,7 @@ public: virtual future<> check_access(query_processor& qp, const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor&, service::query_state&, const query_options&) const override; + execute(query_processor&, service::query_state&, const query_options&, std::optional guard) const override; }; } diff --git a/cql3/statements/grant_statement.cc b/cql3/statements/grant_statement.cc index 2082ebaf8c..3697cd5608 100644 --- a/cql3/statements/grant_statement.cc +++ b/cql3/statements/grant_statement.cc @@ -19,7 +19,7 @@ std::unique_ptr cql3::statements::grant_st } future<::shared_ptr> -cql3::statements::grant_statement::execute(query_processor&, service::query_state& state, const query_options& options) const { +cql3::statements::grant_statement::execute(query_processor&, service::query_state& state, const query_options& options, std::optional guard) const { auto& auth_service = *state.get_client_state().get_auth_service(); return auth::grant_permissions(auth_service, _role_name, _permissions, _resource).then([] { diff --git a/cql3/statements/grant_statement.hh b/cql3/statements/grant_statement.hh index adf8b4823d..a434119662 100644 --- a/cql3/statements/grant_statement.hh +++ b/cql3/statements/grant_statement.hh @@ -26,7 +26,8 @@ public: future<::shared_ptr> execute(query_processor& , service::query_state& - , const query_options&) const override; + , const query_options& + , std::optional guard) const override; }; } diff --git a/cql3/statements/list_permissions_statement.cc b/cql3/statements/list_permissions_statement.cc index 05da97c938..c3902e0066 100644 --- a/cql3/statements/list_permissions_statement.cc +++ b/cql3/statements/list_permissions_statement.cc @@ -78,7 +78,8 @@ future<::shared_ptr> cql3::statements::list_permissions_statement::execute( query_processor& qp, service::query_state& state, - const query_options& options) const { + const query_options& options, + std::optional guard) const { static auto make_column = [](sstring name) { return make_lw_shared( auth::meta::AUTH_KS, diff --git a/cql3/statements/list_permissions_statement.hh b/cql3/statements/list_permissions_statement.hh index 6b892ddfe9..00add9631f 100644 --- a/cql3/statements/list_permissions_statement.hh +++ b/cql3/statements/list_permissions_statement.hh @@ -39,7 +39,7 @@ public: future<> check_access(query_processor& qp, const service::client_state&) const override; future<::shared_ptr> - execute(query_processor&, service::query_state& , const query_options&) const override; + execute(query_processor&, service::query_state& , const query_options&, std::optional guard) const override; }; } diff --git a/cql3/statements/list_roles_statement.hh b/cql3/statements/list_roles_statement.hh index 0b6f9cb55d..6f102d03de 100644 --- a/cql3/statements/list_roles_statement.hh +++ b/cql3/statements/list_roles_statement.hh @@ -38,7 +38,7 @@ public: virtual future<> check_access(query_processor& qp, const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor&, service::query_state&, const query_options&) const override; + execute(query_processor&, service::query_state&, const query_options&, std::optional guard) const override; }; } diff --git a/cql3/statements/list_service_level_attachments_statement.cc b/cql3/statements/list_service_level_attachments_statement.cc index 45371d2a13..77c74e3f34 100644 --- a/cql3/statements/list_service_level_attachments_statement.cc +++ b/cql3/statements/list_service_level_attachments_statement.cc @@ -39,7 +39,8 @@ future<> list_service_level_attachments_statement::check_access(query_processor& future<::shared_ptr> list_service_level_attachments_statement::execute(query_processor& qp, service::query_state &state, - const query_options &) const { + const query_options &, + std::optional guard) const { static auto make_column = [] (sstring name, const shared_ptr type) { return make_lw_shared( diff --git a/cql3/statements/list_service_level_attachments_statement.hh b/cql3/statements/list_service_level_attachments_statement.hh index 4bf9bf2796..71a2a076b3 100644 --- a/cql3/statements/list_service_level_attachments_statement.hh +++ b/cql3/statements/list_service_level_attachments_statement.hh @@ -25,7 +25,7 @@ public: std::unique_ptr prepare(data_dictionary::database db, cql_stats &stats) override; virtual future<> check_access(query_processor& qp, const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor&, service::query_state&, const query_options&) const override; + execute(query_processor&, service::query_state&, const query_options&, std::optional guard) const override; }; } diff --git a/cql3/statements/list_service_level_statement.cc b/cql3/statements/list_service_level_statement.cc index 4de32ca18c..12f60a0715 100644 --- a/cql3/statements/list_service_level_statement.cc +++ b/cql3/statements/list_service_level_statement.cc @@ -36,7 +36,8 @@ future<> list_service_level_statement::check_access(query_processor& qp, const s future<::shared_ptr> list_service_level_statement::execute(query_processor& qp, service::query_state &state, - const query_options &) const { + const query_options &, + std::optional guard) const { static auto make_column = [] (sstring name, const shared_ptr type) { return make_lw_shared( diff --git a/cql3/statements/list_service_level_statement.hh b/cql3/statements/list_service_level_statement.hh index 62e80b90fa..e070db8a04 100644 --- a/cql3/statements/list_service_level_statement.hh +++ b/cql3/statements/list_service_level_statement.hh @@ -24,7 +24,7 @@ public: std::unique_ptr prepare(data_dictionary::database db, cql_stats &stats) override; virtual future<> check_access(query_processor& qp, const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor&, service::query_state&, const query_options&) const override; + execute(query_processor&, service::query_state&, const query_options&, std::optional guard) const override; }; } diff --git a/cql3/statements/list_users_statement.cc b/cql3/statements/list_users_statement.cc index c418e38358..094f384704 100644 --- a/cql3/statements/list_users_statement.cc +++ b/cql3/statements/list_users_statement.cc @@ -26,7 +26,7 @@ future<> cql3::statements::list_users_statement::check_access(query_processor& q } future<::shared_ptr> -cql3::statements::list_users_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const { +cql3::statements::list_users_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const { static const sstring virtual_table_name("users"); static const auto make_column_spec = [](const sstring& name, const ::shared_ptr& ty) { diff --git a/cql3/statements/list_users_statement.hh b/cql3/statements/list_users_statement.hh index fab6740a2f..0957b2e86b 100644 --- a/cql3/statements/list_users_statement.hh +++ b/cql3/statements/list_users_statement.hh @@ -26,7 +26,8 @@ public: future<> check_access(query_processor& qp, const service::client_state&) const override; future<::shared_ptr> execute(query_processor& , service::query_state& - , const query_options&) const override; + , const query_options& + , std::optional guard) const override; }; } diff --git a/cql3/statements/modification_statement.cc b/cql3/statements/modification_statement.cc index 057653beb7..a609c34b4d 100644 --- a/cql3/statements/modification_statement.cc +++ b/cql3/statements/modification_statement.cc @@ -250,13 +250,13 @@ static thread_local inheriting_concrete_execution_stage< const query_options&> modify_stage{"cql3_modification", modification_statement_executor::get()}; future<::shared_ptr> -modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options) const { - return execute_without_checking_exception_message(qp, qs, options) +modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional guard) const { + return execute_without_checking_exception_message(qp, qs, options, std::move(guard)) .then(cql_transport::messages::propagate_exception_as_future>); } future<::shared_ptr> -modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options) const { +modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional guard) const { cql3::util::validate_timestamp(qp.db().get_config(), options, attrs); return modify_stage(this, seastar::ref(qp), seastar::ref(qs), seastar::cref(options)); } diff --git a/cql3/statements/modification_statement.hh b/cql3/statements/modification_statement.hh index 83be9b411e..70b96b0dc6 100644 --- a/cql3/statements/modification_statement.hh +++ b/cql3/statements/modification_statement.hh @@ -229,10 +229,10 @@ public: bool has_only_static_column_conditions() const { return !_has_regular_column_conditions && _has_static_column_conditions; } virtual future<::shared_ptr> - execute(query_processor& qp, service::query_state& qs, const query_options& options) const override; + execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional guard) const override; virtual future<::shared_ptr> - execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options) const override; + execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional guard) const override; private: future> diff --git a/cql3/statements/revoke_role_statement.hh b/cql3/statements/revoke_role_statement.hh index eeba648ef5..0db73b65d4 100644 --- a/cql3/statements/revoke_role_statement.hh +++ b/cql3/statements/revoke_role_statement.hh @@ -37,7 +37,7 @@ public: virtual future<> check_access(query_processor& qp, const service::client_state&) const override; virtual future<::shared_ptr> - execute(query_processor&, service::query_state&, const query_options&) const override; + execute(query_processor&, service::query_state&, const query_options&, std::optional guard) const override; }; } diff --git a/cql3/statements/revoke_statement.cc b/cql3/statements/revoke_statement.cc index e4ffea2b12..227e872ea0 100644 --- a/cql3/statements/revoke_statement.cc +++ b/cql3/statements/revoke_statement.cc @@ -19,7 +19,7 @@ std::unique_ptr cql3::statements::revoke_s } future<::shared_ptr> -cql3::statements::revoke_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const { +cql3::statements::revoke_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const { auto& auth_service = *state.get_client_state().get_auth_service(); return auth::revoke_permissions(auth_service, _role_name, _permissions, _resource).then([] { diff --git a/cql3/statements/revoke_statement.hh b/cql3/statements/revoke_statement.hh index f2f1330fec..4df1cc4890 100644 --- a/cql3/statements/revoke_statement.hh +++ b/cql3/statements/revoke_statement.hh @@ -26,7 +26,8 @@ public: future<::shared_ptr> execute(query_processor& , service::query_state& - , const query_options&) const override; + , const query_options& + , std::optional guard) const override; }; } diff --git a/cql3/statements/role-management-statements.cc b/cql3/statements/role-management-statements.cc index 88a560124e..34a92fb4a5 100644 --- a/cql3/statements/role-management-statements.cc +++ b/cql3/statements/role-management-statements.cc @@ -89,7 +89,8 @@ future<> create_role_statement::check_access(query_processor& qp, const service: future create_role_statement::execute(query_processor&, service::query_state& state, - const query_options&) const { + const query_options&, + std::optional guard) const { auth::role_config config; config.is_superuser = *_options.is_superuser; config.can_login = *_options.can_login; @@ -173,7 +174,7 @@ future<> alter_role_statement::check_access(query_processor& qp, const service:: } future -alter_role_statement::execute(query_processor&, service::query_state& state, const query_options&) const { +alter_role_statement::execute(query_processor&, service::query_state& state, const query_options&, std::optional guard) const { auth::role_config_update update; update.is_superuser = _options.is_superuser; update.can_login = _options.can_login; @@ -235,7 +236,7 @@ future<> drop_role_statement::check_access(query_processor& qp, const service::c } future -drop_role_statement::execute(query_processor&, service::query_state& state, const query_options&) const { +drop_role_statement::execute(query_processor&, service::query_state& state, const query_options&, std::optional guard) const { auto& as = *state.get_client_state().get_auth_service(); return auth::drop_role(as, _role).then([] { @@ -286,7 +287,7 @@ future<> list_roles_statement::check_access(query_processor& qp, const service:: } future -list_roles_statement::execute(query_processor&, service::query_state& state, const query_options&) const { +list_roles_statement::execute(query_processor&, service::query_state& state, const query_options&, std::optional guard) const { static const sstring virtual_table_name("roles"); static const auto make_column_spec = [](const sstring& name, const ::shared_ptr& ty) { @@ -402,7 +403,7 @@ future<> grant_role_statement::check_access(query_processor& qp, const service:: } future -grant_role_statement::execute(query_processor&, service::query_state& state, const query_options&) const { +grant_role_statement::execute(query_processor&, service::query_state& state, const query_options&, std::optional guard) const { auto& as = *state.get_client_state().get_auth_service(); return as.underlying_role_manager().grant(_grantee, _role).then([] { @@ -432,7 +433,8 @@ future<> revoke_role_statement::check_access(query_processor& qp, const service: future revoke_role_statement::execute( query_processor&, service::query_state& state, - const query_options&) const { + const query_options&, + std::optional guard) const { auto& rm = state.get_client_state().get_auth_service()->underlying_role_manager(); return rm.revoke(_revokee, _role).then([] { diff --git a/cql3/statements/schema_altering_statement.cc b/cql3/statements/schema_altering_statement.cc index 0698a4c3da..1fcba7f674 100644 --- a/cql3/statements/schema_altering_statement.cc +++ b/cql3/statements/schema_altering_statement.cc @@ -27,6 +27,7 @@ schema_altering_statement::schema_altering_statement(timeout_config_selector tim , cql_statement_no_metadata(timeout_selector) , _is_column_family_level{false} { + needs_guard = true; } schema_altering_statement::schema_altering_statement(cf_name name, timeout_config_selector timeout_selector) @@ -34,6 +35,7 @@ schema_altering_statement::schema_altering_statement(cf_name name, timeout_confi , cql_statement_no_metadata(timeout_selector) , _is_column_family_level{true} { + needs_guard = true; } future<> schema_altering_statement::grant_permissions_to_creator(const service::client_state&) const { @@ -58,7 +60,7 @@ void schema_altering_statement::prepare_keyspace(const service::client_state& st } future<::shared_ptr> -schema_altering_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const { +schema_altering_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const { bool internal = state.get_client_state().is_internal(); if (internal) { auto replication_type = locator::replication_strategy_type::everywhere_topology; @@ -73,7 +75,7 @@ schema_altering_statement::execute(query_processor& qp, service::query_state& st } } - return qp.execute_schema_statement(*this, state, options).then([this, &state, internal](::shared_ptr result) { + return qp.execute_schema_statement(*this, state, options, std::move(guard)).then([this, &state, internal](::shared_ptr result) { auto permissions_granted_fut = internal ? make_ready_future<>() : grant_permissions_to_creator(state.get_client_state()); diff --git a/cql3/statements/schema_altering_statement.hh b/cql3/statements/schema_altering_statement.hh index fbdd4965bb..7e00b1d2c0 100644 --- a/cql3/statements/schema_altering_statement.hh +++ b/cql3/statements/schema_altering_statement.hh @@ -18,6 +18,8 @@ #include +#include "service/raft/raft_group0_client.hh" + class mutation; namespace cql3 { @@ -55,7 +57,7 @@ protected: virtual void prepare_keyspace(const service::client_state& state) override; virtual future<::shared_ptr> - execute(query_processor& qp, service::query_state& state, const query_options& options) const override; + execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const override; public: virtual future, std::vector, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const = 0; diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index e6f171c4eb..f947b4eecc 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -328,16 +328,18 @@ static thread_local inheriting_concrete_execution_stage< future> select_statement::execute(query_processor& qp, service::query_state& state, - const query_options& options) const + const query_options& options, + std::optional guard) const { - return execute_without_checking_exception_message(qp, state, options) + return execute_without_checking_exception_message(qp, state, options, std::move(guard)) .then(cql_transport::messages::propagate_exception_as_future>); } future> select_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& state, - const query_options& options) const + const query_options& options, + std::optional guard) const { return select_stage(this, seastar::ref(qp), seastar::ref(state), seastar::cref(options)); } diff --git a/cql3/statements/select_statement.hh b/cql3/statements/select_statement.hh index 39159b43b6..5b98960e1a 100644 --- a/cql3/statements/select_statement.hh +++ b/cql3/statements/select_statement.hh @@ -116,10 +116,10 @@ public: virtual bool depends_on(std::string_view ks_name, std::optional cf_name) const override; virtual future<::shared_ptr> execute(query_processor& qp, - service::query_state& state, const query_options& options) const override; + service::query_state& state, const query_options& options, std::optional guard) const override; virtual future<::shared_ptr> - execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options) const override; + execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional guard) const override; future<::shared_ptr> execute_non_aggregate_unpaged(query_processor& qp, lw_shared_ptr cmd, dht::partition_range_vector&& partition_ranges, service::query_state& state, diff --git a/cql3/statements/strongly_consistent_modification_statement.cc b/cql3/statements/strongly_consistent_modification_statement.cc index 9b82f111e9..8e31c0376d 100644 --- a/cql3/statements/strongly_consistent_modification_statement.cc +++ b/cql3/statements/strongly_consistent_modification_statement.cc @@ -45,8 +45,8 @@ strongly_consistent_modification_statement::strongly_consistent_modification_sta { } future<::shared_ptr> -strongly_consistent_modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options) const { - return execute_without_checking_exception_message(qp, qs, options) +strongly_consistent_modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional guard) const { + return execute_without_checking_exception_message(qp, qs, options, std::move(guard)) .then(cql_transport::messages::propagate_exception_as_future>); } @@ -65,7 +65,7 @@ evaluate_prepared( } future<::shared_ptr> -strongly_consistent_modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options) const { +strongly_consistent_modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional guard) const { if (this_shard_id() != 0) { co_return ::make_shared(0, cql3::computed_function_values{}); } diff --git a/cql3/statements/strongly_consistent_modification_statement.hh b/cql3/statements/strongly_consistent_modification_statement.hh index 560c463d0c..cb69c7f634 100644 --- a/cql3/statements/strongly_consistent_modification_statement.hh +++ b/cql3/statements/strongly_consistent_modification_statement.hh @@ -37,10 +37,10 @@ public: strongly_consistent_modification_statement(uint32_t bound_terms, schema_ptr schema, broadcast_tables::prepared_update query); virtual future<::shared_ptr> - execute(query_processor& qp, service::query_state& qs, const query_options& options) const override; + execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional guard) const override; virtual future<::shared_ptr> - execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options) const override; + execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional guard) const override; virtual uint32_t get_bound_terms() const override; diff --git a/cql3/statements/strongly_consistent_select_statement.cc b/cql3/statements/strongly_consistent_select_statement.cc index 0e4f206606..89e504f4a0 100644 --- a/cql3/statements/strongly_consistent_select_statement.cc +++ b/cql3/statements/strongly_consistent_select_statement.cc @@ -94,7 +94,7 @@ evaluate_prepared( } future<::shared_ptr> -strongly_consistent_select_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options) const { +strongly_consistent_select_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional guard) const { if (this_shard_id() != 0) { co_return ::make_shared(0, cql3::computed_function_values{}); } diff --git a/cql3/statements/strongly_consistent_select_statement.hh b/cql3/statements/strongly_consistent_select_statement.hh index 1ab9575e3e..71702d7c60 100644 --- a/cql3/statements/strongly_consistent_select_statement.hh +++ b/cql3/statements/strongly_consistent_select_statement.hh @@ -45,7 +45,7 @@ public: std::unique_ptr attrs); virtual future<::shared_ptr> - execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options) const override; + execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional guard) const override; }; } diff --git a/cql3/statements/truncate_statement.cc b/cql3/statements/truncate_statement.cc index d262d973b5..9d723fa8ca 100644 --- a/cql3/statements/truncate_statement.cc +++ b/cql3/statements/truncate_statement.cc @@ -90,7 +90,7 @@ void truncate_statement::validate(query_processor&, const service::client_state& } future<::shared_ptr> -truncate_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const +truncate_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const { if (qp.db().find_schema(keyspace(), column_family())->is_view()) { throw exceptions::invalid_request_exception("Cannot TRUNCATE materialized view directly; must truncate base table instead"); diff --git a/cql3/statements/truncate_statement.hh b/cql3/statements/truncate_statement.hh index d01276b16a..6aa4ecbfba 100644 --- a/cql3/statements/truncate_statement.hh +++ b/cql3/statements/truncate_statement.hh @@ -40,7 +40,7 @@ public: virtual void validate(query_processor&, const service::client_state& state) const override; virtual future<::shared_ptr> - execute(query_processor& qp, service::query_state& state, const query_options& options) const override; + execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const override; private: db::timeout_clock::duration get_timeout(const service::client_state& state, const query_options& options) const; }; diff --git a/cql3/statements/use_statement.cc b/cql3/statements/use_statement.cc index 45bcf41529..5c9f27b0ed 100644 --- a/cql3/statements/use_statement.cc +++ b/cql3/statements/use_statement.cc @@ -55,7 +55,7 @@ future<> use_statement::check_access(query_processor& qp, const service::client_ } future<::shared_ptr> -use_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const { +use_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const { state.get_client_state().set_keyspace(qp.db().real_database(), _keyspace); auto result =::make_shared(_keyspace); return make_ready_future<::shared_ptr>(result); diff --git a/cql3/statements/use_statement.hh b/cql3/statements/use_statement.hh index 9809e7163a..14f47e247d 100644 --- a/cql3/statements/use_statement.hh +++ b/cql3/statements/use_statement.hh @@ -33,7 +33,7 @@ public: virtual seastar::future<> check_access(query_processor& qp, const service::client_state& state) const override; virtual seastar::future> - execute(query_processor& qp, service::query_state& state, const query_options& options) const override; + execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional guard) const override; }; } diff --git a/service/query_state.hh b/service/query_state.hh index f454165bad..5164a8f29d 100644 --- a/service/query_state.hh +++ b/service/query_state.hh @@ -13,6 +13,7 @@ #include "service/client_state.hh" #include "tracing/tracing.hh" #include "service_permit.hh" +#include "cql3/cql_statement.hh" namespace qos { class service_level_controller; diff --git a/service/raft/raft_sys_table_storage.cc b/service/raft/raft_sys_table_storage.cc index ebea4b30af..0a8c10b02b 100644 --- a/service/raft/raft_sys_table_storage.cc +++ b/service/raft/raft_sys_table_storage.cc @@ -266,7 +266,7 @@ future raft_sys_table_storage::do_store_log_entries_one_batch(const std: cql3::attributes::none(), _qp.get_cql_stats()); - co_await batch.execute(_qp, _dummy_query_state, batch_options); + co_await batch.execute(_qp, _dummy_query_state, batch_options, std::nullopt); if (idx != entries_size) { co_return idx; diff --git a/table_helper.cc b/table_helper.cc index 7a67a4d280..49fd474977 100644 --- a/table_helper.cc +++ b/table_helper.cc @@ -111,7 +111,7 @@ future<> table_helper::insert(cql3::query_processor& qp, service::migration_mana return cache_table_info(qp, mm, qs).then([this, &qp, &qs, opt_maker = std::move(opt_maker)] () mutable { return do_with(opt_maker(), [this, &qp, &qs] (auto& opts) { opts.prepare(_prepared_stmt->bound_names); - return _insert_stmt->execute(qp, qs, opts); + return _insert_stmt->execute(qp, qs, opts, std::nullopt); }); }).discard_result(); } diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index ef0836642e..1729e806da 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -274,7 +274,7 @@ public: auto qs = make_query_state(); auto& lqo = *qo; - return local_qp().execute_prepared_without_checking_exception_message(std::move(prepared), std::move(id), *qs, lqo, true) + return local_qp().execute_prepared_without_checking_exception_message(*qs, std::move(stmt), lqo, std::move(prepared), std::move(id), true) .then([qs, qo = std::move(qo)] (auto msg) { return cql_transport::messages::propagate_exception_as_future(std::move(msg)); }); diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index 553c712855..9e23282613 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -402,7 +402,7 @@ future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp, cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, std::nullopt, std::vector{}, false, cql3::query_options::specific_options::DEFAULT), std::move(values)), cql3::statements::batch_statement(cql3::statements::batch_statement::type::UNLOGGED, std::move(modifications), cql3::attributes::none(), qp.get_cql_stats()), [this, &qp] (auto& batch_options, auto& batch) { - return batch.execute(qp, _dummy_query_state, batch_options).then([] (shared_ptr res) { return now(); }); + return batch.execute(qp, _dummy_query_state, batch_options, std::nullopt).then([] (shared_ptr res) { return now(); }); } ); }); diff --git a/transport/server.cc b/transport/server.cc index 3d81b26aa5..dc8e62d715 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -1129,7 +1129,7 @@ process_execute_internal(service::client_state& client_state, distributedmove_to_shard()) { return process_fn_return_type(dynamic_pointer_cast(msg));