Compare commits

...

2 Commits

Author SHA1 Message Date
Alex
0313d3e291 test/cluster: add cluster test for prepared metadata_id promotion
Exercises the full server-side prepared metadata_id promotion path using
the Scylla Python driver rather than raw sockets, via two lightweight
patches:

- _UseMetadataId(ApplicationInfoBase) injects SCYLLA_USE_METADATA_ID
  into the CQL STARTUP options so the server enables the v4 metadata_id
  wire exchange for this connection.
- mock.patch on ProtocolVersion.uses_prepared_metadata (forced True for
  all versions) makes the driver write result_metadata_id in EXECUTE
  frames and read it back from PREPARE/ROWS responses on protocol v4.
- A wrapper around ResultMessage.recv_results_metadata captures the
  result_metadata_id set by the driver on the ROWS ResultMessage (the
  driver parses it there but does not propagate it to PreparedStatement
  in the normal rows path).

The happy-path test prepares LIST ROLES OF (a statement with empty
result metadata at prepare time), executes it, and verifies:
- The ROWS response carried METADATA_CHANGED (promotion occurred).
- The response includes a non-None result_metadata_id.
- The promoted id differs from the stale empty-metadata id returned
  by PREPARE.

The suppression test injects skip_prepared_result_metadata_promotion to
confirm the happy-path test is not a false positive: the injection skips
the entire promotion block so no real_id is computed, the cache is not
updated, and the response carries no METADATA_CHANGED flag.
2026-04-01 13:24:16 +03:00
Alex
4a06935bb0 transport/server: Promote prepared metadata_id after first rows response
When a prepared statement is cached without a result metadata_id (e.g.
LIST ROLES OF, whose result schema is not known at prepare time), the
server now detects this on the first EXECUTE that returns rows.

Promotion logic lives entirely in transport's .then() lambda:
1. Snapshot result_metadata_was_empty before calling execute_prepared_*.
2. After a rows response, calculate the real metadata_id from the result
   metadata.
3. Update the cache via qp.local().update_prepared_result_metadata_id()
   so subsequent EXECUTEs on the same shard skip promotion entirely.
4. Return METADATA_CHANGED to the client so it can update its stale
   cached metadata_id.

No promotion logic remains in do_execute_prepared; query_processor.hh
exposes update_prepared_result_metadata_id() which delegates to the
prepared_statements_cache.  The prepared (checked_weak_ptr) is no longer
captured in the lambda, avoiding a use-after-free if a concurrent ALTER
TABLE evicts the entry between scheduling and execution.

Adds a Boost unit test (schema_change_test) that verifies the
METADATA_CHANGED flag is set in the rows response when a stale
metadata_id is supplied.  The skip_prepared_result_metadata_promotion
error injection point in the transport lambda allows suppressing the
entire promotion block (no real_id computation, no cache update) for
negative testing.
2026-04-01 12:32:23 +03:00
7 changed files with 380 additions and 4 deletions

View File

@@ -137,6 +137,15 @@ public:
return value_type();
}
bool update_result_metadata_id(const key_type& key, cql3::cql_metadata_id_type metadata_id) {
cache_value_ptr vp = _cache.find(key.key());
if (!vp) {
return false;
}
(*vp)->update_result_metadata_id(std::move(metadata_id));
return true;
}
template <typename Pred>
requires std::is_invocable_r_v<bool, Pred, ::shared_ptr<cql_statement>>
void remove_if(Pred&& pred) {

View File

@@ -481,6 +481,12 @@ public:
void update_authorized_prepared_cache_config();
/// Update the result metadata_id of a cached prepared statement.
/// Returns true if the entry was found and updated, false if it was evicted.
bool update_prepared_result_metadata_id(const cql3::prepared_cache_key_type& cache_key, cql3::cql_metadata_id_type metadata_id) {
return _prepared_cache.update_result_metadata_id(cache_key, std::move(metadata_id));
}
void reset_cache();
bool topology_global_queue_empty();

View File

@@ -52,6 +52,7 @@ public:
std::vector<sstring> warnings;
private:
cql_metadata_id_type _metadata_id;
bool _result_metadata_is_empty;
public:
prepared_statement(audit::audit_info_ptr&& audit_info, seastar::shared_ptr<cql_statement> statement_, std::vector<seastar::lw_shared_ptr<column_specification>> bound_names_,
@@ -71,6 +72,15 @@ public:
void calculate_metadata_id();
cql_metadata_id_type get_metadata_id() const;
bool result_metadata_is_empty() const {
return _result_metadata_is_empty;
}
void update_result_metadata_id(cql_metadata_id_type metadata_id) {
_metadata_id = std::move(metadata_id);
_result_metadata_is_empty = false;
}
};
}

