mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-30 13:17:01 +00:00
transport/messages: hold pinned prepared entry in PREPARE result
result_message::prepared now owns a strong pinned prepared-cache entry instead of relying only on a weak pointer view. This closes the remaining lifetime gap after query_processor::prepare() returns, so users of the returned PREPARE message cannot observe an invalidated weak handle during subsequent processing. - update result_message::prepared::cql constructor to accept pinned entry - construct weak view from owned pinned entry inside the message - pass pinned cache entry from query_processor::prepare() into the message constructor
This commit is contained in:
@@ -740,19 +740,14 @@ query_processor::prepare(sstring query_string, const service::client_state& clie
|
||||
SCYLLA_ASSERT(bound_terms == prepared->bound_names.size());
|
||||
return make_ready_future<std::unique_ptr<statements::prepared_statement>>(std::move(prepared));
|
||||
});
|
||||
auto prep_ptr = (*prep_entry)->checked_weak_from_this();
|
||||
|
||||
co_await utils::get_local_injector().inject(
|
||||
"query_processor_prepare_wait_after_cache_get",
|
||||
utils::wait_for_message(std::chrono::seconds(60)));
|
||||
|
||||
const auto& warnings = (*prep_entry)->warnings;
|
||||
const auto msg = ::make_shared<result_message::prepared::cql>(prepared_cache_key_type::cql_id(key), std::move(prep_ptr),
|
||||
|
||||
auto msg = ::make_shared<result_message::prepared::cql>(prepared_cache_key_type::cql_id(key), std::move(prep_entry),
|
||||
client_state.is_protocol_extension_set(cql_transport::cql_protocol_extension::LWT_ADD_METADATA_MARK));
|
||||
for (const auto& w : warnings) {
|
||||
msg->add_warning(w);
|
||||
}
|
||||
co_return ::shared_ptr<cql_transport::messages::result_message::prepared>(std::move(msg));
|
||||
co_return std::move(msg);
|
||||
} catch(typename prepared_statements_cache::statement_is_too_big&) {
|
||||
throw prepared_statement_is_too_big(query_string);
|
||||
}
|
||||
|
||||
@@ -454,7 +454,7 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
|
||||
auto ps_ptr = qp.get_prepared(cache_key);
|
||||
if (!ps_ptr) {
|
||||
const auto msg_ptr = co_await qp.prepare(req, qs, cql3::internal_dialect());
|
||||
ps_ptr = std::move(msg_ptr->get_prepared());
|
||||
ps_ptr = msg_ptr->get_prepared();
|
||||
if (!ps_ptr) {
|
||||
on_internal_error(paxos_state::logger, "prepared statement is null");
|
||||
}
|
||||
|
||||
@@ -75,7 +75,7 @@ future<bool> table_helper::try_prepare(bool fallback, cql3::query_processor& qp,
|
||||
auto& stmt = fallback ? _insert_cql_fallback.value() : _insert_cql;
|
||||
try {
|
||||
shared_ptr<cql_transport::messages::result_message::prepared> msg_ptr = co_await qp.prepare(stmt, qs.get_client_state(), dialect);
|
||||
_prepared_stmt = std::move(msg_ptr->get_prepared());
|
||||
_prepared_stmt = msg_ptr->get_prepared();
|
||||
shared_ptr<cql3::cql_statement> cql_stmt = _prepared_stmt->statement;
|
||||
_insert_stmt = dynamic_pointer_cast<cql3::statements::modification_statement>(cql_stmt);
|
||||
_is_fallback_stmt = fallback;
|
||||
|
||||
@@ -67,14 +67,17 @@ void result_message::visitor_base::visit(const result_message::exception& ex) {
|
||||
ex.throw_me();
|
||||
}
|
||||
|
||||
result_message::prepared::prepared(cql3::statements::prepared_statement::checked_weak_ptr prepared, bool support_lwt_opt)
|
||||
: _prepared(std::move(prepared))
|
||||
result_message::prepared::prepared(cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt)
|
||||
: _prepared_entry(std::move(prepared_entry))
|
||||
, _metadata(
|
||||
_prepared->bound_names,
|
||||
_prepared->partition_key_bind_indices,
|
||||
support_lwt_opt ? _prepared->statement->is_conditional() : false)
|
||||
, _result_metadata{extract_result_metadata(_prepared->statement)}
|
||||
(*_prepared_entry)->bound_names,
|
||||
(*_prepared_entry)->partition_key_bind_indices,
|
||||
support_lwt_opt ? (*_prepared_entry)->statement->is_conditional() : false)
|
||||
, _result_metadata{extract_result_metadata((*_prepared_entry)->statement)}
|
||||
{
|
||||
for (const auto& w : (*_prepared_entry)->warnings){
|
||||
add_warning(w);
|
||||
}
|
||||
}
|
||||
|
||||
::shared_ptr<const cql3::metadata> result_message::prepared::extract_result_metadata(::shared_ptr<cql3::cql_statement> statement) {
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include <concepts>
|
||||
|
||||
#include "cql3/result_set.hh"
|
||||
#include "cql3/prepared_statements_cache.hh"
|
||||
#include "cql3/statements/prepared_statement.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
|
||||
@@ -30,14 +31,14 @@ namespace messages {
|
||||
|
||||
class result_message::prepared : public result_message {
|
||||
private:
|
||||
cql3::statements::prepared_statement::checked_weak_ptr _prepared;
|
||||
cql3::prepared_statements_cache::pinned_value_type _prepared_entry;
|
||||
cql3::prepared_metadata _metadata;
|
||||
::shared_ptr<const cql3::metadata> _result_metadata;
|
||||
protected:
|
||||
prepared(cql3::statements::prepared_statement::checked_weak_ptr prepared, bool support_lwt_opt);
|
||||
prepared(cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt);
|
||||
public:
|
||||
cql3::statements::prepared_statement::checked_weak_ptr& get_prepared() {
|
||||
return _prepared;
|
||||
cql3::statements::prepared_statement::checked_weak_ptr get_prepared() {
|
||||
return (*_prepared_entry)->checked_weak_from_this();
|
||||
}
|
||||
|
||||
const cql3::prepared_metadata& metadata() const {
|
||||
@@ -49,7 +50,7 @@ public:
|
||||
}
|
||||
|
||||
cql3::cql_metadata_id_type get_metadata_id() const {
|
||||
return _prepared->get_metadata_id();
|
||||
return (*_prepared_entry)->get_metadata_id();
|
||||
}
|
||||
|
||||
class cql;
|
||||
@@ -166,8 +167,8 @@ std::ostream& operator<<(std::ostream& os, const result_message::set_keyspace& m
|
||||
class result_message::prepared::cql : public result_message::prepared {
|
||||
bytes _id;
|
||||
public:
|
||||
cql(const bytes& id, cql3::statements::prepared_statement::checked_weak_ptr p, bool support_lwt_opt)
|
||||
: result_message::prepared(std::move(p), support_lwt_opt)
|
||||
cql(const bytes& id, cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt)
|
||||
: result_message::prepared(std::move(prepared_entry), support_lwt_opt)
|
||||
, _id{id}
|
||||
{ }
|
||||
|
||||
|
||||
Reference in New Issue
Block a user