Files
scylladb/cql3/query_processor.hh
Michael Litvak 5a5c7c6241 strong_consistency: wait for raft servers to start in create table
When creating a strongly consistent table, wait for the table's raft
servers to start and be ready to serve queries before completing the
operation. We want the create table operation to absorb the delay of
starting the raft groups instead of the first queries.

The create table coordinator commits and applies the schema statement,
then it waits for all hosts that have a tablet replica to create and
start the raft groups for the table's tablets. It does this by sending
an RPC to all the relevant hosts that executes a group0 barrier, in
order to ensure the table and raft groups are created, then waits for
all raft groups on the host to finish starting and be ready.

Fixes SCYLLADB-807
2026-05-13 08:43:24 +02:00

644 lines
27 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.1 and Apache-2.0)
*/
#pragma once
#include <string_view>
#include <unordered_map>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include "cql3/prepared_statements_cache.hh"
#include "cql3/authorized_prepared_statements_cache.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/cql_statement.hh"
#include "cql3/dialect.hh"
#include "exceptions/exceptions.hh"
#include "service/migration_listener.hh"
#include "mutation/timestamp.hh"
#include "transport/messages/result_message.hh"
#include "service/client_state.hh"
#include "service/broadcast_tables/experimental/query_result.hh"
#include "vector_search/vector_store_client.hh"
#include "utils/assert.hh"
#include "utils/observable.hh"
#include "utils/rolling_max_tracker.hh"
#include "service/raft/raft_group0_client.hh"
#include "types/types.hh"
#include "db/consistency_level_type.hh"
#include "db/config.hh"
#include "utils/enum_option.hh"
#include "service/storage_proxy_fwd.hh"
// Forward declarations of types from other subsystems that the declarations
// in this header refer to.
namespace lang { class manager; }
namespace service {
class migration_manager;
class query_state;
class mapreduce_service;
// NOTE(review): "service/raft/raft_group0_client.hh" is already included above,
// so this forward declaration is presumably redundant -- confirm before removing.
class raft_group0_client;
namespace strong_consistency {
class coordinator;
}
namespace broadcast_tables {
struct query;
}
}
namespace cql3 {
// Forward declarations of statement types used in the signatures below.
namespace statements {
class batch_statement;
class schema_altering_statement;
namespace raw {
class parsed_statement;
}
}
// Forward declarations of the result-set types returned by / passed to the
// internal query helpers of query_processor.
class untyped_result_set;
class untyped_result_set_row;
/*!
* \brief State of an internally executed paged query.
*
* Holds the internal state that has to be passed back to the query processor
* on every step of a paged execution (see create_paged_state(),
* execute_paged_internal(), for_each_cql_result() and has_more_results()).
*/
struct internal_query_state;
class prepared_statement_is_too_big : public std::exception {
sstring _msg;
public:
static constexpr int max_query_prefix = 100;
prepared_statement_is_too_big(const sstring& query_string)
: _msg(seastar::format("Prepared statement is too big: {}", query_string.substr(0, max_query_prefix)))
{
// mark that we clipped the query string
if (query_string.size() > max_query_prefix) {
_msg += "...";
}
}
virtual const char* what() const noexcept override {
return _msg.c_str();
}
};
// Forward declarations of CQL types used by query_processor below.
// NOTE(review): "cql3/cql_statement.hh" is included above, so the cql_statement
// forward declaration is presumably redundant -- confirm before removing.
class cql_config;
class query_options;
class cql_statement;
class query_processor : public seastar::peering_sharded_service<query_processor> {
public:
// Schema change listener; the class is defined after query_processor in this header.
class migration_subscriber;
// Size settings for the two prepared statement caches, passed to the constructor.
// The unit of the sizes is not visible in this header.
struct memory_config {
// NOTE(review): the field name is spelled "statment"; renaming it would require
// updating every user of the field, so it is left as is here.
size_t prepared_statment_cache_size = 0;
size_t authorized_prepared_cache_size = 0;
};
private:
// Listener object owned by this query processor (see class migration_subscriber).
std::unique_ptr<migration_subscriber> _migration_subscriber;
service::storage_proxy& _proxy;
data_dictionary::database _db;
service::migration_notifier& _mnotifier;
vector_search::vector_store_client& _vector_store_client;
memory_config _mcfg;
const cql_config& _cql_config;
// Defined out of line; access it through remote(), which also returns a gate holder.
// NOTE(review): presumably populated by start_remote() and released by stop_remote() -- confirm.
struct remote;
std::unique_ptr<remote> _remote;
struct stats {
uint64_t prepare_invocations = 0;
// One counter per consistency level value, MAX_VALUE included (hence the "+ 1").
uint64_t queries_by_cl[size_t(db::consistency_level::MAX_VALUE) + 1] = {};
} _stats;
cql_stats _cql_stats;
seastar::metrics::metric_groups _metrics;
prepared_statements_cache _prepared_cache;
authorized_prepared_statements_cache _authorized_prepared_cache;
// Tracks the rolling maximum of gross bytes allocated during CQL parsing.
// NOTE(review): 1000 is presumably the tracker's window size -- confirm against
// utils::rolling_max_tracker.
utils::rolling_max_tracker _parsing_cost_tracker{1000};
std::function<void(uint32_t)> _auth_prepared_cache_cfg_cb;
serialized_action _authorized_prepared_cache_config_action;
utils::observer<uint32_t> _authorized_prepared_cache_update_interval_in_ms_observer;
utils::observer<uint32_t> _authorized_prepared_cache_validity_in_ms_observer;
// A map for prepared statements used internally. We don't want to mix these with
// user statements; in particular, we don't bother with expiration on them.
std::unordered_map<sstring, std::unique_ptr<statements::prepared_statement>> _internal_statements;
lang::manager& _lang_manager;
using cl_option_list = std::vector<enum_option<db::consistency_level_restriction_t>>;
/// Efficient bitmask-based set of consistency levels.
using consistency_level_set = enum_set<super_enum<db::consistency_level,
db::consistency_level::ANY,
db::consistency_level::ONE,
db::consistency_level::TWO,
db::consistency_level::THREE,
db::consistency_level::QUORUM,
db::consistency_level::ALL,
db::consistency_level::LOCAL_QUORUM,
db::consistency_level::EACH_QUORUM,
db::consistency_level::SERIAL,
db::consistency_level::LOCAL_SERIAL,
db::consistency_level::LOCAL_ONE>>;
// Consistency levels for which check_write_consistency_levels_guardrail() reports
// WARN and FAIL respectively.
consistency_level_set _write_consistency_levels_warned;
consistency_level_set _write_consistency_levels_disallowed;
// NOTE(review): presumably these observers refresh the two sets above when the
// corresponding configuration options change -- confirm in the constructor.
utils::observer<cl_option_list> _write_consistency_levels_warned_observer;
utils::observer<cl_option_list> _write_consistency_levels_disallowed_observer;
// Converts a list of consistency level options into the bitmask-based set type.
static consistency_level_set to_consistency_level_set(const cl_option_list& levels);
public:
static const sstring CQL_VERSION;
// Computes the prepared statement cache key for a query string, keyspace and dialect.
static prepared_cache_key_type compute_id(
std::string_view query_string,
std::string_view keyspace,
dialect d);
// Parses a single CQL statement / a sequence of CQL statements in the given dialect.
static std::unique_ptr<statements::raw::parsed_statement> parse_statement(const std::string_view& query, dialect d);
static std::vector<std::unique_ptr<statements::raw::parsed_statement>> parse_statements(std::string_view queries, dialect d);
// The reference parameters are kept as reference members of this object, so the
// referenced services have to outlive the query processor.
query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc,
memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm);
~query_processor();
// NOTE(review): presumably start_remote() makes the services reachable through
// remote() available and stop_remote() withdraws them again -- confirm against
// the definitions.
void start_remote(service::migration_manager&, service::mapreduce_service&,
service::storage_service& ss, service::raft_group0_client&,
service::strong_consistency::coordinator&);
future<> stop_remote();
/// Returns a copy of the stored database handle.
data_dictionary::database db() { return _db; }

/// Returns the CQL configuration referenced by this processor.
const cql_config& get_cql_config() const { return _cql_config; }
/// Read-only access to the stored storage proxy reference.
const service::storage_proxy& proxy() const noexcept {
    return _proxy;
}
/// Mutable counterpart of proxy() const. Marked noexcept like the const
/// overload (and like both vector_store_client() accessors): it only hands
/// out a stored reference and cannot throw.
service::storage_proxy& proxy() noexcept {
    return _proxy;
}
// Returns the storage service; defined out of line.
// NOTE(review): start_remote() is the only visible entry point that receives a
// storage_service, so this is presumably only usable after start_remote() -- confirm.
service::storage_service& storage_service();
// Returns the strong consistency coordinator together with a gate holder.
// NOTE(review): presumably the holder has to be kept for as long as the coordinator
// is in use, mirroring the comment on remote() -- confirm against the definition.
std::pair<std::reference_wrapper<service::strong_consistency::coordinator>, gate::holder>
acquire_strongly_consistent_coordinator();
/// Mutable access to the CQL statistics counters.
cql_stats& get_cql_stats() { return _cql_stats; }

/// Returns the estimated peak memory cost of CQL parsing, i.e. the current
/// maximum reported by the parsing cost tracker.
size_t parsing_cost_estimate() const noexcept { return _parsing_cost_tracker.current_max(); }

/// Returns the language manager referenced by this processor.
lang::manager& lang() { return _lang_manager; }

/// Read-only access to the vector store client.
const vector_search::vector_store_client& vector_store_client() const noexcept { return _vector_store_client; }

/// Mutable access to the vector store client.
vector_search::vector_store_client& vector_store_client() noexcept { return _vector_store_client; }
/// Looks up a prepared statement in the authorized cache on behalf of `user`.
///
/// Returns a default-constructed checked_weak_ptr when no user is given, when the
/// authorized cache has no entry for (user, key), or when the cached statement has
/// been invalidated in the meantime. In the last case the stale authorized cache
/// entry is removed as well.
statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
    using weak_ptr_type = statements::prepared_statement::checked_weak_ptr;

