Enable verification of write consistency level guardrails in `modification_statement` and `batch_statement`. Neither guardrail is enabled by default, so as not to disrupt clusters that are currently using any of the CLs for writes. The warning guardrail may seem harmless, as it only adds a warning to the CQL response; however, enabling it can significantly increase network traffic (as a warning message is added to each response) and also decrease throughput due to additional allocations required to prepare the warning. Therefore, both guardrails should be enabled with care. The newly added `writes_per_consistency_level` metric, which is incremented unconditionally, can help decide whether a guardrail can be safely enabled in an existing cluster. This commit adds additional `if` instructions on the critical path. However, based on the `perf_simple_query` benchmark for writes, the difference is marginal (~40 additional instructions, which is a relative difference smaller than 0.001). BEFORE: ``` 291443.35 tps ( 53.3 allocs/op, 16.0 logallocs/op, 14.2 tasks/op, 48067 insns/op, 18885 cycles/op, 0 errors) throughput: mean= 289743.07 standard-deviation=6075.60 median= 291424.69 median-absolute-deviation=1702.56 maximum=292498.27 minimum=261920.06 instructions_per_op: mean= 48072.30 standard-deviation=21.15 median= 48074.49 median-absolute-deviation=12.07 maximum=48119.87 minimum=48019.89 cpu_cycles_per_op: mean= 18884.09 standard-deviation=56.43 median= 18877.33 median-absolute-deviation=14.71 maximum=19155.48 minimum=18821.57 ``` AFTER: ``` 290108.83 tps ( 53.3 allocs/op, 16.0 logallocs/op, 14.2 tasks/op, 48121 insns/op, 18988 cycles/op, 0 errors) throughput: mean= 289105.08 standard-deviation=3626.58 median= 290018.90 median-absolute-deviation=1072.25 maximum=291110.44 minimum=274669.98 instructions_per_op: mean= 48117.57 standard-deviation=18.58 median= 48114.51 median-absolute-deviation=12.08 maximum=48162.18 minimum=48087.18 cpu_cycles_per_op: mean= 18953.43 
standard-deviation=28.76 median= 18945.82 median-absolute-deviation=20.84 maximum=19023.93 minimum=18916.46 ``` Fixes: SCYLLADB-259
626 lines
26 KiB
C++
626 lines
26 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*
|
|
* Modified by ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <string_view>
|
|
#include <unordered_map>
|
|
|
|
#include <seastar/core/metrics_registration.hh>
|
|
#include <seastar/core/sharded.hh>
|
|
#include <seastar/core/shared_ptr.hh>
|
|
|
|
#include "cql3/prepared_statements_cache.hh"
|
|
#include "cql3/authorized_prepared_statements_cache.hh"
|
|
#include "cql3/statements/prepared_statement.hh"
|
|
#include "cql3/cql_statement.hh"
|
|
#include "cql3/dialect.hh"
|
|
#include "exceptions/exceptions.hh"
|
|
#include "service/migration_listener.hh"
|
|
#include "mutation/timestamp.hh"
|
|
#include "transport/messages/result_message.hh"
|
|
#include "service/client_state.hh"
|
|
#include "service/broadcast_tables/experimental/query_result.hh"
|
|
#include "vector_search/vector_store_client.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/observable.hh"
|
|
#include "service/raft/raft_group0_client.hh"
|
|
#include "types/types.hh"
|
|
#include "db/auth_version.hh"
|
|
#include "db/consistency_level_type.hh"
|
|
#include "db/config.hh"
|
|
#include "utils/enum_option.hh"
|
|
#include "service/storage_proxy_fwd.hh"
|
|
|
|
|
|
namespace lang { class manager; }
|
|
namespace service {
|
|
class migration_manager;
|
|
class query_state;
|
|
class mapreduce_service;
|
|
class raft_group0_client;
|
|
|
|
namespace strong_consistency {
|
|
class coordinator;
|
|
}
|
|
|
|
namespace broadcast_tables {
|
|
struct query;
|
|
}
|
|
}
|
|
|
|
namespace cql3 {
|
|
|
|
namespace statements {
|
|
class batch_statement;
|
|
class schema_altering_statement;
|
|
|
|
namespace raw {
|
|
|
|
class parsed_statement;
|
|
|
|
}
|
|
}
|
|
|
|
class untyped_result_set;
|
|
class untyped_result_set_row;
|
|
|
|
/*!
|
|
* \brief Holds the internal state needed for paging;
|
|
* this state has to be passed to the paged execution calls.
|
|
*
|
|
*/
|
|
struct internal_query_state;
|
|
|
|
class prepared_statement_is_too_big : public std::exception {
|
|
sstring _msg;
|
|
|
|
public:
|
|
static constexpr int max_query_prefix = 100;
|
|
|
|
prepared_statement_is_too_big(const sstring& query_string)
|
|
: _msg(seastar::format("Prepared statement is too big: {}", query_string.substr(0, max_query_prefix)))
|
|
{
|
|
// mark that we clipped the query string
|
|
if (query_string.size() > max_query_prefix) {
|
|
_msg += "...";
|
|
}
|
|
}
|
|
|
|
virtual const char* what() const noexcept override {
|
|
return _msg.c_str();
|
|
}
|
|
};
|
|
|
|
class cql_config;
|
|
class query_options;
|
|
class cql_statement;
|
|
|
|
class query_processor : public seastar::peering_sharded_service<query_processor> {
|
|
public:
|
|
class migration_subscriber;
|
|
/// Cache sizing parameters handed to the query_processor constructor.
/// Both values default to zero.
struct memory_config {
    // Size for the prepared statements cache (unit not visible here — presumably bytes; TODO confirm).
    // NOTE: the "statment" spelling is part of the public field name.
    size_t prepared_statment_cache_size{0};
    // Size for the authorized prepared statements cache (same unit caveat as above).
    size_t authorized_prepared_cache_size{0};
};
|
|
|
|
private:
|
|
std::unique_ptr<migration_subscriber> _migration_subscriber;
|
|
service::storage_proxy& _proxy;
|
|
data_dictionary::database _db;
|
|
service::migration_notifier& _mnotifier;
|
|
vector_search::vector_store_client& _vector_store_client;
|
|
memory_config _mcfg;
|
|
const cql_config& _cql_config;
|
|
|
|
struct remote;
|
|
std::unique_ptr<remote> _remote;
|
|
|
|
struct stats {
|
|
uint64_t prepare_invocations = 0;
|
|
uint64_t queries_by_cl[size_t(db::consistency_level::MAX_VALUE) + 1] = {};
|
|
} _stats;
|
|
|
|
cql_stats _cql_stats;
|
|
|
|
seastar::metrics::metric_groups _metrics;
|
|
|
|
prepared_statements_cache _prepared_cache;
|
|
authorized_prepared_statements_cache _authorized_prepared_cache;
|
|
|
|
std::function<void(uint32_t)> _auth_prepared_cache_cfg_cb;
|
|
serialized_action _authorized_prepared_cache_config_action;
|
|
utils::observer<uint32_t> _authorized_prepared_cache_update_interval_in_ms_observer;
|
|
utils::observer<uint32_t> _authorized_prepared_cache_validity_in_ms_observer;
|
|
|
|
// A map for prepared statements used internally (which we don't want to mix with user statements; in particular, we
|
|
// don't bother with expiration on those).
|
|
std::unordered_map<sstring, std::unique_ptr<statements::prepared_statement>> _internal_statements;
|
|
|
|
lang::manager& _lang_manager;
|
|
|
|
using cl_option_list = std::vector<enum_option<db::consistency_level_restriction_t>>;
|
|
|
|
/// Efficient bitmask-based set of consistency levels.
|
|
using consistency_level_set = enum_set<super_enum<db::consistency_level,
|
|
db::consistency_level::ANY,
|
|
db::consistency_level::ONE,
|
|
db::consistency_level::TWO,
|
|
db::consistency_level::THREE,
|
|
db::consistency_level::QUORUM,
|
|
db::consistency_level::ALL,
|
|
db::consistency_level::LOCAL_QUORUM,
|
|
db::consistency_level::EACH_QUORUM,
|
|
db::consistency_level::SERIAL,
|
|
db::consistency_level::LOCAL_SERIAL,
|
|
db::consistency_level::LOCAL_ONE>>;
|
|
|
|
|
|
consistency_level_set _write_consistency_levels_warned;
|
|
consistency_level_set _write_consistency_levels_disallowed;
|
|
utils::observer<cl_option_list> _write_consistency_levels_warned_observer;
|
|
utils::observer<cl_option_list> _write_consistency_levels_disallowed_observer;
|
|
|
|
static consistency_level_set to_consistency_level_set(const cl_option_list& levels);
|
|
public:
|
|
static const sstring CQL_VERSION;
|
|
|
|
static prepared_cache_key_type compute_id(
|
|
std::string_view query_string,
|
|
std::string_view keyspace,
|
|
dialect d);
|
|
|
|
static std::unique_ptr<statements::raw::parsed_statement> parse_statement(const std::string_view& query, dialect d);
|
|
static std::vector<std::unique_ptr<statements::raw::parsed_statement>> parse_statements(std::string_view queries, dialect d);
|
|
|
|
query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc,
|
|
memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm);
|
|
|
|
~query_processor();
|
|
|
|
void start_remote(service::migration_manager&, service::mapreduce_service&,
|
|
service::storage_service& ss, service::raft_group0_client&,
|
|
service::strong_consistency::coordinator&);
|
|
future<> stop_remote();
|
|
|
|
/// Returns, by value, the data_dictionary::database handle held by this query processor.
data_dictionary::database db() { return _db; }
|
|
|
|
/// Read-only access to the cql_config this query processor holds a reference to.
const cql_config& get_cql_config() const { return _cql_config; }
|
|
|
|
/// Const overload: read-only access to the storage proxy referenced by this query processor.
const service::storage_proxy& proxy() const noexcept { return _proxy; }
|
|
|
|
/// Mutable access to the storage proxy referenced by this query processor.
service::storage_proxy& proxy() { return _proxy; }
|
|
|
|
std::pair<std::reference_wrapper<service::strong_consistency::coordinator>, gate::holder>
|
|
acquire_strongly_consistent_coordinator();
|
|
|
|
/// Mutable access to the CQL statistics object owned by this query processor.
cql_stats& get_cql_stats() { return _cql_stats; }
|
|
|
|
/// Access to the lang::manager this query processor was constructed with.
lang::manager& lang() {
    return _lang_manager;
}
|
|
|
|
/// Const overload: read-only access to the vector store client referenced by this query processor.
const vector_search::vector_store_client& vector_store_client() const noexcept { return _vector_store_client; }
|
|
|
|
/// Mutable access to the vector store client referenced by this query processor.
vector_search::vector_store_client& vector_store_client() noexcept { return _vector_store_client; }
|
|
|
|
db::auth_version_t auth_version;
|
|
|
|
/// Looks up a prepared statement in the authorized prepared statements cache.
///
/// \param user user on whose behalf the lookup is made; with no user the
///             lookup is skipped and an empty pointer is returned
/// \param key  prepared statement cache key
/// \return a checked weak pointer to the cached statement, or a
///         default-constructed (empty) pointer when there is no usable entry
statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
    using weak_statement = statements::prepared_statement::checked_weak_ptr;

