transport/messages: hold pinned prepared entry in PREPARE result

result_message::prepared now owns a strong pinned prepared-cache entry instead of relying only on a weak pointer view. This closes the remaining lifetime gap after query_processor::prepare() returns, so users of the returned PREPARE message cannot observe an invalidated weak handle during subsequent
  processing.

  - update result_message::prepared::cql constructor to accept pinned entry
  - construct weak view from owned pinned entry inside the message
  - pass pinned cache entry from query_processor::prepare() into the message constructor
This commit is contained in:
Alex
2026-03-09 16:45:37 +02:00
parent 27051d9a7c
commit 3ac4e258e8
5 changed files with 22 additions and 23 deletions

View File

@@ -740,19 +740,14 @@ query_processor::prepare(sstring query_string, const service::client_state& clie
SCYLLA_ASSERT(bound_terms == prepared->bound_names.size());
return make_ready_future<std::unique_ptr<statements::prepared_statement>>(std::move(prepared));
});
auto prep_ptr = (*prep_entry)->checked_weak_from_this();
co_await utils::get_local_injector().inject(
"query_processor_prepare_wait_after_cache_get",
utils::wait_for_message(std::chrono::seconds(60)));
const auto& warnings = (*prep_entry)->warnings;
const auto msg = ::make_shared<result_message::prepared::cql>(prepared_cache_key_type::cql_id(key), std::move(prep_ptr),
auto msg = ::make_shared<result_message::prepared::cql>(prepared_cache_key_type::cql_id(key), std::move(prep_entry),
client_state.is_protocol_extension_set(cql_transport::cql_protocol_extension::LWT_ADD_METADATA_MARK));
for (const auto& w : warnings) {
msg->add_warning(w);
}
co_return ::shared_ptr<cql_transport::messages::result_message::prepared>(std::move(msg));
co_return std::move(msg);
} catch(typename prepared_statements_cache::statement_is_too_big&) {
throw prepared_statement_is_too_big(query_string);
}

View File

@@ -454,7 +454,7 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
auto ps_ptr = qp.get_prepared(cache_key);
if (!ps_ptr) {
const auto msg_ptr = co_await qp.prepare(req, qs, cql3::internal_dialect());
ps_ptr = std::move(msg_ptr->get_prepared());
ps_ptr = msg_ptr->get_prepared();
if (!ps_ptr) {
on_internal_error(paxos_state::logger, "prepared statement is null");
}

View File

@@ -75,7 +75,7 @@ future<bool> table_helper::try_prepare(bool fallback, cql3::query_processor& qp,
auto& stmt = fallback ? _insert_cql_fallback.value() : _insert_cql;
try {
shared_ptr<cql_transport::messages::result_message::prepared> msg_ptr = co_await qp.prepare(stmt, qs.get_client_state(), dialect);
_prepared_stmt = std::move(msg_ptr->get_prepared());
_prepared_stmt = msg_ptr->get_prepared();
shared_ptr<cql3::cql_statement> cql_stmt = _prepared_stmt->statement;
_insert_stmt = dynamic_pointer_cast<cql3::statements::modification_statement>(cql_stmt);
_is_fallback_stmt = fallback;

View File

@@ -67,14 +67,17 @@ void result_message::visitor_base::visit(const result_message::exception& ex) {
ex.throw_me();
}
result_message::prepared::prepared(cql3::statements::prepared_statement::checked_weak_ptr prepared, bool support_lwt_opt)
: _prepared(std::move(prepared))
result_message::prepared::prepared(cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt)
: _prepared_entry(std::move(prepared_entry))
, _metadata(
_prepared->bound_names,
_prepared->partition_key_bind_indices,
support_lwt_opt ? _prepared->statement->is_conditional() : false)
, _result_metadata{extract_result_metadata(_prepared->statement)}
(*_prepared_entry)->bound_names,
(*_prepared_entry)->partition_key_bind_indices,
support_lwt_opt ? (*_prepared_entry)->statement->is_conditional() : false)
, _result_metadata{extract_result_metadata((*_prepared_entry)->statement)}
{
for (const auto& w : (*_prepared_entry)->warnings){
add_warning(w);
}
}
::shared_ptr<const cql3::metadata> result_message::prepared::extract_result_metadata(::shared_ptr<cql3::cql_statement> statement) {

View File

@@ -13,6 +13,7 @@
#include <concepts>
#include "cql3/result_set.hh"
#include "cql3/prepared_statements_cache.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/query_options.hh"
@@ -30,14 +31,14 @@ namespace messages {
class result_message::prepared : public result_message {
private:
cql3::statements::prepared_statement::checked_weak_ptr _prepared;
cql3::prepared_statements_cache::pinned_value_type _prepared_entry;
cql3::prepared_metadata _metadata;
::shared_ptr<const cql3::metadata> _result_metadata;
protected:
prepared(cql3::statements::prepared_statement::checked_weak_ptr prepared, bool support_lwt_opt);
prepared(cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt);
public:
cql3::statements::prepared_statement::checked_weak_ptr& get_prepared() {
return _prepared;
cql3::statements::prepared_statement::checked_weak_ptr get_prepared() {
return (*_prepared_entry)->checked_weak_from_this();
}
const cql3::prepared_metadata& metadata() const {
@@ -49,7 +50,7 @@ public:
}
cql3::cql_metadata_id_type get_metadata_id() const {
return _prepared->get_metadata_id();
return (*_prepared_entry)->get_metadata_id();
}
class cql;
@@ -166,8 +167,8 @@ std::ostream& operator<<(std::ostream& os, const result_message::set_keyspace& m
class result_message::prepared::cql : public result_message::prepared {
bytes _id;
public:
cql(const bytes& id, cql3::statements::prepared_statement::checked_weak_ptr p, bool support_lwt_opt)
: result_message::prepared(std::move(p), support_lwt_opt)
cql(const bytes& id, cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt)
: result_message::prepared(std::move(prepared_entry), support_lwt_opt)
, _id{id}
{ }