    if (!user) {
        return weak_ptr_type();
    }

    auto authorized_entry = _authorized_prepared_cache.find(*user, key);
    if (!authorized_entry) {
        return weak_ptr_type();
    }

    try {
        // Refresh the last_read timestamp of the matching prepared_statements_cache
        // entry so that it reflects this read.
        //
        // Without this, the most recently used prepared statement may not carry the
        // newest last_read timestamp and could be evicted ahead of a statement read
        // less recently when the prepared statements cache needs room for a new entry.
        //
        // Such an eviction would in turn evict the corresponding entry from the
        // authorized_prepared_cache, breaking the LRU paradigm of these caches.
        _prepared_cache.touch(key);
        return authorized_entry->get()->checked_weak_from_this();
    } catch (seastar::checked_ptr_is_null_exception&) {
        // The prepared statement got invalidated: drop the matching
        // authorized_prepared_statements_cache entry too.
        _authorized_prepared_cache.remove(*user, key);
    }
    return weak_ptr_type();
}
// Looks the key up in the prepared statements cache (no per-user authorization
// cache involved) and returns the result of that lookup.
statements::prepared_statement::checked_weak_ptr get_prepared(const prepared_cache_key_type& key) {
return _prepared_cache.find(key);
}
/// Executes a prepared statement.
///
/// Delegates to execute_prepared_without_checking_exception_message() and then
/// passes the outcome through propagate_exception_as_future, so callers do not
/// have to inspect the result for a result_message::exception themselves.
inline
future<::shared_ptr<cql_transport::messages::result_message>>
execute_prepared(
        statements::prepared_statement::checked_weak_ptr statement,
        cql3::prepared_cache_key_type cache_key,
        service::query_state& query_state,
        const query_options& options,
        bool needs_authorization) {
    using result_ptr = ::shared_ptr<cql_transport::messages::result_message>;

    // Take the statement pointer out first: `statement` itself is moved from below.
    auto stmt = statement->statement;
    return execute_prepared_without_checking_exception_message(
            query_state, std::move(stmt), options, std::move(statement), std::move(cache_key), needs_authorization)
        .then(cql_transport::messages::propagate_exception_as_future<result_ptr>);
}
// Like execute_prepared, but is allowed to return exceptions as result_message::exception.
// The result_message::exception must be explicitly handled.
future<::shared_ptr<cql_transport::messages::result_message>>
execute_prepared_without_checking_exception_message(
service::query_state& query_state,
shared_ptr<cql_statement> statement,
const query_options& options,
statements::prepared_statement::checked_weak_ptr prepared,
cql3::prepared_cache_key_type cache_key,
bool needs_authorization);
// Takes the same arguments as the function above plus an optional group0 guard.
// NOTE(review): the parameter list matches the member function pointer accepted by
// execute_maybe_with_guard() (with prepared, cache_key and needs_authorization as the
// trailing Args), so it is presumably invoked through that helper -- confirm.
future<::shared_ptr<cql_transport::messages::result_message>>
do_execute_prepared(
service::query_state& query_state,
shared_ptr<cql_statement> statement,
const query_options& options,
std::optional<service::group0_guard> guard,
statements::prepared_statement::checked_weak_ptr prepared,
cql3::prepared_cache_key_type cache_key,
bool needs_authorization);
/// Execute a client statement that was not prepared.
///
/// Delegates to execute_direct_without_checking_exception_message() and then
/// passes the outcome through propagate_exception_as_future, so callers do not
/// have to inspect the result for a result_message::exception themselves.
inline
future<::shared_ptr<cql_transport::messages::result_message>>
execute_direct(
        const std::string_view& query_string,
        service::query_state& query_state,
        dialect d,
        query_options& options) {
    using result_ptr = ::shared_ptr<cql_transport::messages::result_message>;

    return execute_direct_without_checking_exception_message(query_string, query_state, d, options)
        .then(cql_transport::messages::propagate_exception_as_future<result_ptr>);
}
// Like execute_direct, but is allowed to return exceptions as result_message::exception.
// The result_message::exception must be explicitly handled.
future<::shared_ptr<cql_transport::messages::result_message>>
execute_direct_without_checking_exception_message(
const std::string_view& query_string,
service::query_state& query_state,
dialect d,
query_options& options);
// Takes an already obtained statement, an optional group0 guard and a vector of
// CQL warnings.
// NOTE(review): the parameter list matches the member function pointer accepted by
// execute_maybe_with_guard() (with warnings as the trailing Args), so it is presumably
// invoked through that helper -- confirm.
future<::shared_ptr<cql_transport::messages::result_message>>
do_execute_direct(
service::query_state& query_state,
shared_ptr<cql_statement> statement,
const query_options& options,
std::optional<service::group0_guard> guard,
cql3::cql_warnings_vec warnings);
// Returns the prepared form of an internal query.
// NOTE(review): presumably backed by the _internal_statements map -- confirm.
statements::prepared_statement::checked_weak_ptr prepare_internal(const sstring& query);
/*!
* \brief iterate over all cql results using paging
*
* You create a statement with optional parameters and pass
* a function that goes over the result rows.
*
* The passed function is called for every row; return a future resolving to
* stop_iteration::yes to stop the iteration early.
*
* For example:
return query_internal(
"SELECT * from system.compaction_history",
db::consistency_level::ONE,
{},
1000,
[&history] (const cql3::untyped_result_set_row& row) mutable {
....
....
return make_ready_future<stop_iteration>(stop_iteration::no);
});
* You can use placeholders in the query, the statement will only be prepared once.
*
* query_string - the cql string, can contain placeholders
* cl - consistency level of the query
* values - values to be substituted for the placeholders in the query
* page_size - maximum page size
* f - a function to be run on each row of the query result,
* if the function returns stop_iteration::yes the iteration will stop
* qs - optional query state (default: std::nullopt)
*
* \note This function is optimized for convenience, not performance.
*/
future<> query_internal(
const sstring& query_string,
db::consistency_level cl,
const data_value_list& values,
int32_t page_size,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f,
std::optional<service::query_state> qs = std::nullopt);
/*!
* \brief iterate over all cql results using paging
* An overload of query_internal without query parameters
* using CL = ONE, no timeout, and page size = 1000.
*
* query_string - the cql string, can contain placeholders
* f - a function to be run on each row of the query result,
* if the function returns stop_iteration::yes the iteration will stop
*
* \note This function is optimized for convenience, not performance.
*/
future<> query_internal(
const sstring& query_string,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f);
// Strongly typed yes/no flag taken by the execute_internal() overloads.
// NOTE(review): presumably selects whether the prepared form of the query is kept
// for reuse by later internal calls -- confirm against the definition.
class cache_internal_tag;
using cache_internal = bool_class<cache_internal_tag>;
// NOTICE: Internal queries should be used with care, as they are expected
// to be used for local tables (e.g. from the `system` keyspace).
// Data modifications will usually be performed with consistency level ONE
// and schema changes will not be announced to other nodes.
// Because of that, changing global schema state (e.g. modifying non-local tables,
// creating namespaces, etc) is explicitly forbidden via this interface.
//
// note: optimized for convenience, not performance.
future<::shared_ptr<untyped_result_set>> execute_internal(
const sstring& query_string,
db::consistency_level,
const data_value_list&,
cache_internal cache);
// Same as above, but runs with the query state supplied by the caller.
future<::shared_ptr<untyped_result_set>> execute_internal(
const sstring& query_string,
db::consistency_level,
service::query_state& query_state,
const data_value_list& values,
cache_internal cache);
// Convenience overload: explicit consistency level, no bound values.
future<::shared_ptr<untyped_result_set>> execute_internal(
const sstring& query_string,
db::consistency_level cl,
cache_internal cache) {
// The empty braces stand for an empty value list; they cannot bind to the
// non-const query_state reference of the other four-argument overload.
return execute_internal(query_string, cl, {}, cache);
}
// Convenience overload: explicit consistency level and query state, no bound values.
future<::shared_ptr<untyped_result_set>> execute_internal(
const sstring& query_string,
db::consistency_level cl,
service::query_state& query_state,
cache_internal cache) {
return execute_internal(query_string, cl, query_state, {}, cache);
}
// Convenience overload: bound values, consistency level ONE.
future<::shared_ptr<untyped_result_set>>
execute_internal(const sstring& query_string, const data_value_list& values, cache_internal cache) {
return execute_internal(query_string, db::consistency_level::ONE, values, cache);
}
// Convenience overload: no bound values, consistency level ONE.
future<::shared_ptr<untyped_result_set>>
execute_internal(const sstring& query_string, cache_internal cache) {
return execute_internal(query_string, db::consistency_level::ONE, {}, cache);
}
// Obtains mutations from a query. For internal usage; the most notable
// use case is generating data for group0 announce(). Note that this
// function makes it possible to put multiple CQL queries into a single raft
// command and, vice versa, to split the mutations of one query into separate commands.
// It supports write-only queries; read-modify-write queries are not supported.
future<utils::chunked_vector<mutation>> get_mutations_internal(
const sstring query_string,
service::query_state& query_state,
api::timestamp_type timestamp,
std::vector<data_value_or_unset> values);
// Executes an already prepared statement with the given consistency level,
// query state and (optional, default empty) bound values.
future<::shared_ptr<untyped_result_set>> execute_with_params(
statements::prepared_statement::checked_weak_ptr p,
db::consistency_level,
service::query_state& query_state,
const data_value_list& values = { });
// NOTE(review): the parameter list matches the member function pointer accepted by
// execute_maybe_with_guard() with no trailing Args, so this is presumably invoked
// through that helper -- confirm.
future<::shared_ptr<cql_transport::messages::result_message>> do_execute_with_params(
service::query_state& query_state,
shared_ptr<cql_statement> statement,
const query_options& options,
std::optional<service::group0_guard> guard);
// Prepares a query string in the given dialect. The two overloads differ only in
// whether the caller passes a full query state or just the client state.
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
prepare(sstring query_string, service::query_state& query_state, dialect d);
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
prepare(sstring query_string, const service::client_state& client_state, dialect d);
// Stops the query processor; the returned future resolves when stopping is done.
future<> stop();
/// Executes a batch statement.
///
/// Delegates to execute_batch_without_checking_exception_message() and then
/// passes the outcome through propagate_exception_as_future, so callers do not
/// have to inspect the result for a result_message::exception themselves.
inline
future<::shared_ptr<cql_transport::messages::result_message>>
execute_batch(
        ::shared_ptr<statements::batch_statement> stmt,
        service::query_state& query_state,
        query_options& options,
        std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries) {
    using result_ptr = ::shared_ptr<cql_transport::messages::result_message>;

    return execute_batch_without_checking_exception_message(
            std::move(stmt), query_state, options, std::move(pending_authorization_entries))
        .then(cql_transport::messages::propagate_exception_as_future<result_ptr>);
}
// Like execute_batch, but is allowed to return exceptions as result_message::exception.
// The result_message::exception must be explicitly handled.
future<::shared_ptr<cql_transport::messages::result_message>>
execute_batch_without_checking_exception_message(
::shared_ptr<statements::batch_statement>,
service::query_state& query_state,
query_options& options,
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries);
// Executes a query against broadcast tables and returns its result.
future<service::broadcast_tables::query_result>
execute_broadcast_table_query(const service::broadcast_tables::query&);
// Splits given `mapreduce_request` and distributes execution of resulting subrequests across a cluster.
future<query::mapreduce_result>
mapreduce(query::mapreduce_request, tracing::trace_state_ptr);
// NOTE(review): presumably thrown to ask the caller to retry the statement
// execution -- confirm at the throw sites.
struct retry_statement_execution_error : public std::exception {};
// Schema statement helpers; both operate on the group0 batch passed in as `mc`.
future<::shared_ptr<cql_transport::messages::result_message>>
execute_schema_statement(const statements::schema_altering_statement&, service::query_state& state, const query_options& options, service::group0_batch& mc);
future<> announce_schema_statement(const statements::schema_altering_statement&, service::group0_batch& mc);
// Sends an RPC to every host that holds a tablet replica of the given table, asking it to wait
// until the raft groups for those tablets are started and ready to serve queries.
// For the local node, waits directly without an RPC.
future<> wait_for_table_raft_groups_on_all_hosts(table_id table, lowres_clock::time_point timeout);
// Builds a prepared statement object from a query string for the given client state.
std::unique_ptr<statements::prepared_statement> get_statement(
const std::string_view& query,
const service::client_state& client_state,
dialect d);
// migration_subscriber keeps a pointer to its query_processor and is granted
// access to the private members.
friend class migration_subscriber;
// Build result messages that redirect the request to another shard / another node.
// NOTE(review): the exact meaning of `track` and `is_write` is not visible here --
// confirm against the definitions.
shared_ptr<cql_transport::messages::result_message> bounce_to_shard(unsigned shard, cql3::computed_function_values cached_fn_calls, bool track = true);
shared_ptr<cql_transport::messages::result_message> bounce_to_node(
locator::tablet_replica replica,
cql3::computed_function_values cached_fn_calls,
seastar::lowres_clock::time_point timeout,
bool is_write);
void update_authorized_prepared_cache_config();
void reset_cache();
bool topology_global_queue_empty();
future<bool> ongoing_rf_change(const service::group0_guard& guard, sstring ks);
// Builds the query_options used for internally executed statements.
// A page_size of -1 is the default. NOTE(review): presumably -1 means "no paging" -- confirm.
query_options make_internal_options(
const statements::prepared_statement::checked_weak_ptr& p,
const std::vector<data_value_or_unset>& values,
db::consistency_level,
int32_t page_size = -1,
service::node_local_only node_local_only = service::node_local_only::no) const;
/// Outcome of the write consistency level guardrail check.
enum class write_consistency_guardrail_state { NONE, WARN, FAIL };