    if (!user) {
        return weak_statement();
    }

    auto cached = _authorized_prepared_cache.find(*user, key);
    if (!cached) {
        return weak_statement();
    }

    try {
        // Refresh the matching prepared_statements_cache entry so that its last_read
        // timestamp reflects this read.
        //
        // Without the refresh, the most recently used prepared statement might not carry
        // the newest last_read timestamp, and could be evicted ahead of a less recently
        // read statement when the prepared statements cache needs room for a new entry.
        //
        // Such an eviction would in turn evict the corresponding authorized_prepared_cache
        // entry, breaking the LRU paradigm of these caches.
        _prepared_cache.touch(key);
        return cached->get()->checked_weak_from_this();
    } catch (seastar::checked_ptr_is_null_exception&) {
        // The prepared statement got invalidated - drop the corresponding
        // authorized_prepared_statements_cache entry as well.
        _authorized_prepared_cache.remove(*user, key);
    }

    return weak_statement();
}
|
|
|
|
/// Looks up `key` directly in the prepared statements cache (no per-user
/// authorized cache involved) and returns whatever that lookup yields.
statements::prepared_statement::checked_weak_ptr get_prepared(const prepared_cache_key_type& key) { return _prepared_cache.find(key); }
|
|
|
|
inline
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_prepared(
|
|
statements::prepared_statement::checked_weak_ptr statement,
|
|
cql3::prepared_cache_key_type cache_key,
|
|
service::query_state& query_state,
|
|
const query_options& options,
|
|
bool needs_authorization) {
|
|
auto cql_statement = statement->statement;
|
|
return execute_prepared_without_checking_exception_message(
|
|
query_state,
|
|
std::move(cql_statement),
|
|
options,
|
|
std::move(statement),
|
|
std::move(cache_key),
|
|
needs_authorization)
|
|
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
|
|
}
|
|
|
|
// Like execute_prepared, but is allowed to return exceptions as result_message::exception.
|
|
// The result_message::exception must be explicitly handled.
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_prepared_without_checking_exception_message(
|
|
service::query_state& query_state,
|
|
shared_ptr<cql_statement> statement,
|
|
const query_options& options,
|
|
statements::prepared_statement::checked_weak_ptr prepared,
|
|
cql3::prepared_cache_key_type cache_key,
|
|
bool needs_authorization);
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
do_execute_prepared(
|
|
service::query_state& query_state,
|
|
shared_ptr<cql_statement> statement,
|
|
const query_options& options,
|
|
std::optional<service::group0_guard> guard,
|
|
statements::prepared_statement::checked_weak_ptr prepared,
|
|
cql3::prepared_cache_key_type cache_key,
|
|
bool needs_authorization);
|
|
|
|
/// Execute a client statement that was not prepared.
|
|
inline
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_direct(
|
|
const std::string_view& query_string,
|
|
service::query_state& query_state,
|
|
dialect d,
|
|
query_options& options) {
|
|
return execute_direct_without_checking_exception_message(
|
|
query_string,
|
|
query_state,
|
|
d,
|
|
options)
|
|
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
|
|
}
|
|
|
|
// Like execute_direct, but is allowed to return exceptions as result_message::exception.
|
|
// The result_message::exception must be explicitly handled.
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_direct_without_checking_exception_message(
|
|
const std::string_view& query_string,
|
|
service::query_state& query_state,
|
|
dialect d,
|
|
query_options& options);
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
do_execute_direct(
|
|
service::query_state& query_state,
|
|
shared_ptr<cql_statement> statement,
|
|
const query_options& options,
|
|
std::optional<service::group0_guard> guard,
|
|
cql3::cql_warnings_vec warnings);
|
|
|
|
statements::prepared_statement::checked_weak_ptr prepare_internal(const sstring& query);
|
|
|
|
/*!
|
|
* \brief iterate over all cql results using paging
|
|
*
|
|
* You create a statement with optional parameters and pass
|
|
* a function that goes over the result rows.
|
|
*
|
|
* The passed function is called for each row; return a future resolving to
|
|
* stop_iteration::yes to stop the iteration early.
|
|
*
|
|
* For example:
|
|
return query_internal(
|
|
"SELECT * from system.compaction_history",
|
|
db::consistency_level::ONE,
|
|
{},
|
|
1000,
|
|
[&history] (const cql3::untyped_result_set_row& row) mutable {
|
|
....
|
|
....
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
});
|
|
|
|
* You can use placeholders in the query, the statement will only be prepared once.
|
|
*
|
|
* query_string - the cql string, can contain placeholders
|
|
* cl - consistency level of the query
|
|
* values - values to be substituted for the placeholders in the query
|
|
* page_size - maximum page size
|
|
* f - a function to be run on each row of the query result,
|
|
* if the function returns stop_iteration::yes the iteration will stop
|
|
* qs - optional query state (default: std::nullopt)
|
|
*
|
|
* \note This function is optimized for convenience, not performance.
|
|
*/
|
|
future<> query_internal(
|
|
const sstring& query_string,
|
|
db::consistency_level cl,
|
|
const data_value_list& values,
|
|
int32_t page_size,
|
|
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f,
|
|
std::optional<service::query_state> qs = std::nullopt);
|
|
|
|
/*!
|
|
* \brief iterate over all cql results using paging
|
|
* An overload of query_internal without query parameters
|
|
* using CL = ONE, no timeout, and page size = 1000.
|
|
*
|
|
* query_string - the cql string, can contain placeholders
|
|
* f - a function to be run on each row of the query result,
|
|
* if the function returns stop_iteration::yes the iteration will stop
|
|
*
|
|
* \note This function is optimized for convenience, not performance.
|
|
*/
|
|
future<> query_internal(
|
|
const sstring& query_string,
|
|
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f);
|
|
|
|
class cache_internal_tag;
|
|
using cache_internal = bool_class<cache_internal_tag>;
|
|
|
|
// NOTICE: Internal queries should be used with care, as they are expected
|
|
// to be used for local tables (e.g. from the `system` keyspace).
|
|
// Data modifications will usually be performed with consistency level ONE
|
|
// and schema changes will not be announced to other nodes.
|
|
// Because of that, changing global schema state (e.g. modifying non-local tables,
|
|
// creating namespaces, etc) is explicitly forbidden via this interface.
|
|
//
|
|
// note: optimized for convenience, not performance.
|
|
future<::shared_ptr<untyped_result_set>> execute_internal(
|
|
const sstring& query_string,
|
|
db::consistency_level,
|
|
const data_value_list&,
|
|
cache_internal cache);
|
|
future<::shared_ptr<untyped_result_set>> execute_internal(
|
|
const sstring& query_string,
|
|
db::consistency_level,
|
|
service::query_state& query_state,
|
|
const data_value_list& values,
|
|
cache_internal cache);
|
|
/// Convenience overload: runs `query_string` at consistency level `cl`
/// with an empty list of bound values.
future<::shared_ptr<untyped_result_set>> execute_internal(
        const sstring& query_string,
        db::consistency_level cl,
        cache_internal cache) {
    // No bound values: forward an empty data_value_list.
    return execute_internal(query_string, cl, data_value_list{}, cache);
}
|
|
/// Convenience overload: runs `query_string` at consistency level `cl`
/// under the caller's `query_state`, with an empty list of bound values.
future<::shared_ptr<untyped_result_set>> execute_internal(
        const sstring& query_string,
        db::consistency_level cl,
        service::query_state& query_state,
        cache_internal cache) {
    // No bound values: forward an empty data_value_list.
    return execute_internal(query_string, cl, query_state, data_value_list{}, cache);
}
|
|
/// Convenience overload: runs `query_string` with the given bound `values`
/// at consistency level ONE.
future<::shared_ptr<untyped_result_set>>
execute_internal(const sstring& query_string, const data_value_list& values, cache_internal cache) {
    constexpr auto default_cl = db::consistency_level::ONE;
    return execute_internal(query_string, default_cl, values, cache);
}
|
|
/// Convenience overload: runs `query_string` at consistency level ONE
/// with an empty list of bound values.
future<::shared_ptr<untyped_result_set>>
execute_internal(const sstring& query_string, cache_internal cache) {
    constexpr auto default_cl = db::consistency_level::ONE;
    return execute_internal(query_string, default_cl, data_value_list{}, cache);
}
|
|
|
|
// Obtains mutations from query. For internal usage, most notable
|
|
// use-case is generating data for group0 announce(). Note that this
|
|
// function enables putting multiple CQL queries into a single raft command
|
|
// and vice versa, split mutations from one query into separate commands.
|
|
// It supports write-only queries; read-modify-write queries are not supported.
|
|
future<utils::chunked_vector<mutation>> get_mutations_internal(
|
|
const sstring query_string,
|
|
service::query_state& query_state,
|
|
api::timestamp_type timestamp,
|
|
std::vector<data_value_or_unset> values);
|
|
|
|
future<::shared_ptr<untyped_result_set>> execute_with_params(
|
|
statements::prepared_statement::checked_weak_ptr p,
|
|
db::consistency_level,
|
|
service::query_state& query_state,
|
|
const data_value_list& values = { });
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message>> do_execute_with_params(
|
|
service::query_state& query_state,
|
|
shared_ptr<cql_statement> statement,
|
|
const query_options& options,
|
|
std::optional<service::group0_guard> guard);
|
|
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
|
|
prepare(sstring query_string, service::query_state& query_state, dialect d);
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
|
|
prepare(sstring query_string, const service::client_state& client_state, dialect d);
|
|
|
|
future<> stop();
|
|
|
|
inline
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_batch(
|
|
::shared_ptr<statements::batch_statement> stmt,
|
|
service::query_state& query_state,
|
|
query_options& options,
|
|
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries) {
|
|
return execute_batch_without_checking_exception_message(
|
|
std::move(stmt),
|
|
query_state,
|
|
options,
|
|
std::move(pending_authorization_entries))
|
|
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
|
|
}
|
|
|
|
// Like execute_batch, but is allowed to return exceptions as result_message::exception.
|
|
// The result_message::exception must be explicitly handled.
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_batch_without_checking_exception_message(
|
|
::shared_ptr<statements::batch_statement>,
|
|
service::query_state& query_state,
|
|
query_options& options,
|
|
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries);
|
|
|
|
future<service::broadcast_tables::query_result>
|
|
execute_broadcast_table_query(const service::broadcast_tables::query&);
|
|
|
|
// Splits given `mapreduce_request` and distributes execution of resulting subrequests across a cluster.
|
|
future<query::mapreduce_result>
|
|
mapreduce(query::mapreduce_request, tracing::trace_state_ptr);
|
|
|
|
struct retry_statement_execution_error : public std::exception {};
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_schema_statement(const statements::schema_altering_statement&, service::query_state& state, const query_options& options, service::group0_batch& mc);
|
|
future<> announce_schema_statement(const statements::schema_altering_statement&, service::group0_batch& mc);
|
|
|
|
std::unique_ptr<statements::prepared_statement> get_statement(
|
|
const std::string_view& query,
|
|
const service::client_state& client_state,
|
|
dialect d);
|
|
|
|
friend class migration_subscriber;
|
|
|
|
shared_ptr<cql_transport::messages::result_message> bounce_to_shard(unsigned shard, cql3::computed_function_values cached_fn_calls);
|
|
|
|
void update_authorized_prepared_cache_config();
|
|
|
|
void reset_cache();
|
|
|
|
bool topology_global_queue_empty();
|
|
future<bool> ongoing_rf_change(const service::group0_guard& guard, sstring ks);
|
|
|
|
query_options make_internal_options(
|
|
const statements::prepared_statement::checked_weak_ptr& p,
|
|
const std::vector<data_value_or_unset>& values,
|
|
db::consistency_level,
|
|
int32_t page_size = -1,
|
|
service::node_local_only node_local_only = service::node_local_only::no) const;
|
|
|
|
/// Outcome of check_write_consistency_levels_guardrail() for one write.
enum class write_consistency_guardrail_state {
    NONE,   // the consistency level is in neither guardrail set
    WARN,   // the consistency level is in the "warned" set
    FAIL,   // the consistency level is in the "disallowed" set
};
|
|
/// Checks a write's consistency level against the configured guardrail sets.
///
/// The per-consistency-level write counter is incremented on every call,
/// whatever the outcome. The "disallowed" set is tested before the "warned"
/// set, so a level present in both yields FAIL. The matching violation
/// counter is incremented whenever FAIL or WARN is returned.
///
/// \param cl consistency level of the write being checked
/// \return FAIL, WARN or NONE as described above
inline write_consistency_guardrail_state check_write_consistency_levels_guardrail(db::consistency_level cl) {
    using state = write_consistency_guardrail_state;

    ++_cql_stats.writes_per_consistency_level[static_cast<size_t>(cl)];

    if (_write_consistency_levels_disallowed.contains(cl)) [[unlikely]] {
        ++_cql_stats.write_consistency_levels_disallowed_violations;
        return state::FAIL;
    }

    if (_write_consistency_levels_warned.contains(cl)) [[unlikely]] {
        ++_cql_stats.write_consistency_levels_warned_violations;
        return state::WARN;
    }

    return state::NONE;
}
|
|
|
|
private:
|
|
// Keep the holder until you stop using the `remote` services.
|
|
std::pair<std::reference_wrapper<remote>, gate::holder> remote();
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
process_authorized_statement(const ::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options, std::optional<service::group0_guard> guard);
|
|
|
|
/*!
|
|
* \brief create a state object for paging
|
|
*
|
|
* When using paging internally a state object is needed.
|
|
*/
|
|
internal_query_state create_paged_state(
|
|
const sstring& query_string,
|
|
db::consistency_level,
|
|
const data_value_list& values,
|
|
int32_t page_size,
|
|
std::optional<service::query_state> qs = std::nullopt);
|
|
|
|
/*!
|
|
* \brief run a query using paging
|
|
*
|
|
* \note Optimized for convenience, not performance.
|
|
*/
|
|
future<::shared_ptr<untyped_result_set>> execute_paged_internal(internal_query_state& state);
|
|
|
|
/*!
|
|
* \brief iterate over all results using paging, accept a function that returns a future
|
|
*
|
|
* \note Optimized for convenience, not performance.
|
|
*/
|
|
future<> for_each_cql_result(
|
|
cql3::internal_query_state& state,
|
|
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f);
|
|
|
|
/*!
|
|
* \brief check, based on the state, whether there are additional results
|
|
* Users of the paging should not use the internal_query_state directly
|
|
*/
|
|
bool has_more_results(cql3::internal_query_state& state) const;
|
|
|
|
template<typename... Args>
|
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
|
execute_maybe_with_guard(service::query_state& query_state, ::shared_ptr<cql_statement> statement, const query_options& options,
|
|
future<::shared_ptr<cql_transport::messages::result_message>>(query_processor::*fn)(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>, Args...), Args... args);
|
|
|
|
future<::shared_ptr<cql_transport::messages::result_message>> execute_with_guard(
|
|
std::function<future<::shared_ptr<cql_transport::messages::result_message>>(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>)> fn,
|
|
::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options);
|
|
};
|
|
|
|
/// service::migration_listener implementation bound to a query_processor.
///
/// It overrides the create/update/drop callbacks for keyspaces, column
/// families, user types, functions, aggregates and views. The callback
/// bodies live out of line; judging by the private helpers they presumably
/// invalidate affected prepared statements — confirm against the .cc file.
class query_processor::migration_subscriber : public service::migration_listener {
    // The query processor this subscriber acts on (raw pointer; lifetime is managed elsewhere).
    query_processor* _qp;

public:
    migration_subscriber(query_processor* qp);

