diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index 07afe72ee2..ef80c52d2b 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -17,6 +17,9 @@ #include #include +#include "cql3/prepared_statements_cache.hh" +#include "cql3/authorized_prepared_statements_cache.hh" +#include "transport/messages/result_message.hh" #include "service/storage_proxy.hh" #include "service/migration_manager.hh" #include "service/mapreduce_service.hh" @@ -77,7 +80,7 @@ static service::query_state query_state_for_internal_call() { return {service::client_state::for_internal_calls(), empty_service_permit()}; } -query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc, query_processor::memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm) +query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc, query_processor::memory_config mcfg, cql_config& cql_cfg, const utils::loading_cache_config& auth_prep_cache_cfg, lang::manager& langm) : _migration_subscriber{std::make_unique(this)} , _proxy(proxy) , _db(db) @@ -85,8 +88,8 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary: , _vector_store_client(vsc) , _mcfg(mcfg) , _cql_config(cql_cfg) - , _prepared_cache(prep_cache_log, _mcfg.prepared_statment_cache_size) - , _authorized_prepared_cache(std::move(auth_prep_cache_cfg), authorized_prepared_statements_cache_log) + , _prepared_cache(std::make_unique(prep_cache_log, _mcfg.prepared_statment_cache_size)) + , _authorized_prepared_cache(std::make_unique(auth_prep_cache_cfg, authorized_prepared_statements_cache_log)) , _auth_prepared_cache_cfg_cb([this] (uint32_t) { (void) _authorized_prepared_cache_config_action.trigger_later(); }) , _authorized_prepared_cache_config_action([this] { update_authorized_prepared_cache_config(); return make_ready_future<>(); }) , _authorized_prepared_cache_update_interval_in_ms_observer(_db.get_config().permissions_update_interval_in_ms.observe(_auth_prepared_cache_cfg_cb)) @@ -353,12 +356,12 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary: sm::make_gauge( "prepared_cache_size", - [this] { return _prepared_cache.size(); }, + [this] { return _prepared_cache->size(); }, sm::description("A number of entries in the prepared statements cache.")), sm::make_gauge( "prepared_cache_memory_footprint", - [this] { return _prepared_cache.memory_footprint(); }, + [this] { return _prepared_cache->memory_footprint(); }, sm::description("Size (in bytes) of the prepared statements cache.")), sm::make_counter( @@ -449,12 +452,12 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary: sm::make_gauge( "authorized_prepared_statements_cache_size", - [this] { return _authorized_prepared_cache.size(); }, + [this] { return _authorized_prepared_cache->size(); }, sm::description("Number of entries in the authenticated prepared statements cache.")), sm::make_gauge( "user_prepared_auth_cache_footprint", - [this] { return _authorized_prepared_cache.memory_footprint(); }, + [this] { return _authorized_prepared_cache->memory_footprint(); }, sm::description("Size (in bytes) of the authenticated prepared statements cache.")), sm::make_counter( @@ -554,6 +557,81 @@ query_processor::~query_processor() { } } +statements::prepared_statement::checked_weak_ptr query_processor::get_prepared(const std::optional& user, const prepared_cache_key_type& key) { + if (user) { + auto vp = _authorized_prepared_cache->find(*user, key); + if (vp) { + try { + // Touch the corresponding prepared_statements_cache entry to make sure its last_read timestamp + // corresponds to the last time its value has been read. + // + // If we don't do this it may turn out that the most recently used prepared statement doesn't have + // the newest last_read timestamp and can get evicted before the not-so-recently-read statement if + // we need to create space in the prepared statements cache for a new entry. + // + // And this is going to trigger an eviction of the corresponding entry from the authorized_prepared_cache + // breaking the LRU paradigm of these caches. + _prepared_cache->touch(key); + return vp->get()->checked_weak_from_this(); + } catch (seastar::checked_ptr_is_null_exception&) { + // If the prepared statement got invalidated - remove the corresponding authorized_prepared_statements_cache entry as well. + _authorized_prepared_cache->remove(*user, key); + } + } + } + return statements::prepared_statement::checked_weak_ptr(); +} + +statements::prepared_statement::checked_weak_ptr query_processor::get_prepared(const prepared_cache_key_type& key) { + return _prepared_cache->find(key); +} + +future<::shared_ptr> +query_processor::execute_prepared( + statements::prepared_statement::checked_weak_ptr statement, + cql3::prepared_cache_key_type cache_key, + service::query_state& query_state, + const query_options& options, + bool needs_authorization) { + auto cql_statement = statement->statement; + return execute_prepared_without_checking_exception_message( + query_state, + std::move(cql_statement), + options, + std::move(statement), + std::move(cache_key), + needs_authorization) + .then(cql_transport::messages::propagate_exception_as_future<::shared_ptr>); +} + +future<::shared_ptr> +query_processor::execute_direct( + const std::string_view& query_string, + service::query_state& query_state, + dialect d, + query_options& options) { + return execute_direct_without_checking_exception_message( + query_string, + query_state, + d, + options) + .then(cql_transport::messages::propagate_exception_as_future<::shared_ptr>); +} + +future<::shared_ptr> +query_processor::execute_batch( + ::shared_ptr stmt, + service::query_state& query_state, + query_options& options, + std::unordered_map pending_authorization_entries) { + return execute_batch_without_checking_exception_message( + std::move(stmt), + query_state, + options, + std::move(pending_authorization_entries)) + .then(cql_transport::messages::propagate_exception_as_future<::shared_ptr>); +} + std::pair, gate::holder> query_processor::acquire_strongly_consistent_coordinator() { auto [remote_, holder] = remote(); @@ -577,8 +655,8 @@ future<> query_processor::stop_remote() { future<> query_processor::stop() { co_await _mnotifier.unregister_listener(_migration_subscriber.get()); - co_await _authorized_prepared_cache.stop(); - co_await _prepared_cache.stop(); + co_await _authorized_prepared_cache->stop(); + co_await _prepared_cache->stop(); } future<::shared_ptr> query_processor::execute_with_guard( @@ -697,7 +775,7 @@ query_processor::do_execute_prepared( if (needs_authorization) { co_await statement->check_access(*this, query_state.get_client_state()); try { - co_await _authorized_prepared_cache.insert(*query_state.get_client_state().user(), std::move(cache_key), std::move(prepared)); + co_await _authorized_prepared_cache->insert(*query_state.get_client_state().user(), std::move(cache_key), std::move(prepared)); } catch (...) { log.error("failed to cache the entry: {}", std::current_exception()); } @@ -733,7 +811,7 @@ future<::shared_ptr> query_processor::prepare(sstring query_string, const service::client_state& client_state, cql3::dialect d) { try { auto key = compute_id(query_string, client_state.get_raw_keyspace(), d); - auto prep_entry = co_await _prepared_cache.get_pinned(key, [this, &query_string, &client_state, d] { + auto prep_entry = co_await _prepared_cache->get_pinned(key, [this, &query_string, &client_state, d] { auto prepared = get_statement(query_string, client_state, d); prepared->calculate_metadata_id(); auto bound_terms = prepared->statement->get_bound_terms(); @@ -1069,11 +1147,11 @@ query_processor::execute_batch_without_checking_exception_message( ::shared_ptr batch, service::query_state& query_state, query_options& options, - std::unordered_map pending_authorization_entries) { + std::unordered_map pending_authorization_entries) { auto access_future = co_await coroutine::as_future(batch->check_access(*this, query_state.get_client_state())); co_await coroutine::parallel_for_each(pending_authorization_entries, [this, &query_state] (auto& e) -> future<> { try { - co_await _authorized_prepared_cache.insert(*query_state.get_client_state().user(), e.first, std::move(e.second)); + co_await _authorized_prepared_cache->insert(*query_state.get_client_state().user(), e.first, std::move(e.second)); } catch (...) { log.error("failed to cache the entry: {}", std::current_exception()); } @@ -1241,7 +1319,7 @@ void query_processor::migration_subscriber::on_drop_view(const sstring& ks_name, void query_processor::migration_subscriber::remove_invalid_prepared_statements( sstring ks_name, std::optional cf_name) { - _qp->_prepared_cache.remove_if([&] (::shared_ptr stmt) { + _qp->_prepared_cache->remove_if([&] (::shared_ptr stmt) { return this->should_invalidate(ks_name, cf_name, stmt); }); } @@ -1298,13 +1376,13 @@ void query_processor::update_authorized_prepared_cache_config() { std::chrono::duration_cast(prepared_statements_cache::entry_expiry)); cfg.refresh = std::chrono::milliseconds(_db.get_config().permissions_update_interval_in_ms()); - if (!_authorized_prepared_cache.update_config(std::move(cfg))) { + if (!_authorized_prepared_cache->update_config(std::move(cfg))) { log.error("Failed to apply authorized prepared statements cache changes. Please read the documentation of these parameters"); } } void query_processor::reset_cache() { - _authorized_prepared_cache.reset(); + _authorized_prepared_cache->reset(); } } diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index cb26348ee7..3521710abd 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -17,15 +17,16 @@ #include #include -#include "cql3/prepared_statements_cache.hh" -#include "cql3/authorized_prepared_statements_cache.hh" +#include "cql3/prepared_cache_key_type.hh" #include "cql3/statements/prepared_statement.hh" #include "cql3/cql_statement.hh" #include "cql3/dialect.hh" +#include "cql3/query_options.hh" +#include "cql3/stats.hh" #include "exceptions/exceptions.hh" #include "service/migration_listener.hh" #include "mutation/timestamp.hh" -#include "transport/messages/result_message.hh" +#include "transport/messages/result_message_base.hh" #include "service/client_state.hh" #include "service/broadcast_tables/experimental/query_result.hh" #include "vector_search/vector_store_client.hh" @@ -41,6 +42,11 @@ namespace lang { class manager; } +namespace utils { +template +struct loading_cache_config_base; +using loading_cache_config = loading_cache_config_base; +} namespace service { class migration_manager; class query_state; @@ -58,6 +64,9 @@ struct query; namespace cql3 { +class prepared_statements_cache; +class authorized_prepared_statements_cache; + namespace statements { class batch_statement; class schema_altering_statement; @@ -132,8 +141,8 @@ private: seastar::metrics::metric_groups _metrics; - prepared_statements_cache _prepared_cache; - authorized_prepared_statements_cache _authorized_prepared_cache; + std::unique_ptr _prepared_cache; + std::unique_ptr _authorized_prepared_cache; // Tracks the rolling maximum of gross bytes allocated during CQL parsing utils::rolling_max_tracker _parsing_cost_tracker{1000}; @@ -184,7 +193,7 @@ public: static std::vector> parse_statements(std::string_view queries, dialect d); query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc, - memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm); + memory_config mcfg, cql_config& cql_cfg, const utils::loading_cache_config& auth_prep_cache_cfg, lang::manager& langm); ~query_processor(); @@ -231,53 +240,17 @@ public: return _vector_store_client; } - statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional& user, const prepared_cache_key_type& key) { - if (user) { - auto vp = _authorized_prepared_cache.find(*user, key); - if (vp) { - try { - // Touch the corresponding prepared_statements_cache entry to make sure its last_read timestamp - // corresponds to the last time its value has been read. - // - // If we don't do this it may turn out that the most recently used prepared statement doesn't have - // the newest last_read timestamp and can get evicted before the not-so-recently-read statement if - // we need to create space in the prepared statements cache for a new entry. - // - // And this is going to trigger an eviction of the corresponding entry from the authorized_prepared_cache - // breaking the LRU paradigm of these caches. - _prepared_cache.touch(key); - return vp->get()->checked_weak_from_this(); - } catch (seastar::checked_ptr_is_null_exception&) { - // If the prepared statement got invalidated - remove the corresponding authorized_prepared_statements_cache entry as well. - _authorized_prepared_cache.remove(*user, key); - } - } - } - return statements::prepared_statement::checked_weak_ptr(); - } + statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional& user, const prepared_cache_key_type& key); - statements::prepared_statement::checked_weak_ptr get_prepared(const prepared_cache_key_type& key) { - return _prepared_cache.find(key); - } + statements::prepared_statement::checked_weak_ptr get_prepared(const prepared_cache_key_type& key); - inline future<::shared_ptr> execute_prepared( statements::prepared_statement::checked_weak_ptr statement, cql3::prepared_cache_key_type cache_key, service::query_state& query_state, const query_options& options, - bool needs_authorization) { - auto cql_statement = statement->statement; - return execute_prepared_without_checking_exception_message( - query_state, - std::move(cql_statement), - options, - std::move(statement), - std::move(cache_key), - needs_authorization) - .then(cql_transport::messages::propagate_exception_as_future<::shared_ptr>); - } + bool needs_authorization); // Like execute_prepared, but is allowed to return exceptions as result_message::exception. // The result_message::exception must be explicitly handled. @@ -301,20 +274,12 @@ public: bool needs_authorization); /// Execute a client statement that was not prepared. - inline future<::shared_ptr> execute_direct( const std::string_view& query_string, service::query_state& query_state, dialect d, - query_options& options) { - return execute_direct_without_checking_exception_message( - query_string, - query_state, - d, - options) - .then(cql_transport::messages::propagate_exception_as_future<::shared_ptr>); - } + query_options& options); // Like execute_direct, but is allowed to return exceptions as result_message::exception. // The result_message::exception must be explicitly handled. @@ -466,20 +431,12 @@ public: future<> stop(); - inline future<::shared_ptr> execute_batch( ::shared_ptr stmt, service::query_state& query_state, query_options& options, - std::unordered_map pending_authorization_entries) { - return execute_batch_without_checking_exception_message( - std::move(stmt), - query_state, - options, - std::move(pending_authorization_entries)) - .then(cql_transport::messages::propagate_exception_as_future<::shared_ptr>); - } + std::unordered_map pending_authorization_entries); // Like execute_batch, but is allowed to return exceptions as result_message::exception. // The result_message::exception must be explicitly handled. @@ -488,7 +445,7 @@ public: ::shared_ptr, service::query_state& query_state, query_options& options, - std::unordered_map pending_authorization_entries); + std::unordered_map pending_authorization_entries); future execute_broadcast_table_query(const service::broadcast_tables::query&); diff --git a/transport/server.cc b/transport/server.cc index 064f71f59e..54b203ea2e 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -1687,7 +1687,7 @@ process_batch_internal(service::client_state& client_state, sharded modifications; std::vector values; - std::unordered_map pending_authorization_entries; + std::unordered_map pending_authorization_entries; modifications.reserve(n.assume_value()); values.reserve(n.assume_value());