/// Accounts a write at consistency level `cl` and classifies it against the
/// configured guardrails.
///
/// The per-level write counter is always bumped. A disallowed level takes
/// precedence over a warned one; in either case the matching violation
/// counter is bumped too.
inline write_consistency_guardrail_state check_write_consistency_levels_guardrail(db::consistency_level cl) {
    _cql_stats.writes_per_consistency_level[static_cast<size_t>(cl)]++;

    auto verdict = write_consistency_guardrail_state::NONE;
    if (_write_consistency_levels_disallowed.contains(cl)) [[unlikely]] {
        _cql_stats.write_consistency_levels_disallowed_violations++;
        verdict = write_consistency_guardrail_state::FAIL;
    } else if (_write_consistency_levels_warned.contains(cl)) [[unlikely]] {
        _cql_stats.write_consistency_levels_warned_violations++;
        verdict = write_consistency_guardrail_state::WARN;
    }
    return verdict;
}
private:
// Keep the holder until you stop using the `remote` services.
std::pair<std::reference_wrapper<remote>, gate::holder> remote();
// Processes a statement, optionally under a group0 guard.
// NOTE(review): the name suggests authorization has already been checked by the
// caller -- confirm against the definition.
future<::shared_ptr<cql_transport::messages::result_message>>
process_authorized_statement(const ::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options, std::optional<service::group0_guard> guard);
/*!
* \brief create a state object for paging
*
* When using paging internally a state object is needed.
*/
internal_query_state create_paged_state(
const sstring& query_string,
db::consistency_level,
const data_value_list& values,
int32_t page_size,
std::optional<service::query_state> qs = std::nullopt);
/*!
* \brief run a query using paging
*
* \note Optimized for convenience, not performance.
*/
future<::shared_ptr<untyped_result_set>> execute_paged_internal(internal_query_state& state);
/*!
* \brief iterate over all results using paging, accept a function that returns a future
*
* \note Optimized for convenience, not performance.
*/
future<> for_each_cql_result(
cql3::internal_query_state& state,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f);
/*!
* \brief check, based on the state, whether there are additional results
* Users of the paging should not use the internal_query_state directly
*/
bool has_more_results(cql3::internal_query_state& state) const;
// Runs the member function `fn` for the given statement, forwarding `args` as its
// trailing arguments. `fn` receives an optional group0 guard.
// NOTE(review): presumably a guard is only taken for statements that need one --
// confirm against the definition.
template<typename... Args>
future<::shared_ptr<cql_transport::messages::result_message>>
execute_maybe_with_guard(service::query_state& query_state, ::shared_ptr<cql_statement> statement, const query_options& options,
future<::shared_ptr<cql_transport::messages::result_message>>(query_processor::*fn)(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>, Args...), Args... args);
// Runs `fn` for the given statement; `fn` receives an optional group0 guard.
future<::shared_ptr<cql_transport::messages::result_message>> execute_with_guard(
std::function<future<::shared_ptr<cql_transport::messages::result_message>>(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>)> fn,
::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options);
};
/// Schema change listener attached to a query_processor.
///
/// Implements the service::migration_listener callbacks for keyspaces, column
/// families, user types, functions, aggregates and views. The private helpers
/// deal with prepared statements that a schema change has made invalid.
class query_processor::migration_subscriber : public service::migration_listener {
    query_processor* _qp;

public:
    migration_subscriber(query_processor* qp);