View File

@@ -49,6 +49,7 @@ prepared_statement::prepared_statement(
, partition_key_bind_indices(std::move(partition_key_bind_indices))
, warnings(std::move(warnings))
, _metadata_id(bytes{})
, _result_metadata_is_empty(statement->get_result_metadata()->flags().contains<metadata::flag::NO_METADATA>())
{
statement->set_audit_info(std::move(audit_info));
}

View File

@@ -34,6 +34,9 @@
#include "test/lib/log.hh"
#include "cdc/cdc_extension.hh"
#include "test/lib/test_utils.hh"
#include "transport/request.hh"
#include "transport/response.hh"
#include "utils/memory_data_sink.hh"
BOOST_AUTO_TEST_SUITE(schema_change_test)
@@ -1181,6 +1184,16 @@ cql3::cql_metadata_id_type compute_metadata_id(std::vector<std::pair<sstring, sh
return cql3::metadata{columns_specification}.calculate_metadata_id();
}
std::vector<lw_shared_ptr<cql3::column_specification>> make_columns_specification(
const std::vector<std::pair<sstring, shared_ptr<const abstract_type>>>& columns, sstring ks = "ks", sstring cf = "cf") {
std::vector<lw_shared_ptr<cql3::column_specification>> columns_specification;
columns_specification.reserve(columns.size());
for (const auto& column : columns) {
columns_specification.push_back(make_lw_shared(cql3::column_specification(ks, cf, make_shared<cql3::column_identifier>(column.first, false), column.second)));
}
return columns_specification;
}
BOOST_AUTO_TEST_CASE(metadata_id_with_different_keyspace_and_table) {
const auto c = std::make_pair("id", uuid_type);
auto h1 = compute_metadata_id({c}, "ks1", "cf1");
@@ -1231,6 +1244,55 @@ BOOST_AUTO_TEST_CASE(metadata_id_with_different_column_order) {
verify_metadata_id_is_stable(h2, "b52512f2b76d3e0695dcaf7b0a71efac");
}
SEASTAR_TEST_CASE(metadata_id_changed_rows_response_overrides_no_metadata) {
auto empty_metadata_id = cql3::metadata{std::vector<lw_shared_ptr<cql3::column_specification>>{}}.calculate_metadata_id();
auto columns_specification = make_columns_specification({{"role", utf8_type}});
cql3::metadata rows_metadata(columns_specification);
auto rows_metadata_id = rows_metadata.calculate_metadata_id();
cql_transport::response resp{0, cql_transport::cql_binary_opcode::RESULT, tracing::trace_state_ptr{}};
resp.write(rows_metadata, cql_transport::cql_metadata_id_wrapper(
std::move(empty_metadata_id),
cql3::cql_metadata_id_type(bytes(rows_metadata_id._metadata_id))), true);
memory_data_sink_buffers buffers;
{
output_stream<char> out(data_sink(std::make_unique<memory_data_sink>(buffers)));
co_await resp.write_message(out, 4, cql_transport::cql_compression::none, deleter());
co_await out.close();
}
auto total_length = buffers.size();
auto fbufs = fragmented_temporary_buffer(buffers.buffers() | std::views::as_rvalue | std::ranges::to<std::vector>(), total_length);
bytes_ostream linearization_buffer;
auto req = cql_transport::request_reader(fbufs.get_istream(), linearization_buffer);
BOOST_REQUIRE_EQUAL(unsigned(uint8_t(req.read_byte().value())), 4 | 0x80);
BOOST_REQUIRE_EQUAL(unsigned(req.read_byte().value()), 0);
BOOST_REQUIRE_EQUAL(req.read_short().value(), 0);
BOOST_REQUIRE_EQUAL(unsigned(req.read_byte().value()), unsigned(uint8_t(cql_transport::cql_binary_opcode::RESULT)));
BOOST_REQUIRE_EQUAL(req.read_int().value(), total_length - 9);
auto body = req.read_raw_bytes_view(req.bytes_left()).value();
const auto* ptr = reinterpret_cast<const char*>(body.begin());
const auto flags_mask = read_be<int32_t>(ptr);
ptr += sizeof(int32_t);
const auto flags = cql3::metadata::flag_enum_set::from_mask(flags_mask);
BOOST_REQUIRE(flags.contains<cql3::metadata::flag::METADATA_CHANGED>());
BOOST_REQUIRE(!flags.contains<cql3::metadata::flag::NO_METADATA>());
const auto column_count = read_be<int32_t>(ptr);
ptr += sizeof(int32_t);
BOOST_REQUIRE_EQUAL(column_count, 1);
const auto metadata_id_length = read_be<uint16_t>(ptr);
ptr += sizeof(uint16_t);
BOOST_REQUIRE_EQUAL(metadata_id_length, rows_metadata_id._metadata_id.size());
BOOST_REQUIRE(std::equal(rows_metadata_id._metadata_id.begin(), rows_metadata_id._metadata_id.end(),
reinterpret_cast<const bytes::value_type*>(ptr)));
co_return;
}
BOOST_AUTO_TEST_CASE(metadata_id_with_udt) {
auto compute_metadata_id_for_type = [&](

View File

@@ -0,0 +1,267 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import dataclasses
from typing import Optional
from unittest import mock
import pytest
from cassandra import ProtocolVersion
from cassandra.application_info import ApplicationInfoBase
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.policies import WhiteListRoundRobinPolicy
from cassandra.protocol import ResultMessage
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error
from test.pylib.util import unique_name
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
# ---------------------------------------------------------------------------
# Driver helpers for SCYLLA_USE_METADATA_ID / result_metadata_id exchange.
#
# The standard Python driver gates result_metadata_id exchange on protocol v5+
# (ProtocolVersion.uses_prepared_metadata). ScyllaDB does not implement v5
# but exposes the same semantics on v4 via the SCYLLA_USE_METADATA_ID startup
# extension. Two lightweight patches make the driver exercise this path:
#
# 1. _UseMetadataId — ApplicationInfoBase subclass that injects
# SCYLLA_USE_METADATA_ID into the STARTUP options dict. Passed to
# Cluster(application_info=...). The driver merges these options into
# the STARTUP frame without any filtering.
#
# 2. mock.patch.object(ProtocolVersion, "uses_prepared_metadata", ...) —
# makes the driver write result_metadata_id in EXECUTE frames and read
# it back from PREPARE/ROWS responses, which is exactly the v4 extension
# wire format.
#
# Note: the driver does not send the SKIP_METADATA flag in EXECUTE even with
# these patches (it never ORs it into the options flags byte for prepared
# statements). The server does not require SKIP_METADATA to trigger
# promotion; without it, it returns full column metadata alongside
# METADATA_CHANGED.
# ---------------------------------------------------------------------------
class _UseMetadataId(ApplicationInfoBase):
"""Inject SCYLLA_USE_METADATA_ID into the CQL STARTUP options."""
def add_startup_options(self, options: dict) -> None:
options["SCYLLA_USE_METADATA_ID"] = ""
@dataclasses.dataclass
class _ExecuteResult:
"""Parsed outcome of interest from a prepared-statement EXECUTE."""
initial_metadata_id: Optional[bytes]
"""result_metadata_id returned by the PREPARE response."""
result_metadata_id: Optional[bytes]
"""result_metadata_id embedded in the ROWS EXECUTE response, if any."""
metadata_changed: bool
"""True when the ROWS response carried the METADATA_CHANGED result-metadata flag (i.e., promotion occurred)."""
row_count: int
"""Number of rows returned by the EXECUTE response."""
def _prepare_and_execute(host: str, query: str) -> _ExecuteResult:
"""
Connect via the Scylla Python driver with SCYLLA_USE_METADATA_ID negotiated,
prepare *query*, execute it once, and return relevant metadata_id fields.
Intended to be called via ``asyncio.to_thread`` to avoid blocking the event loop.
The function uses two patches scoped to the connection lifetime:
* ``ProtocolVersion.uses_prepared_metadata`` is forced to return ``True``
for all protocol versions so that the driver reads/writes result_metadata_id
in PREPARE and EXECUTE frames on protocol v4.
* ``ResultMessage.recv_results_metadata`` is wrapped to capture
result_metadata_id from the ROWS response (the driver parses it there but
does not propagate it back to the PreparedStatement in the normal rows path).
"""
captured: dict = {"metadata_id": None, "metadata_changed": False}
original_recv = ResultMessage.recv_results_metadata
def _capturing_recv(self: ResultMessage, f, user_type_map) -> None:
original_recv(self, f, user_type_map)
rmi = getattr(self, "result_metadata_id", None)
if rmi is not None:
captured["metadata_id"] = rmi
captured["metadata_changed"] = True
with mock.patch.object(
ProtocolVersion,
"uses_prepared_metadata",
staticmethod(lambda v: True),
):
cluster = Cluster(
contact_points=[host],
port=9042,
protocol_version=4,
auth_provider=PlainTextAuthProvider("cassandra", "cassandra"),
application_info=_UseMetadataId(),
load_balancing_policy=WhiteListRoundRobinPolicy([host]),
)
session = cluster.connect()
try:
ps = session.prepare(query)
initial_metadata_id = ps.result_metadata_id
with mock.patch.object(
ResultMessage, "recv_results_metadata", _capturing_recv
):
rows = list(session.execute(ps))
return _ExecuteResult(
initial_metadata_id=initial_metadata_id,
result_metadata_id=captured["metadata_id"],
metadata_changed=captured["metadata_changed"],
row_count=len(rows),
)
finally:
session.shutdown()
cluster.shutdown()
@pytest.mark.asyncio
async def test_list_roles_of_prepared_metadata_promotion(
manager: ManagerClient,
build_mode: str,
) -> None:
"""Verify that EXECUTE promotes a stale prepared metadata_id, and that
disabling that promotion suppresses the resulting ``METADATA_CHANGED``.
``LIST ROLES OF <role>`` is such a statement: at PREPARE time the server
does not know the result set schema because the statement implementation
builds the metadata dynamically at execute time. The server therefore
returns the metadata_id of empty metadata in the PREPARE response.
When the client later sends EXECUTE with the stale empty metadata_id, the
server should detect the mismatch (the actual rows have real metadata) and
respond with a ``METADATA_CHANGED`` result that carries the real
metadata_id so the client can update its cache. This is the behaviour
mandated by CQL v5; on CQL v4 it is exercised via the
SCYLLA_USE_METADATA_ID Scylla protocol extension which enables the same
wire-level exchange. The test repeats PREPARE/EXECUTE on the same query
to show that the promoted metadata_id is cached, and in non-release modes
it contrasts that with an injected execution where the cache update is
suppressed.
"""
server = await manager.server_add(config=auth_config)
cql, _ = await manager.get_ready_cql([server])
role = "r" + unique_name()
await cql.run_async(f"CREATE ROLE {role}")
promoted = await asyncio.to_thread(
_prepare_and_execute, server.ip_addr, f"LIST ROLES OF {role}"
)
assert promoted.row_count > 0, (
f"expected EXECUTE for 'LIST ROLES OF {role}' to return at least one row"
)
assert promoted.initial_metadata_id is not None, (
f"expected PREPARE for 'LIST ROLES OF {role}' to return a result_metadata_id"
)
assert promoted.metadata_changed, (
f"expected EXECUTE for 'LIST ROLES OF {role}' to return METADATA_CHANGED "
f"after PREPARE returned an empty result_metadata_id"
)
assert promoted.result_metadata_id is not None, (
f"expected EXECUTE for 'LIST ROLES OF {role}' to return a result_metadata_id "
f"alongside METADATA_CHANGED"
)
assert promoted.initial_metadata_id != promoted.result_metadata_id, (
f"expected promoted result_metadata_id to differ from the stale empty one "
f"returned by PREPARE"
)
cached = await asyncio.to_thread(
_prepare_and_execute, server.ip_addr, f"LIST ROLES OF {role}"
)
assert cached.row_count > 0, (
f"expected second EXECUTE for 'LIST ROLES OF {role}' to return at least one row"
)
assert cached.initial_metadata_id == promoted.result_metadata_id, (
f"expected second PREPARE for 'LIST ROLES OF {role}' to reuse the promoted "
f"result_metadata_id from the first EXECUTE"
)
assert not cached.metadata_changed, (
f"expected second EXECUTE for 'LIST ROLES OF {role}' not to return "
f"METADATA_CHANGED after the cache had been promoted"
)
assert cached.result_metadata_id is None, (
f"expected second EXECUTE for 'LIST ROLES OF {role}' not to return a new "
f"result_metadata_id once the cache had been promoted"
)
if build_mode == "release":
return
# Use a fresh prepared statement key so the promotion above does not seed
# the cache for the injected contrast case.
injected_role = "r" + unique_name()
await cql.run_async(f"CREATE ROLE {injected_role}")
async with inject_error(
manager.api, server.ip_addr, "skip_prepared_result_metadata_promotion"
):
suppressed = await asyncio.to_thread(
_prepare_and_execute,
server.ip_addr,
f"LIST ROLES OF {injected_role}",
)
assert suppressed.row_count > 0, (
f"expected injected EXECUTE for 'LIST ROLES OF {injected_role}' to return at least one row"
)
assert suppressed.initial_metadata_id is not None, (
f"expected injected PREPARE for 'LIST ROLES OF {injected_role}' to return a result_metadata_id"
)
assert not suppressed.metadata_changed, (
f"expected injected EXECUTE for 'LIST ROLES OF {injected_role}' to suppress "
f"METADATA_CHANGED, but the flag was set"
)
assert suppressed.result_metadata_id is None, (
f"expected injected EXECUTE for 'LIST ROLES OF {injected_role}' to omit "
f"result_metadata_id when promotion is suppressed"
)
promoted_after_suppression = await asyncio.to_thread(
_prepare_and_execute, server.ip_addr, f"LIST ROLES OF {injected_role}"
)
assert promoted_after_suppression.row_count > 0, (
f"expected post-injection EXECUTE for 'LIST ROLES OF {injected_role}' to return at least one row"
)
assert (
promoted_after_suppression.initial_metadata_id == suppressed.initial_metadata_id
), (
f"expected injected EXECUTE for 'LIST ROLES OF {injected_role}' not to update the cached "
f"result_metadata_id"
)
assert promoted_after_suppression.metadata_changed, (
f"expected first non-injected EXECUTE for 'LIST ROLES OF {injected_role}' to "
f"return METADATA_CHANGED because the injected run left the cache stale"
)
assert promoted_after_suppression.result_metadata_id is not None, (
f"expected first non-injected EXECUTE for 'LIST ROLES OF {injected_role}' to "
f"return a promoted result_metadata_id"
)
assert (
promoted_after_suppression.initial_metadata_id
!= promoted_after_suppression.result_metadata_id
), (
f"expected first non-injected EXECUTE for 'LIST ROLES OF {injected_role}' to "
f"promote the stale result_metadata_id left by the injected run"
)

View File

@@ -65,6 +65,7 @@
#include "transport/cql_protocol_extension.hh"
#include "utils/bit_cast.hh"
#include "utils/error_injection.hh"
#include "utils/labels.hh"
#include "utils/result.hh"
#include "utils/reusable_buffer.hh"
@@ -1304,14 +1305,27 @@ process_execute_internal(service::client_state& client_state, sharded<cql3::quer
}
tracing::trace(trace_state, "Processing a statement");
// Evaluated once here: drives both the conditional cache_key copy below and the promotion block in the lambda.
const bool should_promote_metadata_id = prepared->result_metadata_is_empty() && metadata_id.has_request_metadata_id() &&
!utils::get_local_injector().enter("skip_prepared_result_metadata_promotion");
auto maybe_copied_cache_key = should_promote_metadata_id ? std::optional(cache_key) : std::nullopt;
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization)
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id)] (auto msg) mutable {
.then([&qp, trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version,
metadata_id = std::move(metadata_id), maybe_cache_key = std::move(maybe_copied_cache_key), should_promote_metadata_id] (auto msg) mutable {
if (msg->move_to_shard()) {
return cql_server::process_fn_return_type(make_foreign(dynamic_pointer_cast<messages::result_message::bounce_to_shard>(msg)));
} else if (msg->is_exception()) {
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
} else {
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
if (should_promote_metadata_id) {
if (auto rows = dynamic_pointer_cast<messages::result_message::rows>(msg)) {
auto real_id = rows->rs().get_metadata().calculate_metadata_id();
qp.local().update_prepared_result_metadata_id(*maybe_cache_key, real_id);
auto req = metadata_id.get_request_metadata_id();
metadata_id = cql_metadata_id_wrapper(std::move(req), std::move(real_id));
}
}
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)));
}
});
@@ -2222,9 +2236,16 @@ void cql_server::response::write(const cql3::metadata& m, const cql_metadata_id_
cql3::cql_metadata_id_type calculated_metadata_id{bytes{}};
if (metadata_id.has_request_metadata_id() && metadata_id.has_response_metadata_id()) {
if (metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id()) {
flags.remove<cql3::metadata::flag::NO_METADATA>();
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
no_metadata = false;
const bool skip_rows_metadata_changed_response = utils::get_local_injector().enter("skip_rows_metadata_changed_response");
clogger.debug("rows metadata changed response: request_metadata_id_present={}, response_metadata_id_present={}, metadata_changed={}, no_metadata_before={}, injection_fired={}",
metadata_id.has_request_metadata_id(), metadata_id.has_response_metadata_id(),
metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id(),
no_metadata, skip_rows_metadata_changed_response);
if (!skip_rows_metadata_changed_response) {
flags.remove<cql3::metadata::flag::NO_METADATA>();
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
no_metadata = false;
}
}
}