In this series we add support for forwarding strongly consistent CQL requests to suitable replicas, so that clients can issue reads/writes to any node and have the request executed on an appropriate tablet replica (and, for writes, on the Raft leader). We return the same CQL response as what the user would get while sending the request to the correct replica and we perform the same logging/stats updates on the request coordinator as if the coordinator was the appropriate replica. The core mechanism of forwarding a strongly consistent request is sending an RPC containing the user's cql request frame to the appropriate replica and returning back a ready, serialized `cql_transport::response`. We do this in the CQL server - it is most prepared for handling these types and forwarding a request containing a CQL frame allows us to reuse near-top-level methods for CQL request handling in the new RPC handler (such as the general `process`) For sending the RPC, the CQL server needs to obtain the information about who should it forward the request to. This requires knowledge about the tablet raft group members and leader. We obtain this information during the execution of a `cql3/strong_consistency` statement, and we return this information back to the CQL server using the generalized `bounce_to_shard` `response_message`, where we now store the information about either a shard, or a specific replica to which we should forward to. Similarly to `bounce_to_shard`, we need to handle this `result_message` in a loop - a replica may move during statement execution, or the Raft leader can change. We also use it for forwarding strongly consistent writes when we're not a member of the affected tablet raft group - in that case we need to forward the statement twice - once to any replica of the affected tablet, then that replica can find the leader and return this information to the coordinator, which allows the second request to be directed to the leader. This feature also allows passing through exception messages which happened on the target replica while executing the statement. For that, many methods of the `cql_transport::cql_server::connection` for creating error responses needed to be moved to `cql_transport::cql_server`. And for final exception handling on the coordinator, we added additional error info to the RPC response, so that the handling can be performed without having the `result_message::exception` or `exception_ptr` itself. Fixes [SCYLLADB-71](https://scylladb.atlassian.net/browse/SCYLLADB-71) [SCYLLADB-71]: https://scylladb.atlassian.net/browse/SCYLLADB-71?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ Closes scylladb/scylladb#27517 * github.com:scylladb/scylladb: test: add tests for CQL forwarding transport: enable CQL forwarding for strong consistency statements transport: add remote statement preparation for CQL forwarding transport: handle redirect responses in CQL forwarding transport: add exception handling for forwarded CQL requests transport: add basic CQL request forwarding idl: add a representation of client_state for forwarding cql_server: handle query, execute, batch in one case transport: inline process_on_shard in cql_server::process transport: extract process() to cql_server transport: add messaging_service to cql_server transport: add response reconstruction helpers for forwarding transport: generalize the bounce result message for bouncing to other nodes strong consistency: redirect requests to live replicas from the same rack transport: pass foreign_ptr into sleep_until_timeout_passes and move it to cql_server transport: extract the error handling from process_request_one transport: move error response helpers from connection to cql_server
637 lines
26 KiB
C++
637 lines
26 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*
|
|
* Modified by ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <string_view>
|
|
#include <unordered_map>
|
|
|
|
#include <seastar/core/metrics_registration.hh>
|
|
#include <seastar/core/sharded.hh>
|
|
#include <seastar/core/shared_ptr.hh>
|
|
|
|
#include "cql3/prepared_statements_cache.hh"
|
|
#include "cql3/authorized_prepared_statements_cache.hh"
|
|
#include "cql3/statements/prepared_statement.hh"
|
|
#include "cql3/cql_statement.hh"
|
|
#include "cql3/dialect.hh"
|
|
#include "exceptions/exceptions.hh"
|
|
#include "service/migration_listener.hh"
|
|
#include "mutation/timestamp.hh"
|
|
#include "transport/messages/result_message.hh"
|
|
#include "service/client_state.hh"
|
|
#include "service/broadcast_tables/experimental/query_result.hh"
|
|
#include "vector_search/vector_store_client.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/observable.hh"
|
|
#include "utils/rolling_max_tracker.hh"
|
|
#include "service/raft/raft_group0_client.hh"
|
|
#include "types/types.hh"
|
|
#include "db/consistency_level_type.hh"
|
|
#include "db/config.hh"
|
|
#include "utils/enum_option.hh"
|
|
#include "service/storage_proxy_fwd.hh"
|
|
|
|
|
|
namespace lang { class manager; }
|
|
namespace service {
|
|
class migration_manager;
|
|
class query_state;
|
|
class mapreduce_service;
|
|
class raft_group0_client;
|
|
|
|
namespace strong_consistency {
|
|
class coordinator;
|
|
}
|
|
|
|
namespace broadcast_tables {
|
|
struct query;
|
|
}
|
|
}
|
|
|
|
namespace cql3 {
|
|
|
|
namespace statements {
|
|
class batch_statement;
|
|
class schema_altering_statement;
|
|
|
|
namespace raw {
|
|
|
|
class parsed_statement;
|
|
|
|
}
|
|
}
|
|
|
|
class untyped_result_set;
|
|
class untyped_result_set_row;
|
|
|
|
/*!
|
|
* \brief to allow paging, holds
|
|
* internal state, that needs to be passed to the execute statement.
|
|
*
|
|
*/
|
|
struct internal_query_state;
|
|
|
|
class prepared_statement_is_too_big : public std::exception {
|
|
sstring _msg;
|
|
|
|
public:
|
|
static constexpr int max_query_prefix = 100;
|
|
|
|
prepared_statement_is_too_big(const sstring& query_string)
|
|
: _msg(seastar::format("Prepared statement is too big: {}", query_string.substr(0, max_query_prefix)))
|
|
{
|
|
// mark that we clipped the query string
|
|
if (query_string.size() > max_query_prefix) {
|
|
_msg += "...";
|
|
}
|
|
}
|
|
|
|
virtual const char* what() const noexcept override {
|
|
return _msg.c_str();
|
|
}
|
|
};
|
|
|
|
class cql_config;
|
|
class query_options;
|
|
class cql_statement;
|
|
|
|
class query_processor : public seastar::peering_sharded_service<query_processor> {
|
|
public:
|
|
class migration_subscriber;
|
|
struct memory_config {
|
|
size_t prepared_statment_cache_size = 0;
|
|
size_t authorized_prepared_cache_size = 0;
|
|
};
|
|
|
|
private:
|
|
std::unique_ptr<migration_subscriber> _migration_subscriber;
|
|
service::storage_proxy& _proxy;
|
|
data_dictionary::database _db;
|
|
service::migration_notifier& _mnotifier;
|
|
vector_search::vector_store_client& _vector_store_client;
|
|
memory_config _mcfg;
|
|
const cql_config& _cql_config;
|
|
|
|
struct remote;
|
|
std::unique_ptr<remote> _remote;
|
|
|
|
struct stats {
|
|
uint64_t prepare_invocations = 0;
|
|
uint64_t queries_by_cl[size_t(db::consistency_level::MAX_VALUE) + 1] = {};
|
|
} _stats;
|
|
|
|
cql_stats _cql_stats;
|
|
|
|
seastar::metrics::metric_groups _metrics;
|
|
|
|
prepared_statements_cache _prepared_cache;
|
|
authorized_prepared_statements_cache _authorized_prepared_cache;
|
|
|
|
// Tracks the rolling maximum of gross bytes allocated during CQL parsing
|
|
utils::rolling_max_tracker _parsing_cost_tracker{1000};
|
|
|
|
std::function<void(uint32_t)> _auth_prepared_cache_cfg_cb;
|
|
serialized_action _authorized_prepared_cache_config_action;
|
|
utils::observer<uint32_t> _authorized_prepared_cache_update_interval_in_ms_observer;
|
|
utils::observer<uint32_t> _authorized_prepared_cache_validity_in_ms_observer;
|
|
|
|
// A map for prepared statements used internally (which we don't want to mix with user statement, in particular we
|
|
// don't bother with expiration on those.
|
|
std::unordered_map<sstring, std::unique_ptr<statements::prepared_statement>> _internal_statements;
|
|
|
|
lang::manager& _lang_manager;
|
|
|
|
using cl_option_list = std::vector<enum_option<db::consistency_level_restriction_t>>;
|
|
|
|
/// Efficient bitmask-based set of consistency levels.
|
|
using consistency_level_set = enum_set<super_enum<db::consistency_level,
|
|
db::consistency_level::ANY,
|
|
db::consistency_level::ONE,
|
|
db::consistency_level::TWO,
|
|
db::consistency_level::THREE,
|
|
db::consistency_level::QUORUM,
|
|
db::consistency_level::ALL,
|
|
db::consistency_level::LOCAL_QUORUM,
|
|
db::consistency_level::EACH_QUORUM,
|
|
db::consistency_level::SERIAL,
|
|
db::consistency_level::LOCAL_SERIAL,
|
|
db::consistency_level::LOCAL_ONE>>;
|
|
|
|
|
|
consistency_level_set _write_consistency_levels_warned;
|
|
consistency_level_set _write_consistency_levels_disallowed;
|
|
utils::observer<cl_option_list> _write_consistency_levels_warned_observer;
|
|
utils::observer<cl_option_list> _write_consistency_levels_disallowed_observer;
|
|
|
|
static consistency_level_set to_consistency_level_set(const cl_option_list& levels);
|
|
public:
|
|
static const sstring CQL_VERSION;
|
|
|
|
static prepared_cache_key_type compute_id(
|
|
std::string_view query_string,
|
|
std::string_view keyspace,
|
|
dialect d);
|
|
|
|
static std::unique_ptr<statements::raw::parsed_statement> parse_statement(const std::string_view& query, dialect d);
|
|
static std::vector<std::unique_ptr<statements::raw::parsed_statement>> parse_statements(std::string_view queries, dialect d);
|
|
|
|
query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc,
|
|
memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm);
|
|
|
|
~query_processor();
|
|
|
|
void start_remote(service::migration_manager&, service::mapreduce_service&,
|
|
service::storage_service& ss, service::raft_group0_client&,
|
|
service::strong_consistency::coordinator&);
|
|
future<> stop_remote();
|
|
|
|
data_dictionary::database db() {
|
|
return _db;
|
|
}
|
|
|
|
const cql_config& get_cql_config() const {
|
|
return _cql_config;
|
|
}
|
|
|
|
const service::storage_proxy& proxy() const noexcept {
|
|
return _proxy;
|
|
}
|
|
|
|
service::storage_proxy& proxy() {
|
|
return _proxy;
|
|
}
|
|
|
|
std::pair<std::reference_wrapper<service::strong_consistency::coordinator>, gate::holder>
|
|
acquire_strongly_consistent_coordinator();
|
|
|
|
cql_stats& get_cql_stats() {
|
|
return _cql_stats;
|
|
}
|
|
|
|
/// Returns the estimated peak memory cost of CQL parsing.
|
|
size_t parsing_cost_estimate() const noexcept {
|
|
return _parsing_cost_tracker.current_max();
|
|
}
|
|
|
|
lang::manager& lang() { return _lang_manager; }
|
|
|
|
const vector_search::vector_store_client& vector_store_client() const noexcept {
|
|
return _vector_store_client;
|
|
}
|
|
|
|
vector_search::vector_store_client& vector_store_client() noexcept {
|
|
return _vector_store_client;
|
|
}
|
|
|
|
statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
|
|
if (user) {
|
|
auto vp = _authorized_prepared_cache.find(*user, key);
|
|
if (vp) {
|
|
try {
|
|
// Touch the corresponding prepared_statements_cache entry to make sure its last_read timestamp
|
|
// corresponds to the last time its value has been read.
|
|
//
|
|
// If we don't do this it may turn out that the most recently used prepared statement doesn't have
|
|
// the newest last_read timestamp and can get evicted before the not-so-recently-read statement if
|
|
// we need to create space in the prepared statements cache for a new entry.
|
|
//
|
|
// And this is going to trigger an eviction of the corresponding entry from the authorized_prepared_cache
|
|
// breaking the LRU paradigm of these caches.
|
|
_prepared_cache.touch(key);
|
|
return vp->get()->checked_weak_from_this();
|
|
} catch (seastar::checked_ptr_is_null_exception&) {
|
|
// If the prepared statement got invalidated - remove the corresponding authorized_prepared_statements_cache entry as well.
|
|
_authorized_prepared_cache.remove(*user, key);
|
|
}
|
|
}
|
|
}
|
|
return statements::prepared_statement::checked_weak_ptr();
|
|
}
|
|
|
|
statements::prepared_statement::checked_weak_ptr get_prepared(const prepared_cache_key_type& key) {
|
|
return _prepared_cache.find(key);
|
|
}
|
|
|
|
inline
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_prepared(
|
|
statements::prepared_statement::checked_weak_ptr statement,
|
|
cql3::prepared_cache_key_type cache_key,
|
|
service::query_state& query_state,
|
|
const query_options& options,
|
|
bool needs_authorization) {
|
|
auto cql_statement = statement->statement;
|
|
return execute_prepared_without_checking_exception_message(
|
|
query_state,
|
|
std::move(cql_statement),
|
|
options,
|
|
std::move(statement),
|
|
std::move(cache_key),
|
|
needs_authorization)
|
|
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
|
|
}
|
|
|
|
// Like execute_prepared, but is allowed to return exceptions as result_message::exception.
|
|
// The result_message::exception must be explicitly handled.
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_prepared_without_checking_exception_message(
|
|
service::query_state& query_state,
|
|
shared_ptr<cql_statement> statement,
|
|
const query_options& options,
|
|
statements::prepared_statement::checked_weak_ptr prepared,
|
|
cql3::prepared_cache_key_type cache_key,
|
|
bool needs_authorization);
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
do_execute_prepared(
|
|
service::query_state& query_state,
|
|
shared_ptr<cql_statement> statement,
|
|
const query_options& options,
|
|
std::optional<service::group0_guard> guard,
|
|
statements::prepared_statement::checked_weak_ptr prepared,
|
|
cql3::prepared_cache_key_type cache_key,
|
|
bool needs_authorization);
|
|
|
|
/// Execute a client statement that was not prepared.
|
|
inline
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_direct(
|
|
const std::string_view& query_string,
|
|
service::query_state& query_state,
|
|
dialect d,
|
|
query_options& options) {
|
|
return execute_direct_without_checking_exception_message(
|
|
query_string,
|
|
query_state,
|
|
d,
|
|
options)
|
|
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
|
|
}
|
|
|
|
// Like execute_direct, but is allowed to return exceptions as result_message::exception.
|
|
// The result_message::exception must be explicitly handled.
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_direct_without_checking_exception_message(
|
|
const std::string_view& query_string,
|
|
service::query_state& query_state,
|
|
dialect d,
|
|
query_options& options);
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
do_execute_direct(
|
|
service::query_state& query_state,
|
|
shared_ptr<cql_statement> statement,
|
|
const query_options& options,
|
|
std::optional<service::group0_guard> guard,
|
|
cql3::cql_warnings_vec warnings);
|
|
|
|
statements::prepared_statement::checked_weak_ptr prepare_internal(const sstring& query);
|
|
|
|
/*!
|
|
* \brief iterate over all cql results using paging
|
|
*
|
|
* You create a statement with optional parameters and pass
|
|
* a function that goes over the result rows.
|
|
*
|
|
* The passed function would be called for all rows; return future<stop_iteration::yes>
|
|
* to stop iteration.
|
|
*
|
|
* For example:
|
|
return query_internal(
|
|
"SELECT * from system.compaction_history",
|
|
db::consistency_level::ONE,
|
|
{},
|
|
[&history] (const cql3::untyped_result_set::row& row) mutable {
|
|
....
|
|
....
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
});
|
|
|
|
* You can use placeholders in the query, the statement will only be prepared once.
|
|
*
|
|
* query_string - the cql string, can contain placeholders
|
|
* cl - consistency level of the query
|
|
* values - values to be substituted for the placeholders in the query
|
|
* page_size - maximum page size
|
|
* f - a function to be run on each row of the query result,
|
|
* if the function returns stop_iteration::yes the iteration will stop
|
|
* qs - optional query state (default: std::nullopt)
|
|
*
|
|
* \note This function is optimized for convenience, not performance.
|
|
*/
|
|
future<> query_internal(
|
|
const sstring& query_string,
|
|
db::consistency_level cl,
|
|
const data_value_list& values,
|
|
int32_t page_size,
|
|
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f,
|
|
std::optional<service::query_state> qs = std::nullopt);
|
|
|
|
/*
|
|
* \brief iterate over all cql results using paging
|
|
* An overload of query_internal without query parameters
|
|
* using CL = ONE, no timeout, and page size = 1000.
|
|
*
|
|
* query_string - the cql string, can contain placeholders
|
|
* f - a function to be run on each row of the query result,
|
|
* if the function returns stop_iteration::yes the iteration will stop
|
|
*
|
|
* \note This function is optimized for convenience, not performance.
|
|
*/
|
|
future<> query_internal(
|
|
const sstring& query_string,
|
|
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f);
|
|
|
|
class cache_internal_tag;
|
|
using cache_internal = bool_class<cache_internal_tag>;
|
|
|
|
// NOTICE: Internal queries should be used with care, as they are expected
|
|
// to be used for local tables (e.g. from the `system` keyspace).
|
|
// Data modifications will usually be performed with consistency level ONE
|
|
// and schema changes will not be announced to other nodes.
|
|
// Because of that, changing global schema state (e.g. modifying non-local tables,
|
|
// creating namespaces, etc) is explicitly forbidden via this interface.
|
|
//
|
|
// note: optimized for convenience, not performance.
|
|
future<::shared_ptr<untyped_result_set>> execute_internal(
|
|
const sstring& query_string,
|
|
db::consistency_level,
|
|
const data_value_list&,
|
|
cache_internal cache);
|
|
future<::shared_ptr<untyped_result_set>> execute_internal(
|
|
const sstring& query_string,
|
|
db::consistency_level,
|
|
service::query_state& query_state,
|
|
const data_value_list& values,
|
|
cache_internal cache);
|
|
future<::shared_ptr<untyped_result_set>> execute_internal(
|
|
const sstring& query_string,
|
|
db::consistency_level cl,
|
|
cache_internal cache) {
|
|
return execute_internal(query_string, cl, {}, cache);
|
|
}
|
|
future<::shared_ptr<untyped_result_set>> execute_internal(
|
|
const sstring& query_string,
|
|
db::consistency_level cl,
|
|
service::query_state& query_state,
|
|
cache_internal cache) {
|
|
return execute_internal(query_string, cl, query_state, {}, cache);
|
|
}
|
|
future<::shared_ptr<untyped_result_set>>
|
|
execute_internal(const sstring& query_string, const data_value_list& values, cache_internal cache) {
|
|
return execute_internal(query_string, db::consistency_level::ONE, values, cache);
|
|
}
|
|
future<::shared_ptr<untyped_result_set>>
|
|
execute_internal(const sstring& query_string, cache_internal cache) {
|
|
return execute_internal(query_string, db::consistency_level::ONE, {}, cache);
|
|
}
|
|
|
|
// Obtains mutations from query. For internal usage, most notable
|
|
// use-case is generating data for group0 announce(). Note that this
|
|
// function enables putting multiple CQL queries into a single raft command
|
|
// and vice versa, split mutations from one query into separate commands.
|
|
// It supports write-only queries, read-modified-writes not supported.
|
|
future<utils::chunked_vector<mutation>> get_mutations_internal(
|
|
const sstring query_string,
|
|
service::query_state& query_state,
|
|
api::timestamp_type timestamp,
|
|
std::vector<data_value_or_unset> values);
|
|
|
|
future<::shared_ptr<untyped_result_set>> execute_with_params(
|
|
statements::prepared_statement::checked_weak_ptr p,
|
|
db::consistency_level,
|
|
service::query_state& query_state,
|
|
const data_value_list& values = { });
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message>> do_execute_with_params(
|
|
service::query_state& query_state,
|
|
shared_ptr<cql_statement> statement,
|
|
const query_options& options,
|
|
std::optional<service::group0_guard> guard);
|
|
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
|
|
prepare(sstring query_string, service::query_state& query_state, dialect d);
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
|
|
prepare(sstring query_string, const service::client_state& client_state, dialect d);
|
|
|
|
future<> stop();
|
|
|
|
inline
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_batch(
|
|
::shared_ptr<statements::batch_statement> stmt,
|
|
service::query_state& query_state,
|
|
query_options& options,
|
|
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries) {
|
|
return execute_batch_without_checking_exception_message(
|
|
std::move(stmt),
|
|
query_state,
|
|
options,
|
|
std::move(pending_authorization_entries))
|
|
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
|
|
}
|
|
|
|
// Like execute_batch, but is allowed to return exceptions as result_message::exception.
|
|
// The result_message::exception must be explicitly handled.
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_batch_without_checking_exception_message(
|
|
::shared_ptr<statements::batch_statement>,
|
|
service::query_state& query_state,
|
|
query_options& options,
|
|
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries);
|
|
|
|
future<service::broadcast_tables::query_result>
|
|
execute_broadcast_table_query(const service::broadcast_tables::query&);
|
|
|
|
// Splits given `mapreduce_request` and distributes execution of resulting subrequests across a cluster.
|
|
future<query::mapreduce_result>
|
|
mapreduce(query::mapreduce_request, tracing::trace_state_ptr);
|
|
|
|
struct retry_statement_execution_error : public std::exception {};
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_schema_statement(const statements::schema_altering_statement&, service::query_state& state, const query_options& options, service::group0_batch& mc);
|
|
future<> announce_schema_statement(const statements::schema_altering_statement&, service::group0_batch& mc);
|
|
|
|
std::unique_ptr<statements::prepared_statement> get_statement(
|
|
const std::string_view& query,
|
|
const service::client_state& client_state,
|
|
dialect d);
|
|
|
|
friend class migration_subscriber;
|
|
|
|
shared_ptr<cql_transport::messages::result_message> bounce_to_shard(unsigned shard, cql3::computed_function_values cached_fn_calls, bool track = true);
|
|
shared_ptr<cql_transport::messages::result_message> bounce_to_node(
|
|
locator::tablet_replica replica,
|
|
cql3::computed_function_values cached_fn_calls,
|
|
seastar::lowres_clock::time_point timeout,
|
|
bool is_write);
|
|
|
|
void update_authorized_prepared_cache_config();
|
|
|
|
void reset_cache();
|
|
|
|
bool topology_global_queue_empty();
|
|
future<bool> ongoing_rf_change(const service::group0_guard& guard, sstring ks);
|
|
|
|
query_options make_internal_options(
|
|
const statements::prepared_statement::checked_weak_ptr& p,
|
|
const std::vector<data_value_or_unset>& values,
|
|
db::consistency_level,
|
|
int32_t page_size = -1,
|
|
service::node_local_only node_local_only = service::node_local_only::no) const;
|
|
|
|
enum class write_consistency_guardrail_state { NONE, WARN, FAIL };
|
|
inline write_consistency_guardrail_state check_write_consistency_levels_guardrail(db::consistency_level cl) {
|
|
_cql_stats.writes_per_consistency_level[size_t(cl)]++;
|
|
|
|
if (_write_consistency_levels_disallowed.contains(cl)) [[unlikely]] {
|
|
_cql_stats.write_consistency_levels_disallowed_violations++;
|
|
return write_consistency_guardrail_state::FAIL;
|
|
}
|
|
if (_write_consistency_levels_warned.contains(cl)) [[unlikely]] {
|
|
_cql_stats.write_consistency_levels_warned_violations++;
|
|
return write_consistency_guardrail_state::WARN;
|
|
}
|
|
return write_consistency_guardrail_state::NONE;
|
|
}
|
|
|
|
private:
|
|
// Keep the holder until you stop using the `remote` services.
|
|
std::pair<std::reference_wrapper<remote>, gate::holder> remote();
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
process_authorized_statement(const ::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options, std::optional<service::group0_guard> guard);
|
|
|
|
/*!
|
|
* \brief created a state object for paging
|
|
*
|
|
* When using paging internally a state object is needed.
|
|
*/
|
|
internal_query_state create_paged_state(
|
|
const sstring& query_string,
|
|
db::consistency_level,
|
|
const data_value_list& values,
|
|
int32_t page_size,
|
|
std::optional<service::query_state> qs = std::nullopt);
|
|
|
|
/*!
|
|
* \brief run a query using paging
|
|
*
|
|
* \note Optimized for convenience, not performance.
|
|
*/
|
|
future<::shared_ptr<untyped_result_set>> execute_paged_internal(internal_query_state& state);
|
|
|
|
/*!
|
|
* \brief iterate over all results using paging, accept a function that returns a future
|
|
*
|
|
* \note Optimized for convenience, not performance.
|
|
*/
|
|
future<> for_each_cql_result(
|
|
cql3::internal_query_state& state,
|
|
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f);
|
|
|
|
/*!
|
|
* \brief check, based on the state if there are additional results
|
|
* Users of the paging, should not use the internal_query_state directly
|
|
*/
|
|
bool has_more_results(cql3::internal_query_state& state) const;
|
|
|
|
template<typename... Args>
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_maybe_with_guard(service::query_state& query_state, ::shared_ptr<cql_statement> statement, const query_options& options,
|
|
future<::shared_ptr<cql_transport::messages::result_message>>(query_processor::*fn)(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>, Args...), Args... args);
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message>> execute_with_guard(
|
|
std::function<future<::shared_ptr<cql_transport::messages::result_message>>(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>)> fn,
|
|
::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options);
|
|
};
|
|
|
|
class query_processor::migration_subscriber : public service::migration_listener {
|
|
query_processor* _qp;
|
|
|
|
public:
|
|
migration_subscriber(query_processor* qp);
|
|
|
|
virtual void on_create_keyspace(const sstring& ks_name) override;
|
|
virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override;
|
|
virtual void on_create_user_type(const sstring& ks_name, const sstring& type_name) override;
|
|
virtual void on_create_function(const sstring& ks_name, const sstring& function_name) override;
|
|
virtual void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) override;
|
|
virtual void on_create_view(const sstring& ks_name, const sstring& view_name) override;
|
|
|
|
virtual void on_update_keyspace(const sstring& ks_name) override;
|
|
virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed) override;
|
|
virtual void on_update_user_type(const sstring& ks_name, const sstring& type_name) override;
|
|
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) override;
|
|
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override;
|
|
virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override;
|
|
|
|
virtual void on_drop_keyspace(const sstring& ks_name) override;
|
|
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override;
|
|
virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override;
|
|
virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) override;
|
|
virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override;
|
|
virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override;
|
|
|
|
private:
|
|
void remove_invalid_prepared_statements(sstring ks_name, std::optional<sstring> cf_name);
|
|
|
|
bool should_invalidate(
|
|
sstring ks_name,
|
|
std::optional<sstring> cf_name,
|
|
::shared_ptr<cql_statement> statement);
|
|
};
|
|
|
|
}
|