    // Creation notifications.
    void on_create_keyspace(const sstring& ks_name) override;
    void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override;
    void on_create_user_type(const sstring& ks_name, const sstring& type_name) override;
    void on_create_function(const sstring& ks_name, const sstring& function_name) override;
    void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) override;
    void on_create_view(const sstring& ks_name, const sstring& view_name) override;

    // Update notifications.
    void on_update_keyspace(const sstring& ks_name) override;
    void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed) override;
    void on_update_user_type(const sstring& ks_name, const sstring& type_name) override;
    void on_update_function(const sstring& ks_name, const sstring& function_name) override;
    void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override;
    void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override;

    // Drop notifications.
    void on_drop_keyspace(const sstring& ks_name) override;
    void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override;
    void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override;
    void on_drop_function(const sstring& ks_name, const sstring& function_name) override;
    void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override;
    void on_drop_view(const sstring& ks_name, const sstring& view_name) override;

private:
    /// Removes prepared statements made invalid by a change to the given keyspace
    /// or, when `cf_name` is set, to that column family.
    void remove_invalid_prepared_statements(sstring ks_name, std::optional<sstring> cf_name);

    /// Decides whether `statement` has to be invalidated after a change to the
    /// given keyspace / column family.
    bool should_invalidate(
            sstring ks_name,
            std::optional<sstring> cf_name,
            ::shared_ptr<cql_statement> statement);
};
}