mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 19:21:01 +00:00
cql3: Extend the scope of group0_guard during DDL statement execution
Currently we hold group0_guard only during DDL statement's execute() function, but unfortunately some statements access underlying schema state also during check_access() and validate() calls which are called by the query_processor before it calls execute. We need to cover those calls with group0_guard as well and also move retry loop up. This patch does it by introducing new function to cql_statement class take_guard(). Schema altering statements return group0 guard while others do not return any guard. Query processor takes this guard at the beginning of a statement execution and retries if service::group0_concurrent_modification is thrown. The guard is passed to the execute in query_state structure. Fixes: #13942 Message-ID: <ZNsynXayKim2XAFr@scylladb.com>
This commit is contained in:
@@ -11,6 +11,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "timeout_config.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
@@ -48,6 +49,9 @@ public:
|
||||
// CQL statement text
|
||||
seastar::sstring raw_cql_statement;
|
||||
|
||||
// True for statements that needs guard to be taken before the execution
|
||||
bool needs_guard = false;
|
||||
|
||||
explicit cql_statement(timeout_config_selector timeout_selector) : _timeout_config_selector(timeout_selector) {}
|
||||
|
||||
virtual ~cql_statement()
|
||||
@@ -82,7 +86,7 @@ public:
|
||||
* @param options options for this query (consistency, variables, pageSize, ...)
|
||||
*/
|
||||
virtual seastar::future<seastar::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options) const = 0;
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const = 0;
|
||||
|
||||
/**
|
||||
* Execute the statement and return the resulting result or null if there is no result.
|
||||
@@ -94,8 +98,8 @@ public:
|
||||
* @param options options for this query (consistency, variables, pageSize, ...)
|
||||
*/
|
||||
virtual seastar::future<seastar::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_without_checking_exception_message(query_processor& qp, service::query_state& state, const query_options& options) const {
|
||||
return execute(qp, state, options);
|
||||
execute_without_checking_exception_message(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
return execute(qp, state, options, std::move(guard));
|
||||
}
|
||||
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const = 0;
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
#include "data_dictionary/data_dictionary.hh"
|
||||
#include "utils/hashers.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -484,16 +485,55 @@ future<> query_processor::stop() {
|
||||
});
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> query_processor::execute_with_guard(
|
||||
std::function<future<::shared_ptr<cql_transport::messages::result_message>>(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>)> fn,
|
||||
::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options) {
|
||||
// execute all statements that need group0 guard on shard0
|
||||
if (this_shard_id() != 0) {
|
||||
co_return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(0,
|
||||
std::move(const_cast<cql3::query_options&>(options).take_cached_pk_function_calls()));
|
||||
}
|
||||
|
||||
auto [remote_, holder] = remote();
|
||||
size_t retries = remote_.get().mm.get_concurrent_ddl_retries();
|
||||
while (true) {
|
||||
try {
|
||||
auto guard = co_await remote_.get().mm.start_group0_operation();
|
||||
co_return co_await fn(query_state, statement, options, std::move(guard));
|
||||
} catch (const service::group0_concurrent_modification& ex) {
|
||||
log.warn("Failed to execute statement \"{}\" due to guard conflict.{}.",
|
||||
statement->raw_cql_statement, retries ? " Retrying" : " Number of retries exceeded, giving up");
|
||||
if (retries--) {
|
||||
continue;
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template<typename... Args>
|
||||
future<::shared_ptr<result_message>>
|
||||
query_processor::execute_maybe_with_guard(service::query_state& query_state, ::shared_ptr<cql_statement> statement, const query_options& options,
|
||||
future<::shared_ptr<result_message>>(query_processor::*fn)(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>, Args...), Args... args) {
|
||||
if (!statement->needs_guard) {
|
||||
return (this->*fn)(query_state, std::move(statement), options, std::nullopt, std::forward<Args>(args)...);
|
||||
}
|
||||
static auto exec = [fn] (query_processor& qp, Args... args, service::query_state& query_state, ::shared_ptr<cql_statement> statement, const query_options& options, std::optional<service::group0_guard> guard) {
|
||||
return (qp.*fn)(query_state, std::move(statement), options, std::move(guard), std::forward<Args>(args)...);
|
||||
};
|
||||
return execute_with_guard(std::bind_front(exec, std::ref(*this), std::forward<Args>(args)...), std::move(statement), query_state, options);
|
||||
}
|
||||
|
||||
future<::shared_ptr<result_message>>
|
||||
query_processor::execute_direct_without_checking_exception_message(const sstring_view& query_string, service::query_state& query_state, query_options& options) {
|
||||
log.trace("execute_direct: \"{}\"", query_string);
|
||||
tracing::trace(query_state.get_trace_state(), "Parsing a statement");
|
||||
auto p = get_statement(query_string, query_state.get_client_state());
|
||||
auto cql_statement = p->statement;
|
||||
auto statement = p->statement;
|
||||
const auto warnings = std::move(p->warnings);
|
||||
if (cql_statement->get_bound_terms() != options.get_values_count()) {
|
||||
if (statement->get_bound_terms() != options.get_values_count()) {
|
||||
const auto msg = format("Invalid amount of bind variables: expected {:d} received {:d}",
|
||||
cql_statement->get_bound_terms(),
|
||||
statement->get_bound_terms(),
|
||||
options.get_values_count());
|
||||
throw exceptions::invalid_request_exception(msg);
|
||||
}
|
||||
@@ -505,8 +545,18 @@ query_processor::execute_direct_without_checking_exception_message(const sstring
|
||||
metrics.regularStatementsExecuted.inc();
|
||||
#endif
|
||||
tracing::trace(query_state.get_trace_state(), "Processing a statement");
|
||||
co_await cql_statement->check_access(*this, query_state.get_client_state());
|
||||
auto m = co_await process_authorized_statement(std::move(cql_statement), query_state, options);
|
||||
return execute_maybe_with_guard(query_state, std::move(statement), options, &query_processor::do_execute_direct, std::move(warnings));
|
||||
}
|
||||
|
||||
future<::shared_ptr<result_message>>
|
||||
query_processor::do_execute_direct(
|
||||
service::query_state& query_state,
|
||||
shared_ptr<cql_statement> statement,
|
||||
const query_options& options,
|
||||
std::optional<service::group0_guard> guard,
|
||||
cql3::cql_warnings_vec warnings) {
|
||||
co_await statement->check_access(*this, query_state.get_client_state());
|
||||
auto m = co_await process_authorized_statement(statement, query_state, options, std::move(guard));
|
||||
for (const auto& w : warnings) {
|
||||
m->add_warning(w);
|
||||
}
|
||||
@@ -515,14 +565,24 @@ query_processor::execute_direct_without_checking_exception_message(const sstring
|
||||
|
||||
future<::shared_ptr<result_message>>
|
||||
query_processor::execute_prepared_without_checking_exception_message(
|
||||
service::query_state& query_state,
|
||||
shared_ptr<cql_statement> statement,
|
||||
const query_options& options,
|
||||
statements::prepared_statement::checked_weak_ptr prepared,
|
||||
cql3::prepared_cache_key_type cache_key,
|
||||
service::query_state& query_state,
|
||||
const query_options& options,
|
||||
bool needs_authorization) {
|
||||
return execute_maybe_with_guard(query_state, std::move(statement), options, &query_processor::do_execute_prepared, std::move(prepared), std::move(cache_key), needs_authorization);
|
||||
}
|
||||
|
||||
::shared_ptr<cql_statement> statement = prepared->statement;
|
||||
|
||||
future<::shared_ptr<result_message>>
|
||||
query_processor::do_execute_prepared(
|
||||
service::query_state& query_state,
|
||||
shared_ptr<cql_statement> statement,
|
||||
const query_options& options,
|
||||
std::optional<service::group0_guard> guard,
|
||||
statements::prepared_statement::checked_weak_ptr prepared,
|
||||
cql3::prepared_cache_key_type cache_key,
|
||||
bool needs_authorization) {
|
||||
if (needs_authorization) {
|
||||
co_await statement->check_access(*this, query_state.get_client_state());
|
||||
try {
|
||||
@@ -531,19 +591,18 @@ query_processor::execute_prepared_without_checking_exception_message(
|
||||
log.error("failed to cache the entry: {}", std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
co_return co_await process_authorized_statement(std::move(statement), query_state, options);
|
||||
co_return co_await process_authorized_statement(std::move(statement), query_state, options, std::move(guard));
|
||||
}
|
||||
|
||||
future<::shared_ptr<result_message>>
|
||||
query_processor::process_authorized_statement(const ::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options) {
|
||||
query_processor::process_authorized_statement(const ::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options, std::optional<service::group0_guard> guard) {
|
||||
auto& client_state = query_state.get_client_state();
|
||||
|
||||
++_stats.queries_by_cl[size_t(options.get_consistency())];
|
||||
|
||||
statement->validate(*this, client_state);
|
||||
|
||||
auto msg = co_await statement->execute_without_checking_exception_message(*this, query_state, options);
|
||||
auto msg = co_await statement->execute_without_checking_exception_message(*this, query_state, options, std::move(guard));
|
||||
|
||||
if (msg) {
|
||||
co_return std::move(msg);
|
||||
@@ -749,7 +808,7 @@ query_processor::execute_paged_internal(internal_query_state& state) {
|
||||
state.p->statement->validate(*this, service::client_state::for_internal_calls());
|
||||
auto qs = query_state_for_internal_call();
|
||||
::shared_ptr<cql_transport::messages::result_message> msg =
|
||||
co_await state.p->statement->execute(*this, qs, *state.opts);
|
||||
co_await state.p->statement->execute(*this, qs, *state.opts, std::nullopt);
|
||||
|
||||
class visitor : public result_message::visitor_base {
|
||||
internal_query_state& _state;
|
||||
@@ -823,11 +882,22 @@ query_processor::execute_with_params(
|
||||
service::query_state& query_state,
|
||||
const std::initializer_list<data_value>& values) {
|
||||
auto opts = make_internal_options(p, values, cl);
|
||||
p->statement->validate(*this, service::client_state::for_internal_calls());
|
||||
auto msg = co_await p->statement->execute(*this, query_state, opts);
|
||||
auto statement = p->statement;
|
||||
|
||||
auto msg = co_await execute_maybe_with_guard(query_state, std::move(statement), opts, &query_processor::do_execute_with_params);
|
||||
co_return ::make_shared<untyped_result_set>(msg);
|
||||
}
|
||||
|
||||
future<::shared_ptr<result_message>>
|
||||
query_processor::do_execute_with_params(
|
||||
service::query_state& query_state,
|
||||
shared_ptr<cql_statement> statement,
|
||||
const query_options& options, std::optional<service::group0_guard> guard) {
|
||||
statement->validate(*this, service::client_state::for_internal_calls());
|
||||
co_return co_await statement->execute(*this, query_state, options, std::move(guard));
|
||||
}
|
||||
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
query_processor::execute_batch_without_checking_exception_message(
|
||||
::shared_ptr<statements::batch_statement> batch,
|
||||
@@ -852,7 +922,7 @@ query_processor::execute_batch_without_checking_exception_message(
|
||||
}
|
||||
log.trace("execute_batch({}): {}", batch->get_statements().size(), oss.str());
|
||||
}
|
||||
co_return co_await batch->execute(*this, query_state, options);
|
||||
co_return co_await batch->execute(*this, query_state, options, std::nullopt);
|
||||
}
|
||||
|
||||
future<service::broadcast_tables::query_result>
|
||||
@@ -868,44 +938,31 @@ query_processor::forward(query::forward_request req, tracing::trace_state_ptr tr
|
||||
}
|
||||
|
||||
future<::shared_ptr<messages::result_message>>
|
||||
query_processor::execute_schema_statement(const statements::schema_altering_statement& stmt, service::query_state& state, const query_options& options) {
|
||||
query_processor::execute_schema_statement(const statements::schema_altering_statement& stmt, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) {
|
||||
::shared_ptr<cql_transport::event::schema_change> ce;
|
||||
|
||||
if (this_shard_id() != 0) {
|
||||
// execute all schema altering statements on a shard zero since this is where raft group 0 is
|
||||
co_return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(0,
|
||||
std::move(const_cast<cql3::query_options&>(options).take_cached_pk_function_calls()));
|
||||
on_internal_error(log, "DDL must be executed on shard 0");
|
||||
}
|
||||
|
||||
if (!guard) {
|
||||
on_internal_error(log, "Guard must be present when executing DDL");
|
||||
}
|
||||
|
||||
auto [remote_, holder] = remote();
|
||||
|
||||
cql3::cql_warnings_vec warnings;
|
||||
|
||||
auto [remote_, holder] = remote();
|
||||
auto& mm = remote_.get().mm;
|
||||
auto retries = mm.get_concurrent_ddl_retries();
|
||||
while (true) {
|
||||
try {
|
||||
auto group0_guard = co_await mm.start_group0_operation();
|
||||
auto [ret, m, cql_warnings] = co_await stmt.prepare_schema_mutations(*this, guard->write_timestamp());
|
||||
warnings = std::move(cql_warnings);
|
||||
|
||||
auto [ret, m, cql_warnings] = co_await stmt.prepare_schema_mutations(*this, group0_guard.write_timestamp());
|
||||
warnings = std::move(cql_warnings);
|
||||
|
||||
if (!m.empty()) {
|
||||
auto description = format("CQL DDL statement: \"{}\"", stmt.raw_cql_statement);
|
||||
co_await mm.announce(std::move(m), std::move(group0_guard), description);
|
||||
}
|
||||
|
||||
ce = std::move(ret);
|
||||
} catch (const service::group0_concurrent_modification&) {
|
||||
log.warn("Failed to execute DDL statement \"{}\" due to concurrent group 0 modification.{}.",
|
||||
stmt.raw_cql_statement, retries ? " Retrying" : " Number of retries exceeded, giving up");
|
||||
if (retries--) {
|
||||
continue;
|
||||
}
|
||||
throw;
|
||||
}
|
||||
break;
|
||||
if (!m.empty()) {
|
||||
auto description = format("CQL DDL statement: \"{}\"", stmt.raw_cql_statement);
|
||||
co_await remote_.get().mm.announce(std::move(m), std::move(*guard), description);
|
||||
}
|
||||
|
||||
ce = std::move(ret);
|
||||
|
||||
// If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change. To avoid doing
|
||||
// extra work in the drivers to handle schema changes, we return an empty message in this case. (CASSANDRA-7600)
|
||||
::shared_ptr<messages::result_message> result;
|
||||
|
||||
@@ -205,11 +205,13 @@ public:
|
||||
service::query_state& query_state,
|
||||
const query_options& options,
|
||||
bool needs_authorization) {
|
||||
auto cql_statement = statement->statement;
|
||||
return execute_prepared_without_checking_exception_message(
|
||||
query_state,
|
||||
std::move(cql_statement),
|
||||
options,
|
||||
std::move(statement),
|
||||
std::move(cache_key),
|
||||
query_state,
|
||||
options,
|
||||
needs_authorization)
|
||||
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
|
||||
}
|
||||
@@ -218,11 +220,22 @@ public:
|
||||
// The result_message::exception must be explicitly handled.
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_prepared_without_checking_exception_message(
|
||||
statements::prepared_statement::checked_weak_ptr statement,
|
||||
cql3::prepared_cache_key_type cache_key,
|
||||
service::query_state& query_state,
|
||||
const query_options& options,
|
||||
bool needs_authorization);
|
||||
service::query_state& query_state,
|
||||
shared_ptr<cql_statement> statement,
|
||||
const query_options& options,
|
||||
statements::prepared_statement::checked_weak_ptr prepared,
|
||||
cql3::prepared_cache_key_type cache_key,
|
||||
bool needs_authorization);
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
do_execute_prepared(
|
||||
service::query_state& query_state,
|
||||
shared_ptr<cql_statement> statement,
|
||||
const query_options& options,
|
||||
std::optional<service::group0_guard> guard,
|
||||
statements::prepared_statement::checked_weak_ptr prepared,
|
||||
cql3::prepared_cache_key_type cache_key,
|
||||
bool needs_authorization);
|
||||
|
||||
/// Execute a client statement that was not prepared.
|
||||
inline
|
||||
@@ -246,6 +259,14 @@ public:
|
||||
service::query_state& query_state,
|
||||
query_options& options);
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
do_execute_direct(
|
||||
service::query_state& query_state,
|
||||
shared_ptr<cql_statement> statement,
|
||||
const query_options& options,
|
||||
std::optional<service::group0_guard> guard,
|
||||
cql3::cql_warnings_vec warnings);
|
||||
|
||||
statements::prepared_statement::checked_weak_ptr prepare_internal(const sstring& query);
|
||||
|
||||
/*!
|
||||
@@ -350,6 +371,13 @@ public:
|
||||
service::query_state& query_state,
|
||||
const std::initializer_list<data_value>& = { });
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> do_execute_with_params(
|
||||
service::query_state& query_state,
|
||||
shared_ptr<cql_statement> statement,
|
||||
const query_options& options,
|
||||
std::optional<service::group0_guard> guard);
|
||||
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
|
||||
prepare(sstring query_string, service::query_state& query_state);
|
||||
|
||||
@@ -389,8 +417,10 @@ public:
|
||||
future<query::forward_result>
|
||||
forward(query::forward_request, tracing::trace_state_ptr);
|
||||
|
||||
struct retry_statement_execution_error : public std::exception {};
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_schema_statement(const statements::schema_altering_statement&, service::query_state& state, const query_options& options);
|
||||
execute_schema_statement(const statements::schema_altering_statement&, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard);
|
||||
|
||||
future<std::string>
|
||||
execute_thrift_schema_command(
|
||||
@@ -420,7 +450,7 @@ private:
|
||||
int32_t page_size = -1) const;
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
process_authorized_statement(const ::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options);
|
||||
process_authorized_statement(const ::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options, std::optional<service::group0_guard> guard);
|
||||
|
||||
/*!
|
||||
* \brief created a state object for paging
|
||||
@@ -455,6 +485,15 @@ private:
|
||||
*/
|
||||
bool has_more_results(cql3::internal_query_state& state) const;
|
||||
|
||||
template<typename... Args>
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_maybe_with_guard(service::query_state& query_state, ::shared_ptr<cql_statement> statement, const query_options& options,
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>(query_processor::*fn)(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>, Args...), Args... args);
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> execute_with_guard(
|
||||
std::function<future<::shared_ptr<cql_transport::messages::result_message>>(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>)> fn,
|
||||
::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options);
|
||||
|
||||
///
|
||||
/// \tparam ResultMsgType type of the returned result message (CQL or Thrift)
|
||||
/// \tparam PreparedKeyGenerator a function that generates the prepared statement cache key for given query and
|
||||
|
||||
@@ -102,9 +102,9 @@ cql3::statements::alter_keyspace_statement::prepare(data_dictionary::database db
|
||||
static logging::logger mylogger("alter_keyspace");
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
cql3::statements::alter_keyspace_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const {
|
||||
cql3::statements::alter_keyspace_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
std::optional<sstring> warning = check_restricted_replication_strategy(qp, keyspace(), *_attrs);
|
||||
return schema_altering_statement::execute(qp, state, options).then([warning = std::move(warning)] (::shared_ptr<messages::result_message> msg) {
|
||||
return schema_altering_statement::execute(qp, state, options, std::move(guard)).then([warning = std::move(warning)] (::shared_ptr<messages::result_message> msg) {
|
||||
if (warning) {
|
||||
msg->add_warning(*warning);
|
||||
mylogger.warn("{}", *warning);
|
||||
|
||||
@@ -35,7 +35,7 @@ public:
|
||||
void validate(query_processor& qp, const service::client_state& state) const override;
|
||||
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
|
||||
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
|
||||
virtual future<::shared_ptr<messages::result_message>> execute(query_processor& qp, service::query_state& state, const query_options& options) const override;
|
||||
virtual future<::shared_ptr<messages::result_message>> execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ public:
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&) const override;
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ future<> alter_service_level_statement::check_access(query_processor& qp, const
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
alter_service_level_statement::execute(query_processor& qp,
|
||||
service::query_state &state,
|
||||
const query_options &) const {
|
||||
const query_options &, std::optional<service::group0_guard> guard) const {
|
||||
qos::service_level& sl = state.get_service_level_controller().get_service_level(_service_level);
|
||||
qos::service_level_options slo = _slo.replace_defaults(sl.slo);
|
||||
return state.get_service_level_controller().alter_distributed_service_level(_service_level, slo).then([] {
|
||||
|
||||
@@ -26,7 +26,7 @@ public:
|
||||
std::unique_ptr<cql3::statements::prepared_statement> prepare(data_dictionary::database db, cql_stats &stats) override;
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&) const override;
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -402,10 +402,10 @@ cql3::statements::alter_table_statement::prepare(data_dictionary::database db, c
|
||||
}
|
||||
|
||||
future<::shared_ptr<messages::result_message>>
|
||||
alter_table_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const {
|
||||
alter_table_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
auto s = validation::validate_column_family(qp.db(), keyspace(), column_family());
|
||||
std::optional<sstring> warning = check_restricted_table_properties(qp, s, keyspace(), column_family(), *_properties);
|
||||
return schema_altering_statement::execute(qp, state, options).then([warning = std::move(warning)] (::shared_ptr<messages::result_message> msg) {
|
||||
return schema_altering_statement::execute(qp, state, options, std::move(guard)).then([warning = std::move(warning)] (::shared_ptr<messages::result_message> msg) {
|
||||
if (warning) {
|
||||
msg->add_warning(*warning);
|
||||
mylogger.warn("{}", *warning);
|
||||
|
||||
@@ -52,7 +52,7 @@ public:
|
||||
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
|
||||
virtual future<::shared_ptr<messages::result_message>> execute(query_processor& qp, service::query_state& state, const query_options& options) const override;
|
||||
virtual future<::shared_ptr<messages::result_message>> execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
|
||||
private:
|
||||
|
||||
@@ -35,7 +35,8 @@ future<> attach_service_level_statement::check_access(query_processor& qp, const
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
attach_service_level_statement::execute(query_processor& qp,
|
||||
service::query_state &state,
|
||||
const query_options &) const {
|
||||
const query_options &,
|
||||
std::optional<service::group0_guard> guard) const {
|
||||
return state.get_service_level_controller().get_distributed_service_level(_service_level).then([this] (qos::service_levels_info sli) {
|
||||
if (sli.empty()) {
|
||||
throw qos::nonexistant_service_level_exception(_service_level);
|
||||
|
||||
@@ -25,7 +25,7 @@ public:
|
||||
std::unique_ptr<cql3::statements::prepared_statement> prepare(data_dictionary::database db, cql_stats &stats) override;
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&) const override;
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -234,13 +234,13 @@ static thread_local inheriting_concrete_execution_stage<
|
||||
api::timestamp_type> batch_stage{"cql3_batch", batch_statement_executor::get()};
|
||||
|
||||
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute(
|
||||
query_processor& qp, service::query_state& state, const query_options& options) const {
|
||||
return execute_without_checking_exception_message(qp, state, options)
|
||||
query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
return execute_without_checking_exception_message(qp, state, options, std::move(guard))
|
||||
.then(cql_transport::messages::propagate_exception_as_future<shared_ptr<cql_transport::messages::result_message>>);
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute_without_checking_exception_message(
|
||||
query_processor& qp, service::query_state& state, const query_options& options) const {
|
||||
query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
cql3::util::validate_timestamp(qp.db().get_config(), options, _attrs);
|
||||
return batch_stage(this, seastar::ref(qp), seastar::ref(state),
|
||||
seastar::cref(options), false, options.get_timestamp(state));
|
||||
|
||||
@@ -118,10 +118,10 @@ public:
|
||||
static void verify_batch_size(query_processor& qp, const std::vector<mutation>& mutations);
|
||||
|
||||
virtual future<shared_ptr<cql_transport::messages::result_message>> execute(
|
||||
query_processor& qp, service::query_state& state, const query_options& options) const override;
|
||||
query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
virtual future<shared_ptr<cql_transport::messages::result_message>> execute_without_checking_exception_message(
|
||||
query_processor& qp, service::query_state& state, const query_options& options) const override;
|
||||
query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
private:
|
||||
friend class batch_statement_executor;
|
||||
future<shared_ptr<cql_transport::messages::result_message>> do_execute(
|
||||
|
||||
@@ -204,9 +204,9 @@ std::optional<sstring> check_restricted_replication_strategy(
|
||||
}
|
||||
|
||||
future<::shared_ptr<messages::result_message>>
|
||||
create_keyspace_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const {
|
||||
create_keyspace_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
std::optional<sstring> warning = check_restricted_replication_strategy(qp, keyspace(), *_attrs);
|
||||
return schema_altering_statement::execute(qp, state, options).then([warning = std::move(warning)] (::shared_ptr<messages::result_message> msg) {
|
||||
return schema_altering_statement::execute(qp, state, options, std::move(guard)).then([warning = std::move(warning)] (::shared_ptr<messages::result_message> msg) {
|
||||
if (warning) {
|
||||
msg->add_warning(*warning);
|
||||
mylogger.warn("{}", *warning);
|
||||
|
||||
@@ -71,7 +71,7 @@ public:
|
||||
virtual future<> grant_permissions_to_creator(const service::client_state&) const override;
|
||||
|
||||
virtual future<::shared_ptr<messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options) const override;
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> get_keyspace_metadata(const locator::token_metadata& tm);
|
||||
};
|
||||
|
||||
@@ -44,7 +44,7 @@ public:
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&) const override;
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -36,7 +36,8 @@ future<> create_service_level_statement::check_access(query_processor& qp, const
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
create_service_level_statement::execute(query_processor& qp,
|
||||
service::query_state &state,
|
||||
const query_options &) const {
|
||||
const query_options &,
|
||||
std::optional<service::group0_guard> guard) const {
|
||||
qos::service_level_options slo = _slo.replace_defaults(qos::service_level_options{});
|
||||
return state.get_service_level_controller().add_distributed_service_level(_service_level, slo, _if_not_exists).then([] {
|
||||
using void_result_msg = cql_transport::messages::result_message::void_message;
|
||||
|
||||
@@ -27,7 +27,7 @@ public:
|
||||
std::unique_ptr<cql3::statements::prepared_statement> prepare(data_dictionary::database db, cql_stats &stats) override;
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&) const override;
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -489,9 +489,9 @@ std::optional<sstring> check_restricted_table_properties(
|
||||
}
|
||||
|
||||
future<::shared_ptr<messages::result_message>>
|
||||
create_table_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const {
|
||||
create_table_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
std::optional<sstring> warning = check_restricted_table_properties(qp, std::nullopt, keyspace(), column_family(), *_properties);
|
||||
return schema_altering_statement::execute(qp, state, options).then([warning = std::move(warning)] (::shared_ptr<messages::result_message> msg) {
|
||||
return schema_altering_statement::execute(qp, state, options, std::move(guard)).then([warning = std::move(warning)] (::shared_ptr<messages::result_message> msg) {
|
||||
if (warning) {
|
||||
msg->add_warning(*warning);
|
||||
mylogger.warn("{}", *warning);
|
||||
|
||||
@@ -78,7 +78,7 @@ public:
|
||||
virtual future<> grant_permissions_to_creator(const service::client_state&) const override;
|
||||
|
||||
virtual future<::shared_ptr<messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options) const override;
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
schema_ptr get_cf_meta_data(const data_dictionary::database) const;
|
||||
|
||||
|
||||
@@ -441,7 +441,7 @@ seastar::shared_ptr<const metadata> describe_statement::get_result_metadata() co
|
||||
}
|
||||
|
||||
seastar::future<seastar::shared_ptr<cql_transport::messages::result_message>>
|
||||
describe_statement::execute(cql3::query_processor& qp, service::query_state& state, const query_options& options) const {
|
||||
describe_statement::execute(cql3::query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
auto& client_state = state.get_client_state();
|
||||
|
||||
auto descriptions = co_await describe(qp, client_state);
|
||||
|
||||
@@ -51,7 +51,7 @@ public:
|
||||
virtual seastar::shared_ptr<const metadata> get_result_metadata() const override;
|
||||
|
||||
virtual seastar::future<seastar::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(cql3::query_processor& qp, service::query_state& state, const query_options& options) const override;
|
||||
execute(cql3::query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
class cluster_describe_statement : public describe_statement {
|
||||
|
||||
@@ -34,7 +34,8 @@ future<> detach_service_level_statement::check_access(query_processor& qp, const
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
detach_service_level_statement::execute(query_processor& qp,
|
||||
service::query_state &state,
|
||||
const query_options &) const {
|
||||
const query_options &,
|
||||
std::optional<service::group0_guard> guard) const {
|
||||
return state.get_client_state().get_auth_service()->underlying_role_manager().remove_attribute(_role_name, "service_level").then([] {
|
||||
using void_result_msg = cql_transport::messages::result_message::void_message;
|
||||
using result_msg = cql_transport::messages::result_message;
|
||||
|
||||
@@ -23,7 +23,7 @@ public:
|
||||
std::unique_ptr<cql3::statements::prepared_statement> prepare(data_dictionary::database db, cql_stats &stats) override;
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&) const override;
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ public:
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&) const override;
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -34,7 +34,8 @@ future<> drop_service_level_statement::check_access(query_processor& qp, const s
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
drop_service_level_statement::execute(query_processor& qp,
|
||||
service::query_state &state,
|
||||
const query_options &) const {
|
||||
const query_options &,
|
||||
std::optional<service::group0_guard> guard) const {
|
||||
return state.get_service_level_controller().drop_distributed_service_level(_service_level, _if_exists).then([] {
|
||||
using void_result_msg = cql_transport::messages::result_message::void_message;
|
||||
using result_msg = cql_transport::messages::result_message;
|
||||
|
||||
@@ -24,7 +24,7 @@ public:
|
||||
std::unique_ptr<cql3::statements::prepared_statement> prepare(data_dictionary::database db, cql_stats &stats) override;
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&) const override;
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ public:
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&) const override;
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ std::unique_ptr<cql3::statements::prepared_statement> cql3::statements::grant_st
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
cql3::statements::grant_statement::execute(query_processor&, service::query_state& state, const query_options& options) const {
|
||||
cql3::statements::grant_statement::execute(query_processor&, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
auto& auth_service = *state.get_client_state().get_auth_service();
|
||||
|
||||
return auth::grant_permissions(auth_service, _role_name, _permissions, _resource).then([] {
|
||||
|
||||
@@ -26,7 +26,8 @@ public:
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> execute(query_processor&
|
||||
, service::query_state&
|
||||
, const query_options&) const override;
|
||||
, const query_options&
|
||||
, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -78,7 +78,8 @@ future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
cql3::statements::list_permissions_statement::execute(
|
||||
query_processor& qp,
|
||||
service::query_state& state,
|
||||
const query_options& options) const {
|
||||
const query_options& options,
|
||||
std::optional<service::group0_guard> guard) const {
|
||||
static auto make_column = [](sstring name) {
|
||||
return make_lw_shared<column_specification>(
|
||||
auth::meta::AUTH_KS,
|
||||
|
||||
@@ -39,7 +39,7 @@ public:
|
||||
future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state& , const query_options&) const override;
|
||||
execute(query_processor&, service::query_state& , const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ public:
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&) const override;
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -39,7 +39,8 @@ future<> list_service_level_attachments_statement::check_access(query_processor&
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
list_service_level_attachments_statement::execute(query_processor& qp,
|
||||
service::query_state &state,
|
||||
const query_options &) const {
|
||||
const query_options &,
|
||||
std::optional<service::group0_guard> guard) const {
|
||||
|
||||
static auto make_column = [] (sstring name, const shared_ptr<const abstract_type> type) {
|
||||
return make_lw_shared<column_specification>(
|
||||
|
||||
@@ -25,7 +25,7 @@ public:
|
||||
std::unique_ptr<cql3::statements::prepared_statement> prepare(data_dictionary::database db, cql_stats &stats) override;
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&) const override;
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -36,7 +36,8 @@ future<> list_service_level_statement::check_access(query_processor& qp, const s
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
list_service_level_statement::execute(query_processor& qp,
|
||||
service::query_state &state,
|
||||
const query_options &) const {
|
||||
const query_options &,
|
||||
std::optional<service::group0_guard> guard) const {
|
||||
|
||||
static auto make_column = [] (sstring name, const shared_ptr<const abstract_type> type) {
|
||||
return make_lw_shared<column_specification>(
|
||||
|
||||
@@ -24,7 +24,7 @@ public:
|
||||
std::unique_ptr<cql3::statements::prepared_statement> prepare(data_dictionary::database db, cql_stats &stats) override;
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&) const override;
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ future<> cql3::statements::list_users_statement::check_access(query_processor& q
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
cql3::statements::list_users_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const {
|
||||
cql3::statements::list_users_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
static const sstring virtual_table_name("users");
|
||||
|
||||
static const auto make_column_spec = [](const sstring& name, const ::shared_ptr<const abstract_type>& ty) {
|
||||
|
||||
@@ -26,7 +26,8 @@ public:
|
||||
future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> execute(query_processor&
|
||||
, service::query_state&
|
||||
, const query_options&) const override;
|
||||
, const query_options&
|
||||
, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -250,13 +250,13 @@ static thread_local inheriting_concrete_execution_stage<
|
||||
const query_options&> modify_stage{"cql3_modification", modification_statement_executor::get()};
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options) const {
|
||||
return execute_without_checking_exception_message(qp, qs, options)
|
||||
modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
return execute_without_checking_exception_message(qp, qs, options, std::move(guard))
|
||||
.then(cql_transport::messages::propagate_exception_as_future<shared_ptr<cql_transport::messages::result_message>>);
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options) const {
|
||||
modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
cql3::util::validate_timestamp(qp.db().get_config(), options, attrs);
|
||||
return modify_stage(this, seastar::ref(qp), seastar::ref(qs), seastar::cref(options));
|
||||
}
|
||||
|
||||
@@ -229,10 +229,10 @@ public:
|
||||
bool has_only_static_column_conditions() const { return !_has_regular_column_conditions && _has_static_column_conditions; }
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& qs, const query_options& options) const override;
|
||||
execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options) const override;
|
||||
execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
private:
|
||||
future<exceptions::coordinator_result<>>
|
||||
|
||||
@@ -37,7 +37,7 @@ public:
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor&, service::query_state&, const query_options&) const override;
|
||||
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ std::unique_ptr<cql3::statements::prepared_statement> cql3::statements::revoke_s
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
cql3::statements::revoke_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const {
|
||||
cql3::statements::revoke_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
auto& auth_service = *state.get_client_state().get_auth_service();
|
||||
|
||||
return auth::revoke_permissions(auth_service, _role_name, _permissions, _resource).then([] {
|
||||
|
||||
@@ -26,7 +26,8 @@ public:
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> execute(query_processor&
|
||||
, service::query_state&
|
||||
, const query_options&) const override;
|
||||
, const query_options&
|
||||
, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -89,7 +89,8 @@ future<> create_role_statement::check_access(query_processor& qp, const service:
|
||||
future<result_message_ptr>
|
||||
create_role_statement::execute(query_processor&,
|
||||
service::query_state& state,
|
||||
const query_options&) const {
|
||||
const query_options&,
|
||||
std::optional<service::group0_guard> guard) const {
|
||||
auth::role_config config;
|
||||
config.is_superuser = *_options.is_superuser;
|
||||
config.can_login = *_options.can_login;
|
||||
@@ -173,7 +174,7 @@ future<> alter_role_statement::check_access(query_processor& qp, const service::
|
||||
}
|
||||
|
||||
future<result_message_ptr>
|
||||
alter_role_statement::execute(query_processor&, service::query_state& state, const query_options&) const {
|
||||
alter_role_statement::execute(query_processor&, service::query_state& state, const query_options&, std::optional<service::group0_guard> guard) const {
|
||||
auth::role_config_update update;
|
||||
update.is_superuser = _options.is_superuser;
|
||||
update.can_login = _options.can_login;
|
||||
@@ -235,7 +236,7 @@ future<> drop_role_statement::check_access(query_processor& qp, const service::c
|
||||
}
|
||||
|
||||
future<result_message_ptr>
|
||||
drop_role_statement::execute(query_processor&, service::query_state& state, const query_options&) const {
|
||||
drop_role_statement::execute(query_processor&, service::query_state& state, const query_options&, std::optional<service::group0_guard> guard) const {
|
||||
auto& as = *state.get_client_state().get_auth_service();
|
||||
|
||||
return auth::drop_role(as, _role).then([] {
|
||||
@@ -286,7 +287,7 @@ future<> list_roles_statement::check_access(query_processor& qp, const service::
|
||||
}
|
||||
|
||||
future<result_message_ptr>
|
||||
list_roles_statement::execute(query_processor&, service::query_state& state, const query_options&) const {
|
||||
list_roles_statement::execute(query_processor&, service::query_state& state, const query_options&, std::optional<service::group0_guard> guard) const {
|
||||
static const sstring virtual_table_name("roles");
|
||||
|
||||
static const auto make_column_spec = [](const sstring& name, const ::shared_ptr<const abstract_type>& ty) {
|
||||
@@ -402,7 +403,7 @@ future<> grant_role_statement::check_access(query_processor& qp, const service::
|
||||
}
|
||||
|
||||
future<result_message_ptr>
|
||||
grant_role_statement::execute(query_processor&, service::query_state& state, const query_options&) const {
|
||||
grant_role_statement::execute(query_processor&, service::query_state& state, const query_options&, std::optional<service::group0_guard> guard) const {
|
||||
auto& as = *state.get_client_state().get_auth_service();
|
||||
|
||||
return as.underlying_role_manager().grant(_grantee, _role).then([] {
|
||||
@@ -432,7 +433,8 @@ future<> revoke_role_statement::check_access(query_processor& qp, const service:
|
||||
future<result_message_ptr> revoke_role_statement::execute(
|
||||
query_processor&,
|
||||
service::query_state& state,
|
||||
const query_options&) const {
|
||||
const query_options&,
|
||||
std::optional<service::group0_guard> guard) const {
|
||||
auto& rm = state.get_client_state().get_auth_service()->underlying_role_manager();
|
||||
|
||||
return rm.revoke(_revokee, _role).then([] {
|
||||
|
||||
@@ -27,6 +27,7 @@ schema_altering_statement::schema_altering_statement(timeout_config_selector tim
|
||||
, cql_statement_no_metadata(timeout_selector)
|
||||
, _is_column_family_level{false}
|
||||
{
|
||||
needs_guard = true;
|
||||
}
|
||||
|
||||
schema_altering_statement::schema_altering_statement(cf_name name, timeout_config_selector timeout_selector)
|
||||
@@ -34,6 +35,7 @@ schema_altering_statement::schema_altering_statement(cf_name name, timeout_confi
|
||||
, cql_statement_no_metadata(timeout_selector)
|
||||
, _is_column_family_level{true}
|
||||
{
|
||||
needs_guard = true;
|
||||
}
|
||||
|
||||
future<> schema_altering_statement::grant_permissions_to_creator(const service::client_state&) const {
|
||||
@@ -58,7 +60,7 @@ void schema_altering_statement::prepare_keyspace(const service::client_state& st
|
||||
}
|
||||
|
||||
future<::shared_ptr<messages::result_message>>
|
||||
schema_altering_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const {
|
||||
schema_altering_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
bool internal = state.get_client_state().is_internal();
|
||||
if (internal) {
|
||||
auto replication_type = locator::replication_strategy_type::everywhere_topology;
|
||||
@@ -73,7 +75,7 @@ schema_altering_statement::execute(query_processor& qp, service::query_state& st
|
||||
}
|
||||
}
|
||||
|
||||
return qp.execute_schema_statement(*this, state, options).then([this, &state, internal](::shared_ptr<messages::result_message> result) {
|
||||
return qp.execute_schema_statement(*this, state, options, std::move(guard)).then([this, &state, internal](::shared_ptr<messages::result_message> result) {
|
||||
auto permissions_granted_fut = internal
|
||||
? make_ready_future<>()
|
||||
: grant_permissions_to_creator(state.get_client_state());
|
||||
|
||||
@@ -18,6 +18,8 @@
|
||||
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
|
||||
class mutation;
|
||||
|
||||
namespace cql3 {
|
||||
@@ -55,7 +57,7 @@ protected:
|
||||
virtual void prepare_keyspace(const service::client_state& state) override;
|
||||
|
||||
virtual future<::shared_ptr<messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options) const override;
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
public:
|
||||
virtual future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const = 0;
|
||||
|
||||
@@ -328,16 +328,18 @@ static thread_local inheriting_concrete_execution_stage<
|
||||
future<shared_ptr<cql_transport::messages::result_message>>
|
||||
select_statement::execute(query_processor& qp,
|
||||
service::query_state& state,
|
||||
const query_options& options) const
|
||||
const query_options& options,
|
||||
std::optional<service::group0_guard> guard) const
|
||||
{
|
||||
return execute_without_checking_exception_message(qp, state, options)
|
||||
return execute_without_checking_exception_message(qp, state, options, std::move(guard))
|
||||
.then(cql_transport::messages::propagate_exception_as_future<shared_ptr<cql_transport::messages::result_message>>);
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::messages::result_message>>
|
||||
select_statement::execute_without_checking_exception_message(query_processor& qp,
|
||||
service::query_state& state,
|
||||
const query_options& options) const
|
||||
const query_options& options,
|
||||
std::optional<service::group0_guard> guard) const
|
||||
{
|
||||
return select_stage(this, seastar::ref(qp), seastar::ref(state), seastar::cref(options));
|
||||
}
|
||||
|
||||
@@ -116,10 +116,10 @@ public:
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute(query_processor& qp,
|
||||
service::query_state& state, const query_options& options) const override;
|
||||
service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options) const override;
|
||||
execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> execute_non_aggregate_unpaged(query_processor& qp,
|
||||
lw_shared_ptr<query::read_command> cmd, dht::partition_range_vector&& partition_ranges, service::query_state& state,
|
||||
|
||||
@@ -45,8 +45,8 @@ strongly_consistent_modification_statement::strongly_consistent_modification_sta
|
||||
{ }
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
strongly_consistent_modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options) const {
|
||||
return execute_without_checking_exception_message(qp, qs, options)
|
||||
strongly_consistent_modification_statement::execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
return execute_without_checking_exception_message(qp, qs, options, std::move(guard))
|
||||
.then(cql_transport::messages::propagate_exception_as_future<shared_ptr<cql_transport::messages::result_message>>);
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ evaluate_prepared(
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
strongly_consistent_modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options) const {
|
||||
strongly_consistent_modification_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
if (this_shard_id() != 0) {
|
||||
co_return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(0, cql3::computed_function_values{});
|
||||
}
|
||||
|
||||
@@ -37,10 +37,10 @@ public:
|
||||
strongly_consistent_modification_statement(uint32_t bound_terms, schema_ptr schema, broadcast_tables::prepared_update query);
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& qs, const query_options& options) const override;
|
||||
execute(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options) const override;
|
||||
execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
|
||||
|
||||
@@ -94,7 +94,7 @@ evaluate_prepared(
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
strongly_consistent_select_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options) const {
|
||||
strongly_consistent_select_statement::execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
if (this_shard_id() != 0) {
|
||||
co_return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(0, cql3::computed_function_values{});
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ public:
|
||||
std::unique_ptr<cql3::attributes> attrs);
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options) const override;
|
||||
execute_without_checking_exception_message(query_processor& qp, service::query_state& qs, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -90,7 +90,7 @@ void truncate_statement::validate(query_processor&, const service::client_state&
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
truncate_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const
|
||||
truncate_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const
|
||||
{
|
||||
if (qp.db().find_schema(keyspace(), column_family())->is_view()) {
|
||||
throw exceptions::invalid_request_exception("Cannot TRUNCATE materialized view directly; must truncate base table instead");
|
||||
|
||||
@@ -40,7 +40,7 @@ public:
|
||||
virtual void validate(query_processor&, const service::client_state& state) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options) const override;
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
private:
|
||||
db::timeout_clock::duration get_timeout(const service::client_state& state, const query_options& options) const;
|
||||
};
|
||||
|
||||
@@ -55,7 +55,7 @@ future<> use_statement::check_access(query_processor& qp, const service::client_
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
use_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const {
|
||||
use_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
state.get_client_state().set_keyspace(qp.db().real_database(), _keyspace);
|
||||
auto result =::make_shared<cql_transport::messages::result_message::set_keyspace>(_keyspace);
|
||||
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(result);
|
||||
|
||||
@@ -33,7 +33,7 @@ public:
|
||||
virtual seastar::future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
|
||||
virtual seastar::future<seastar::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options) const override;
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include "service/client_state.hh"
|
||||
#include "tracing/tracing.hh"
|
||||
#include "service_permit.hh"
|
||||
#include "cql3/cql_statement.hh"
|
||||
|
||||
namespace qos {
|
||||
class service_level_controller;
|
||||
|
||||
@@ -266,7 +266,7 @@ future<size_t> raft_sys_table_storage::do_store_log_entries_one_batch(const std:
|
||||
cql3::attributes::none(),
|
||||
_qp.get_cql_stats());
|
||||
|
||||
co_await batch.execute(_qp, _dummy_query_state, batch_options);
|
||||
co_await batch.execute(_qp, _dummy_query_state, batch_options, std::nullopt);
|
||||
|
||||
if (idx != entries_size) {
|
||||
co_return idx;
|
||||
|
||||
@@ -111,7 +111,7 @@ future<> table_helper::insert(cql3::query_processor& qp, service::migration_mana
|
||||
return cache_table_info(qp, mm, qs).then([this, &qp, &qs, opt_maker = std::move(opt_maker)] () mutable {
|
||||
return do_with(opt_maker(), [this, &qp, &qs] (auto& opts) {
|
||||
opts.prepare(_prepared_stmt->bound_names);
|
||||
return _insert_stmt->execute(qp, qs, opts);
|
||||
return _insert_stmt->execute(qp, qs, opts, std::nullopt);
|
||||
});
|
||||
}).discard_result();
|
||||
}
|
||||
|
||||
@@ -274,7 +274,7 @@ public:
|
||||
|
||||
auto qs = make_query_state();
|
||||
auto& lqo = *qo;
|
||||
return local_qp().execute_prepared_without_checking_exception_message(std::move(prepared), std::move(id), *qs, lqo, true)
|
||||
return local_qp().execute_prepared_without_checking_exception_message(*qs, std::move(stmt), lqo, std::move(prepared), std::move(id), true)
|
||||
.then([qs, qo = std::move(qo)] (auto msg) {
|
||||
return cql_transport::messages::propagate_exception_as_future(std::move(msg));
|
||||
});
|
||||
|
||||
@@ -402,7 +402,7 @@ future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp,
|
||||
cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, std::nullopt, std::vector<cql3::raw_value>{}, false, cql3::query_options::specific_options::DEFAULT), std::move(values)),
|
||||
cql3::statements::batch_statement(cql3::statements::batch_statement::type::UNLOGGED, std::move(modifications), cql3::attributes::none(), qp.get_cql_stats()),
|
||||
[this, &qp] (auto& batch_options, auto& batch) {
|
||||
return batch.execute(qp, _dummy_query_state, batch_options).then([] (shared_ptr<cql_transport::messages::result_message> res) { return now(); });
|
||||
return batch.execute(qp, _dummy_query_state, batch_options, std::nullopt).then([] (shared_ptr<cql_transport::messages::result_message> res) { return now(); });
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
@@ -1129,7 +1129,7 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
|
||||
}
|
||||
|
||||
tracing::trace(trace_state, "Processing a statement");
|
||||
return qp.local().execute_prepared_without_checking_exception_message(std::move(prepared), std::move(cache_key), query_state, options, needs_authorization)
|
||||
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization)
|
||||
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version] (auto msg) {
|
||||
if (msg->move_to_shard()) {
|
||||
return process_fn_return_type(dynamic_pointer_cast<messages::result_message::bounce_to_shard>(msg));
|
||||
|
||||
Reference in New Issue
Block a user