    // --- creation callbacks ---
    virtual void on_create_keyspace(const sstring& ks_name) override;
    virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override;
    virtual void on_create_user_type(const sstring& ks_name, const sstring& type_name) override;
    virtual void on_create_function(const sstring& ks_name, const sstring& function_name) override;
    virtual void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) override;
    virtual void on_create_view(const sstring& ks_name, const sstring& view_name) override;

    // --- update callbacks ---
    virtual void on_update_keyspace(const sstring& ks_name) override;
    virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed) override;
    virtual void on_update_user_type(const sstring& ks_name, const sstring& type_name) override;
    virtual void on_update_function(const sstring& ks_name, const sstring& function_name) override;
    virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override;
    virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override;

    // --- drop callbacks ---
    virtual void on_drop_keyspace(const sstring& ks_name) override;
    virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override;
    virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override;
    virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) override;
    virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override;
    virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override;

private:
    // Takes a keyspace name and an optional column family name; defined out of line.
    void remove_invalid_prepared_statements(sstring ks_name, std::optional<sstring> cf_name);

    // Predicate over a statement for the given keyspace / optional column family; defined out of line.
    bool should_invalidate(
            sstring ks_name,
            std::optional<sstring> cf_name,
            ::shared_ptr<cql_statement> statement);
};
|
|
|
|
}
|