cql3: break loading_cache include chain from query_processor.hh

utils/loading_cache.hh is an expensive template header that costs
~2,494 seconds of aggregate CPU time across 133 files that include it.
88 of those files include it only transitively via query_processor.hh
through the chain: query_processor.hh -> prepared_statements_cache.hh
-> loading_cache.hh, costing ~1,690s of template instantiation.

Break the chain by:
- Replacing #include of prepared_statements_cache.hh and
  authorized_prepared_statements_cache.hh in query_processor.hh with
  forward declarations and the lightweight prepared_cache_key_type.hh
- Replacing #include of result_message.hh with result_message_base.hh
  (which doesn't pull in prepared_statements_cache.hh)
- Changing prepared_statements_cache and authorized_prepared_statements_cache
  members to std::unique_ptr (PImpl) since forward-declared types
  cannot be held by value
- Moving get_prepared(), execute_prepared(), execute_direct(), and
  execute_batch() method bodies from the header to query_processor.cc
- Updating transport/server.cc to use the concrete type instead of the
  no-longer-visible authorized_prepared_statements_cache::value_type

Per-file measurement: files including query_processor.hh now show zero
loading_cache template instantiation events (previously 20-32s each).

Wall-clock measurement (clean build, -j16, 16 cores, Seastar cached):
  Baseline (origin/master):           avg 24m01s (24m03s, 23m59s)
  With loading_cache chain break:     avg 23m29s (23m32s, 23m29s, 23m27s)
  Improvement:                        ~32s, ~2.2%
This commit is contained in:
Yaniv Michael Kaul
2026-04-15 04:21:15 +03:00
parent b499dc8e9d
commit b324c84a04
3 changed files with 116 additions and 81 deletions

View File

@@ -17,6 +17,9 @@
#include <seastar/coroutine/as_future.hh>
#include <seastar/coroutine/try_future.hh>
#include "cql3/prepared_statements_cache.hh"
#include "cql3/authorized_prepared_statements_cache.hh"
#include "transport/messages/result_message.hh"
#include "service/storage_proxy.hh"
#include "service/migration_manager.hh"
#include "service/mapreduce_service.hh"
@@ -77,7 +80,7 @@ static service::query_state query_state_for_internal_call() {
return {service::client_state::for_internal_calls(), empty_service_permit()};
}
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc, query_processor::memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm)
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc, query_processor::memory_config mcfg, cql_config& cql_cfg, const utils::loading_cache_config& auth_prep_cache_cfg, lang::manager& langm)
: _migration_subscriber{std::make_unique<migration_subscriber>(this)}
, _proxy(proxy)
, _db(db)
@@ -85,8 +88,8 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
, _vector_store_client(vsc)
, _mcfg(mcfg)
, _cql_config(cql_cfg)
, _prepared_cache(prep_cache_log, _mcfg.prepared_statment_cache_size)
, _authorized_prepared_cache(std::move(auth_prep_cache_cfg), authorized_prepared_statements_cache_log)
, _prepared_cache(std::make_unique<prepared_statements_cache>(prep_cache_log, _mcfg.prepared_statment_cache_size))
, _authorized_prepared_cache(std::make_unique<authorized_prepared_statements_cache>(auth_prep_cache_cfg, authorized_prepared_statements_cache_log))
, _auth_prepared_cache_cfg_cb([this] (uint32_t) { (void) _authorized_prepared_cache_config_action.trigger_later(); })
, _authorized_prepared_cache_config_action([this] { update_authorized_prepared_cache_config(); return make_ready_future<>(); })
, _authorized_prepared_cache_update_interval_in_ms_observer(_db.get_config().permissions_update_interval_in_ms.observe(_auth_prepared_cache_cfg_cb))
@@ -353,12 +356,12 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
sm::make_gauge(
"prepared_cache_size",
[this] { return _prepared_cache.size(); },
[this] { return _prepared_cache->size(); },
sm::description("A number of entries in the prepared statements cache.")),
sm::make_gauge(
"prepared_cache_memory_footprint",
[this] { return _prepared_cache.memory_footprint(); },
[this] { return _prepared_cache->memory_footprint(); },
sm::description("Size (in bytes) of the prepared statements cache.")),
sm::make_counter(
@@ -449,12 +452,12 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
sm::make_gauge(
"authorized_prepared_statements_cache_size",
[this] { return _authorized_prepared_cache.size(); },
[this] { return _authorized_prepared_cache->size(); },
sm::description("Number of entries in the authenticated prepared statements cache.")),
sm::make_gauge(
"user_prepared_auth_cache_footprint",
[this] { return _authorized_prepared_cache.memory_footprint(); },
[this] { return _authorized_prepared_cache->memory_footprint(); },
sm::description("Size (in bytes) of the authenticated prepared statements cache.")),
sm::make_counter(
@@ -554,6 +557,81 @@ query_processor::~query_processor() {
}
}
statements::prepared_statement::checked_weak_ptr query_processor::get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
if (user) {
auto vp = _authorized_prepared_cache->find(*user, key);
if (vp) {
try {
// Touch the corresponding prepared_statements_cache entry to make sure its last_read timestamp
// corresponds to the last time its value has been read.
//
// If we don't do this it may turn out that the most recently used prepared statement doesn't have
// the newest last_read timestamp and can get evicted before the not-so-recently-read statement if
// we need to create space in the prepared statements cache for a new entry.
//
// And this is going to trigger an eviction of the corresponding entry from the authorized_prepared_cache
// breaking the LRU paradigm of these caches.
_prepared_cache->touch(key);
return vp->get()->checked_weak_from_this();
} catch (seastar::checked_ptr_is_null_exception&) {
// If the prepared statement got invalidated - remove the corresponding authorized_prepared_statements_cache entry as well.
_authorized_prepared_cache->remove(*user, key);
}
}
}
return statements::prepared_statement::checked_weak_ptr();
}
statements::prepared_statement::checked_weak_ptr query_processor::get_prepared(const prepared_cache_key_type& key) {
return _prepared_cache->find(key);
}
future<::shared_ptr<cql_transport::messages::result_message>>
query_processor::execute_prepared(
statements::prepared_statement::checked_weak_ptr statement,
cql3::prepared_cache_key_type cache_key,
service::query_state& query_state,
const query_options& options,
bool needs_authorization) {
auto cql_statement = statement->statement;
return execute_prepared_without_checking_exception_message(
query_state,
std::move(cql_statement),
options,
std::move(statement),
std::move(cache_key),
needs_authorization)
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
}
future<::shared_ptr<cql_transport::messages::result_message>>
query_processor::execute_direct(
const std::string_view& query_string,
service::query_state& query_state,
dialect d,
query_options& options) {
return execute_direct_without_checking_exception_message(
query_string,
query_state,
d,
options)
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
}
future<::shared_ptr<cql_transport::messages::result_message>>
query_processor::execute_batch(
::shared_ptr<statements::batch_statement> stmt,
service::query_state& query_state,
query_options& options,
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries) {
return execute_batch_without_checking_exception_message(
std::move(stmt),
query_state,
options,
std::move(pending_authorization_entries))
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
}
std::pair<std::reference_wrapper<service::strong_consistency::coordinator>, gate::holder>
query_processor::acquire_strongly_consistent_coordinator() {
auto [remote_, holder] = remote();
@@ -577,8 +655,8 @@ future<> query_processor::stop_remote() {
future<> query_processor::stop() {
co_await _mnotifier.unregister_listener(_migration_subscriber.get());
co_await _authorized_prepared_cache.stop();
co_await _prepared_cache.stop();
co_await _authorized_prepared_cache->stop();
co_await _prepared_cache->stop();
}
future<::shared_ptr<cql_transport::messages::result_message>> query_processor::execute_with_guard(
@@ -697,7 +775,7 @@ query_processor::do_execute_prepared(
if (needs_authorization) {
co_await statement->check_access(*this, query_state.get_client_state());
try {
co_await _authorized_prepared_cache.insert(*query_state.get_client_state().user(), std::move(cache_key), std::move(prepared));
co_await _authorized_prepared_cache->insert(*query_state.get_client_state().user(), std::move(cache_key), std::move(prepared));
} catch (...) {
log.error("failed to cache the entry: {}", std::current_exception());
}
@@ -733,7 +811,7 @@ future<::shared_ptr<cql_transport::messages::result_message::prepared>>
query_processor::prepare(sstring query_string, const service::client_state& client_state, cql3::dialect d) {
try {
auto key = compute_id(query_string, client_state.get_raw_keyspace(), d);
auto prep_entry = co_await _prepared_cache.get_pinned(key, [this, &query_string, &client_state, d] {
auto prep_entry = co_await _prepared_cache->get_pinned(key, [this, &query_string, &client_state, d] {
auto prepared = get_statement(query_string, client_state, d);
prepared->calculate_metadata_id();
auto bound_terms = prepared->statement->get_bound_terms();
@@ -1069,11 +1147,11 @@ query_processor::execute_batch_without_checking_exception_message(
::shared_ptr<statements::batch_statement> batch,
service::query_state& query_state,
query_options& options,
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries) {
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries) {
auto access_future = co_await coroutine::as_future(batch->check_access(*this, query_state.get_client_state()));
co_await coroutine::parallel_for_each(pending_authorization_entries, [this, &query_state] (auto& e) -> future<> {
try {
co_await _authorized_prepared_cache.insert(*query_state.get_client_state().user(), e.first, std::move(e.second));
co_await _authorized_prepared_cache->insert(*query_state.get_client_state().user(), e.first, std::move(e.second));
} catch (...) {
log.error("failed to cache the entry: {}", std::current_exception());
}
@@ -1241,7 +1319,7 @@ void query_processor::migration_subscriber::on_drop_view(const sstring& ks_name,
void query_processor::migration_subscriber::remove_invalid_prepared_statements(
sstring ks_name,
std::optional<sstring> cf_name) {
_qp->_prepared_cache.remove_if([&] (::shared_ptr<cql_statement> stmt) {
_qp->_prepared_cache->remove_if([&] (::shared_ptr<cql_statement> stmt) {
return this->should_invalidate(ks_name, cf_name, stmt);
});
}
@@ -1298,13 +1376,13 @@ void query_processor::update_authorized_prepared_cache_config() {
std::chrono::duration_cast<std::chrono::milliseconds>(prepared_statements_cache::entry_expiry));
cfg.refresh = std::chrono::milliseconds(_db.get_config().permissions_update_interval_in_ms());
if (!_authorized_prepared_cache.update_config(std::move(cfg))) {
if (!_authorized_prepared_cache->update_config(std::move(cfg))) {
log.error("Failed to apply authorized prepared statements cache changes. Please read the documentation of these parameters");
}
}
void query_processor::reset_cache() {
_authorized_prepared_cache.reset();
_authorized_prepared_cache->reset();
}
}

View File

@@ -17,15 +17,16 @@
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include "cql3/prepared_statements_cache.hh"
#include "cql3/authorized_prepared_statements_cache.hh"
#include "cql3/prepared_cache_key_type.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/cql_statement.hh"
#include "cql3/dialect.hh"
#include "cql3/query_options.hh"
#include "cql3/stats.hh"
#include "exceptions/exceptions.hh"
#include "service/migration_listener.hh"
#include "mutation/timestamp.hh"
#include "transport/messages/result_message.hh"
#include "transport/messages/result_message_base.hh"
#include "service/client_state.hh"
#include "service/broadcast_tables/experimental/query_result.hh"
#include "vector_search/vector_store_client.hh"
@@ -41,6 +42,11 @@
namespace lang { class manager; }
namespace utils {
template <typename Clock>
struct loading_cache_config_base;
using loading_cache_config = loading_cache_config_base<seastar::lowres_clock>;
}
namespace service {
class migration_manager;
class query_state;
@@ -58,6 +64,9 @@ struct query;
namespace cql3 {
class prepared_statements_cache;
class authorized_prepared_statements_cache;
namespace statements {
class batch_statement;
class schema_altering_statement;
@@ -132,8 +141,8 @@ private:
seastar::metrics::metric_groups _metrics;
prepared_statements_cache _prepared_cache;
authorized_prepared_statements_cache _authorized_prepared_cache;
std::unique_ptr<prepared_statements_cache> _prepared_cache;
std::unique_ptr<authorized_prepared_statements_cache> _authorized_prepared_cache;
// Tracks the rolling maximum of gross bytes allocated during CQL parsing
utils::rolling_max_tracker _parsing_cost_tracker{1000};
@@ -184,7 +193,7 @@ public:
static std::vector<std::unique_ptr<statements::raw::parsed_statement>> parse_statements(std::string_view queries, dialect d);
query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc,
memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm);
memory_config mcfg, cql_config& cql_cfg, const utils::loading_cache_config& auth_prep_cache_cfg, lang::manager& langm);
~query_processor();
@@ -231,53 +240,17 @@ public:
return _vector_store_client;
}
statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
if (user) {
auto vp = _authorized_prepared_cache.find(*user, key);
if (vp) {
try {
// Touch the corresponding prepared_statements_cache entry to make sure its last_read timestamp
// corresponds to the last time its value has been read.
//
// If we don't do this it may turn out that the most recently used prepared statement doesn't have
// the newest last_read timestamp and can get evicted before the not-so-recently-read statement if
// we need to create space in the prepared statements cache for a new entry.
//
// And this is going to trigger an eviction of the corresponding entry from the authorized_prepared_cache
// breaking the LRU paradigm of these caches.
_prepared_cache.touch(key);
return vp->get()->checked_weak_from_this();
} catch (seastar::checked_ptr_is_null_exception&) {
// If the prepared statement got invalidated - remove the corresponding authorized_prepared_statements_cache entry as well.
_authorized_prepared_cache.remove(*user, key);
}
}
}
return statements::prepared_statement::checked_weak_ptr();
}
statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key);
statements::prepared_statement::checked_weak_ptr get_prepared(const prepared_cache_key_type& key) {
return _prepared_cache.find(key);
}
statements::prepared_statement::checked_weak_ptr get_prepared(const prepared_cache_key_type& key);
inline
future<::shared_ptr<cql_transport::messages::result_message>>
execute_prepared(
statements::prepared_statement::checked_weak_ptr statement,
cql3::prepared_cache_key_type cache_key,
service::query_state& query_state,
const query_options& options,
bool needs_authorization) {
auto cql_statement = statement->statement;
return execute_prepared_without_checking_exception_message(
query_state,
std::move(cql_statement),
options,
std::move(statement),
std::move(cache_key),
needs_authorization)
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
}
bool needs_authorization);
// Like execute_prepared, but is allowed to return exceptions as result_message::exception.
// The result_message::exception must be explicitly handled.
@@ -301,20 +274,12 @@ public:
bool needs_authorization);
/// Execute a client statement that was not prepared.
inline
future<::shared_ptr<cql_transport::messages::result_message>>
execute_direct(
const std::string_view& query_string,
service::query_state& query_state,
dialect d,
query_options& options) {
return execute_direct_without_checking_exception_message(
query_string,
query_state,
d,
options)
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
}
query_options& options);
// Like execute_direct, but is allowed to return exceptions as result_message::exception.
// The result_message::exception must be explicitly handled.
@@ -466,20 +431,12 @@ public:
future<> stop();
inline
future<::shared_ptr<cql_transport::messages::result_message>>
execute_batch(
::shared_ptr<statements::batch_statement> stmt,
service::query_state& query_state,
query_options& options,
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries) {
return execute_batch_without_checking_exception_message(
std::move(stmt),
query_state,
options,
std::move(pending_authorization_entries))
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
}
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries);
// Like execute_batch, but is allowed to return exceptions as result_message::exception.
// The result_message::exception must be explicitly handled.
@@ -488,7 +445,7 @@ public:
::shared_ptr<statements::batch_statement>,
service::query_state& query_state,
query_options& options,
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries);
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries);
future<service::broadcast_tables::query_result>
execute_broadcast_table_query(const service::broadcast_tables::query&);

View File

@@ -1687,7 +1687,7 @@ process_batch_internal(service::client_state& client_state, sharded<cql3::query_
std::vector<cql3::statements::batch_statement::single_statement> modifications;
std::vector<cql3::raw_value_view_vector_with_unset> values;
std::unordered_map<cql3::prepared_cache_key_type, cql3::authorized_prepared_statements_cache::value_type> pending_authorization_entries;
std::unordered_map<cql3::prepared_cache_key_type, cql3::statements::prepared_statement::checked_weak_ptr> pending_authorization_entries;
modifications.reserve(n.assume_value());
values.reserve(n.assume_value());