From 3ac4e258e8a2d9db7430282d49e138e3b10fb43d Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 9 Mar 2026 16:45:37 +0200 Subject: [PATCH] transport/messages: hold pinned prepared entry in PREPARE result result_message::prepared now owns a strong pinned prepared-cache entry instead of relying only on a weak pointer view. This closes the remaining lifetime gap after query_processor::prepare() returns, so users of the returned PREPARE message cannot observe an invalidated weak handle during subsequent processing. - update result_message::prepared::cql constructor to accept pinned entry - construct weak view from owned pinned entry inside the message - pass pinned cache entry from query_processor::prepare() into the message constructor --- cql3/query_processor.cc | 11 +++-------- service/paxos/paxos_state.cc | 2 +- table_helper.cc | 2 +- transport/messages/result_message.cc | 15 +++++++++------ transport/messages/result_message.hh | 15 ++++++++------- 5 files changed, 22 insertions(+), 23 deletions(-) diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index 87a54da576..ab54d7f523 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -740,19 +740,14 @@ query_processor::prepare(sstring query_string, const service::client_state& clie SCYLLA_ASSERT(bound_terms == prepared->bound_names.size()); return make_ready_future>(std::move(prepared)); }); - auto prep_ptr = (*prep_entry)->checked_weak_from_this(); co_await utils::get_local_injector().inject( "query_processor_prepare_wait_after_cache_get", utils::wait_for_message(std::chrono::seconds(60))); - - const auto& warnings = (*prep_entry)->warnings; - const auto msg = ::make_shared(prepared_cache_key_type::cql_id(key), std::move(prep_ptr), + + auto msg = ::make_shared(prepared_cache_key_type::cql_id(key), std::move(prep_entry), client_state.is_protocol_extension_set(cql_transport::cql_protocol_extension::LWT_ADD_METADATA_MARK)); - for (const auto& w : warnings) { - msg->add_warning(w); - } - co_return ::shared_ptr(std::move(msg)); + co_return std::move(msg); } catch(typename prepared_statements_cache::statement_is_too_big&) { throw prepared_statement_is_too_big(query_string); } diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index dd8ec19235..d2acc564f1 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -454,7 +454,7 @@ static future do_execute_cql_with_timeout(sstring req, auto ps_ptr = qp.get_prepared(cache_key); if (!ps_ptr) { const auto msg_ptr = co_await qp.prepare(req, qs, cql3::internal_dialect()); - ps_ptr = std::move(msg_ptr->get_prepared()); + ps_ptr = msg_ptr->get_prepared(); if (!ps_ptr) { on_internal_error(paxos_state::logger, "prepared statement is null"); } diff --git a/table_helper.cc b/table_helper.cc index 58a3ed4223..051d38e50e 100644 --- a/table_helper.cc +++ b/table_helper.cc @@ -75,7 +75,7 @@ future table_helper::try_prepare(bool fallback, cql3::query_processor& qp, auto& stmt = fallback ? _insert_cql_fallback.value() : _insert_cql; try { shared_ptr msg_ptr = co_await qp.prepare(stmt, qs.get_client_state(), dialect); - _prepared_stmt = std::move(msg_ptr->get_prepared()); + _prepared_stmt = msg_ptr->get_prepared(); shared_ptr cql_stmt = _prepared_stmt->statement; _insert_stmt = dynamic_pointer_cast(cql_stmt); _is_fallback_stmt = fallback; diff --git a/transport/messages/result_message.cc b/transport/messages/result_message.cc index bdba1e93d3..017bb12453 100644 --- a/transport/messages/result_message.cc +++ b/transport/messages/result_message.cc @@ -67,14 +67,17 @@ void result_message::visitor_base::visit(const result_message::exception& ex) { ex.throw_me(); } -result_message::prepared::prepared(cql3::statements::prepared_statement::checked_weak_ptr prepared, bool support_lwt_opt) - : _prepared(std::move(prepared)) +result_message::prepared::prepared(cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt) + : _prepared_entry(std::move(prepared_entry)) , _metadata( - _prepared->bound_names, - _prepared->partition_key_bind_indices, - support_lwt_opt ? _prepared->statement->is_conditional() : false) - , _result_metadata{extract_result_metadata(_prepared->statement)} + (*_prepared_entry)->bound_names, + (*_prepared_entry)->partition_key_bind_indices, + support_lwt_opt ? (*_prepared_entry)->statement->is_conditional() : false) + , _result_metadata{extract_result_metadata((*_prepared_entry)->statement)} { + for (const auto& w : (*_prepared_entry)->warnings){ + add_warning(w); + } } ::shared_ptr result_message::prepared::extract_result_metadata(::shared_ptr statement) { diff --git a/transport/messages/result_message.hh b/transport/messages/result_message.hh index d92f94c6ef..9511d465da 100644 --- a/transport/messages/result_message.hh +++ b/transport/messages/result_message.hh @@ -13,6 +13,7 @@ #include #include "cql3/result_set.hh" +#include "cql3/prepared_statements_cache.hh" #include "cql3/statements/prepared_statement.hh" #include "cql3/query_options.hh" @@ -30,14 +31,14 @@ namespace messages { class result_message::prepared : public result_message { private: - cql3::statements::prepared_statement::checked_weak_ptr _prepared; + cql3::prepared_statements_cache::pinned_value_type _prepared_entry; cql3::prepared_metadata _metadata; ::shared_ptr _result_metadata; protected: - prepared(cql3::statements::prepared_statement::checked_weak_ptr prepared, bool support_lwt_opt); + prepared(cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt); public: - cql3::statements::prepared_statement::checked_weak_ptr& get_prepared() { - return _prepared; + cql3::statements::prepared_statement::checked_weak_ptr get_prepared() { + return (*_prepared_entry)->checked_weak_from_this(); } const cql3::prepared_metadata& metadata() const { @@ -49,7 +50,7 @@ public: } cql3::cql_metadata_id_type get_metadata_id() const { - return _prepared->get_metadata_id(); + return (*_prepared_entry)->get_metadata_id(); } class cql; @@ -166,8 +167,8 @@ std::ostream& operator<<(std::ostream& os, const result_message::set_keyspace& m class result_message::prepared::cql : public result_message::prepared { bytes _id; public: - cql(const bytes& id, cql3::statements::prepared_statement::checked_weak_ptr p, bool support_lwt_opt) - : result_message::prepared(std::move(p), support_lwt_opt) + cql(const bytes& id, cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt) + : result_message::prepared(std::move(prepared_entry), support_lwt_opt) , _id{id} { }