Compare commits

...

2 Commits

Author SHA1 Message Date
Alex
3ce2a2a479 test/cluster: add cluster test for prepared metadata_id promotion
Add a regression test that verifies the server correctly promotes the
prepared metadata_id for statements whose PREPARE response carries empty
result metadata (NO_METADATA), such as LIST ROLES OF.
The standard Python driver does not negotiate SCYLLA_USE_METADATA_ID and
cannot exercise this path, so the test uses a minimal raw CQL v4 socket
implementation that negotiates the extension directly in the STARTUP
options and includes result_metadata_id in EXECUTE frames.

Two test cases are added:
- test_list_roles_of_prepared_metadata_promotion: happy path — verifies
  that an EXECUTE carrying a stale empty metadata_id receives a
  METADATA_CHANGED response with the real metadata_id.
- test_list_roles_of_prepared_metadata_promotion_suppressed_by_injection:
  negative path — activates both the skip_prepared_result_metadata_promotion
  and skip_rows_metadata_changed_response error injection points and
  verifies that the promotion is suppressed, confirming the happy-path
  test is not a false positive.

To support the injection-based negative test, two error injection points
are added to transport/server.cc:
- skip_prepared_result_metadata_promotion: bypasses the promotion logic
  in process_execute_internal so the cached prepared entry is not updated.
- skip_rows_metadata_changed_response: suppresses the METADATA_CHANGED
  flag and NO_METADATA clearance in response::write so the client sees
  the stale NO_METADATA response as if the fix were absent.

Tests: test/cluster/auth_cluster/test_prepared_metadata_promotion.py (dev/dbg)
2026-03-30 10:48:17 +03:00
Alex
2cdd178379 transport/server: Promote prepared metadata_id after first rows response
Some prepared statements do not know their result metadata at PREPARE
  time and therefore return the metadata_id of empty metadata. When such a
  statement later produces a ROWS response with real metadata, comparing the
  client-supplied metadata_id against the prepared response metadata_id is
  incorrect: the server keeps NO_METADATA even though the actual rows metadata
  differs.

  Scylla already has the actual rows metadata when EXECUTE returns a ROWS
  result. Use that first execution to promote the cached prepared statement to
  the normal metadata_id path.

  This change keeps the existing behavior for statements whose PREPARED
  response already carries real result metadata. For prepared statements whose
  PREPARED response had empty result metadata, the first EXECUTE with
  metadata_id support now:

  - calculates the metadata_id from the actual rows metadata
  - updates the cached prepared entry with that metadata_id
  - marks the prepared entry as having non-empty result metadata
  - uses the promoted metadata_id for the current response

  After that promotion, subsequent EXECUTEs use the existing fast path and do
  not need to recalculate the metadata_id again.

  The prepared statement remains read-only through public checked weak
  handles. The mutation is performed only through the prepared statements
  cache/query_processor layer, which owns the mutable cached entry.

  Testing:

  - add a regression test verifying that a ROWS response built from a stale
    empty-metadata id returns METADATA_CHANGED and the actual rows
    metadata_id
2026-03-29 15:52:31 +03:00
7 changed files with 422 additions and 5 deletions

View File

@@ -143,6 +143,15 @@ public:
return value_type();
}
bool update_result_metadata_id(const key_type& key, cql3::cql_metadata_id_type metadata_id) {
cache_value_ptr vp = _cache.find(key.key());
if (!vp) {
return false;
}
(*vp)->update_result_metadata_id(std::move(metadata_id));
return true;
}
template <typename Pred>
requires std::is_invocable_r_v<bool, Pred, ::shared_ptr<cql_statement>>
void remove_if(Pred&& pred) {

View File

@@ -260,6 +260,10 @@ public:
return _prepared_cache.find(key);
}
// Asks the prepared statements cache to replace the result metadata id stored
// for `key` and returns the cache's answer unchanged.
bool update_prepared_result_metadata_id(const prepared_cache_key_type& key, cql_metadata_id_type metadata_id) {
return _prepared_cache.update_result_metadata_id(key, std::move(metadata_id));
}
inline
future<::shared_ptr<cql_transport::messages::result_message>>
execute_prepared(

View File

@@ -52,6 +52,7 @@ public:
std::vector<sstring> warnings;
private:
cql_metadata_id_type _metadata_id;
bool _result_metadata_is_empty;
public:
prepared_statement(audit::audit_info_ptr&& audit_info, seastar::shared_ptr<cql_statement> statement_, std::vector<seastar::lw_shared_ptr<column_specification>> bound_names_,
@@ -71,6 +72,15 @@ public:
void calculate_metadata_id();
cql_metadata_id_type get_metadata_id() const;
// Returns the stored _result_metadata_is_empty flag; the flag is cleared by
// update_result_metadata_id().
bool result_metadata_is_empty() const {
return _result_metadata_is_empty;
}
// Stores `metadata_id` as this statement's metadata id and clears the
// "result metadata is empty" flag, so result_metadata_is_empty() returns
// false from now on.
void update_result_metadata_id(cql_metadata_id_type metadata_id) {
_metadata_id = std::move(metadata_id);
_result_metadata_is_empty = false;
}
};
}

View File

@@ -49,6 +49,7 @@ prepared_statement::prepared_statement(
, partition_key_bind_indices(std::move(partition_key_bind_indices))
, warnings(std::move(warnings))
, _metadata_id(bytes{})
, _result_metadata_is_empty(statement->get_result_metadata()->flags().contains<metadata::flag::NO_METADATA>())
{
statement->set_audit_info(std::move(audit_info));
}

View File

@@ -29,6 +29,7 @@
#include "test/lib/exception_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/test_utils.hh"
#include "transport/response.hh"
BOOST_AUTO_TEST_SUITE(schema_change_test)
@@ -701,6 +702,16 @@ cql3::cql_metadata_id_type compute_metadata_id(std::vector<std::pair<sstring, sh
return cql3::metadata{columns_specification}.calculate_metadata_id();
}
std::vector<lw_shared_ptr<cql3::column_specification>> make_columns_specification(
const std::vector<std::pair<sstring, shared_ptr<const abstract_type>>>& columns, sstring ks = "ks", sstring cf = "cf") {
std::vector<lw_shared_ptr<cql3::column_specification>> columns_specification;
columns_specification.reserve(columns.size());
for (const auto& column : columns) {
columns_specification.push_back(make_lw_shared(cql3::column_specification(ks, cf, make_shared<cql3::column_identifier>(column.first, false), column.second)));
}
return columns_specification;
}
BOOST_AUTO_TEST_CASE(metadata_id_with_different_keyspace_and_table) {
const auto c = std::make_pair("id", uuid_type);
auto h1 = compute_metadata_id({c}, "ks1", "cf1");
@@ -751,6 +762,39 @@ BOOST_AUTO_TEST_CASE(metadata_id_with_different_column_order) {
verify_metadata_id_is_stable(h2, "b52512f2b76d3e0695dcaf7b0a71efac");
}
// Serializes rows metadata with a one-column schema while both ids handed to
// the metadata id wrapper are the id of empty metadata, then checks that the
// written body carries METADATA_CHANGED (and not NO_METADATA) together with
// the id calculated from the actual rows metadata.
BOOST_AUTO_TEST_CASE(metadata_id_changed_rows_response_overrides_no_metadata) {
// Id of a metadata object that has no columns; used for both wrapper arguments.
auto empty_metadata_id = cql3::metadata{std::vector<lw_shared_ptr<cql3::column_specification>>{}}.calculate_metadata_id();
auto stale_response_metadata_id = empty_metadata_id;
// The rows metadata that is actually written: a single utf8 "role" column.
auto columns_specification = make_columns_specification({{"role", utf8_type}});
cql3::metadata rows_metadata(columns_specification);
auto rows_metadata_id = rows_metadata.calculate_metadata_id();
cql_transport::response resp{0, cql_transport::cql_binary_opcode::RESULT, tracing::trace_state_ptr{}};
// NOTE(review): the trailing `true` is presumably the skip-metadata / no_metadata
// argument of response::write — confirm against its declaration.
resp.write(rows_metadata, cql_transport::cql_metadata_id_wrapper(
std::move(empty_metadata_id),
std::move(stale_response_metadata_id)), true);
auto body_stream = std::move(resp).extract_body();
auto body = body_stream.linearize();
// Walk the serialized body field by field: flags, column count, metadata id.
const auto* ptr = reinterpret_cast<const char*>(body.begin());
const auto flags_mask = read_be<int32_t>(ptr);
ptr += sizeof(int32_t);
const auto flags = cql3::metadata::flag_enum_set::from_mask(flags_mask);
BOOST_REQUIRE(flags.contains<cql3::metadata::flag::METADATA_CHANGED>());
BOOST_REQUIRE(!flags.contains<cql3::metadata::flag::NO_METADATA>());
const auto column_count = read_be<int32_t>(ptr);
ptr += sizeof(int32_t);
BOOST_REQUIRE_EQUAL(column_count, 1);
// The metadata id follows as a 16-bit length and then the id bytes.
const auto metadata_id_length = read_be<uint16_t>(ptr);
ptr += sizeof(uint16_t);
BOOST_REQUIRE_EQUAL(metadata_id_length, rows_metadata_id._metadata_id.size());
BOOST_REQUIRE(std::equal(rows_metadata_id._metadata_id.begin(), rows_metadata_id._metadata_id.end(),
reinterpret_cast<const bytes::value_type*>(ptr)));
}
BOOST_AUTO_TEST_CASE(metadata_id_with_udt) {
auto compute_metadata_id_for_type = [&](

View File

@@ -0,0 +1,328 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import dataclasses
import hashlib
import socket
import struct
import pytest
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error
from test.pylib.util import unique_name
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
# ---------------------------------------------------------------------------
# Minimal raw CQL v4 socket helpers with SCYLLA_USE_METADATA_ID extension.
#
# The standard Python driver never negotiates SCYLLA_USE_METADATA_ID and
# therefore never includes result_metadata_id in EXECUTE requests for
# protocol v4. In CQL v5 result_metadata_id exchange is mandatory and
# built into the wire format; until Scylla implements v5, this extension
# provides the same semantics on v4. The helpers below implement just
# enough of the CQL wire protocol to exercise the server-side prepared
# metadata promotion path introduced for v5 compatibility.
# ---------------------------------------------------------------------------
# CQL opcodes
# Opcodes of the request frames this module sends.
_OP_STARTUP = 0x01
_OP_AUTH_RESPONSE = 0x0F
_OP_PREPARE = 0x09
_OP_EXECUTE = 0x0A
# Opcodes of the response frames this module checks for.
_OP_READY = 0x02
_OP_AUTHENTICATE = 0x03
_OP_RESULT = 0x08
_OP_AUTH_SUCCESS = 0x10
# RESULT kind codes
_RESULT_KIND_ROWS = 0x00000002
_RESULT_KIND_PREPARED = 0x00000004
# Rows metadata flags (bit positions in the uint32 flags field)
_META_NO_METADATA = 1 << 2
_META_METADATA_CHANGED = 1 << 3
# EXECUTE options flags (1-byte field in CQL v4)
_FLAG_SKIP_METADATA = 0x02
_FRAME_HEADER_SIZE = 9 # version(1)+flags(1)+stream(2)+opcode(1)+length(4)
# Value sent as the CQL_VERSION option of the STARTUP message.
_CQL_VERSION = "3.0.0"
# Consistency level used by EXECUTE when the caller does not pass one.
_DEFAULT_CONSISTENCY = 0x0006 # LOCAL_QUORUM
def _pack_short(v: int) -> bytes:
return struct.pack(">H", v)
def _pack_int(v: int) -> bytes:
return struct.pack(">I", v)
def _short_bytes(b: bytes) -> bytes:
    """Encode *b* as CQL [short bytes]: a uint16 length followed by the payload."""
    length_prefix = _pack_short(len(b))
    return length_prefix + b
def _long_string(s: str) -> bytes:
    """Encode *s* as CQL [long string]: a uint32 length followed by its UTF-8 bytes."""
    encoded = s.encode()
    return b"".join((_pack_int(len(encoded)), encoded))
def _string_map(d: dict[str, str]) -> bytes:
    """Encode *d* as CQL [string map].

    The result is a uint16 entry count followed, for every entry in the
    dict's iteration order, by the uint16-prefixed key and the
    uint16-prefixed value.
    """
    parts = [_pack_short(len(d))]
    for key, value in d.items():
        parts.append(_short_bytes(key.encode()))
        parts.append(_short_bytes(value.encode()))
    return b"".join(parts)
def _frame(opcode: int, body: bytes, stream: int) -> bytes:
"""Build a CQL v4 request frame."""
return struct.pack(">BBHBI", 0x04, 0x00, stream, opcode, len(body)) + body
def _recv_frame(sock: socket.socket) -> tuple[int, int, bytes]:
    """Read exactly one CQL v4 response frame from *sock*.

    Returns ``(stream, opcode, body)``. An AssertionError is raised if the
    peer closes the connection before the whole frame has arrived.
    """

    def read_exact(count: int, closed_message: str) -> bytes:
        # recv() may return fewer bytes than requested, so keep reading
        # until the full count has been collected.
        data = b""
        while len(data) < count:
            chunk = sock.recv(count - len(data))
            assert chunk, closed_message
            data += chunk
        return data

    header = read_exact(
        _FRAME_HEADER_SIZE, "Connection closed while reading frame header"
    )
    # Version and flags are parsed but not needed by the callers.
    _version, _flags, stream, opcode, length = struct.unpack(">BBHBI", header)
    body = read_exact(length, "Connection closed while reading frame body")
    return stream, opcode, body
@dataclasses.dataclass
class ExecuteResult:
"""Parsed outcome of a ROWS EXECUTE response."""
metadata_changed: bool
no_metadata: bool
column_count: int
result_metadata_id: bytes | None
def _cql_connect(host: str, port: int, username: str, password: str) -> socket.socket:
    """
    Open a raw TCP socket to *host*:*port* and perform the CQL v4 handshake,
    negotiating the SCYLLA_USE_METADATA_ID extension so that result_metadata_id
    is exchanged on the wire — identical to the mandatory CQL v5 behaviour.

    Returns the connected socket once the server answers STARTUP with READY
    or, when it answers AUTHENTICATE, once a PLAIN SASL exchange with
    *username*/*password* ends in AUTH_SUCCESS. If the handshake fails for
    any reason the socket is closed before the error propagates, so a failed
    connect does not leak a file descriptor.
    """
    sock = socket.create_connection((host, port))
    try:
        stream = 1
        # STARTUP with SCYLLA_USE_METADATA_ID enables the v5-style metadata_id
        # exchange for this v4 connection.
        startup_opts = {"CQL_VERSION": _CQL_VERSION, "SCYLLA_USE_METADATA_ID": ""}
        sock.sendall(_frame(_OP_STARTUP, _string_map(startup_opts), stream))
        _, opcode, _ = _recv_frame(sock)
        if opcode != _OP_READY:
            assert opcode == _OP_AUTHENTICATE, (
                f"Expected AUTHENTICATE(0x{_OP_AUTHENTICATE:02x}), got 0x{opcode:02x}"
            )
            # PlainText SASL token: NUL + username + NUL + password
            creds = b"\x00" + username.encode() + b"\x00" + password.encode()
            stream += 1
            # The AUTH_RESPONSE body is a CQL [bytes] value, i.e. it carries a
            # 32-bit length prefix (not the 16-bit prefix of [short bytes]).
            token = _pack_int(len(creds)) + creds
            sock.sendall(_frame(_OP_AUTH_RESPONSE, token, stream))
            _, auth_op, _ = _recv_frame(sock)
            assert auth_op == _OP_AUTH_SUCCESS, f"Authentication failed: opcode=0x{auth_op:02x}"
    except BaseException:
        # The caller never receives the socket on failure, so release it here.
        sock.close()
        raise
    return sock
def _cql_prepare(sock: socket.socket, stream: int, query: str) -> bytes:
    """Send PREPARE for *query* on *stream* and return the query id from the reply.

    Asserts that the reply is a RESULT frame of kind PREPARED; the id is the
    [short bytes] value that immediately follows the kind field.
    """
    sock.sendall(_frame(_OP_PREPARE, _long_string(query), stream))
    _, opcode, payload = _recv_frame(sock)
    assert opcode == _OP_RESULT, f"Expected RESULT, got 0x{opcode:02x}"

    (kind,) = struct.unpack_from(">I", payload, 0)
    assert kind == _RESULT_KIND_PREPARED, f"Expected PREPARED kind, got {kind}"

    # Layout after the 4-byte kind: 2-byte id length, then the id itself.
    (id_len,) = struct.unpack_from(">H", payload, 4)
    id_start = 6
    return bytes(payload[id_start : id_start + id_len])
def _cql_execute_with_metadata_id(
    sock: socket.socket,
    stream: int,
    query_id: bytes,
    result_metadata_id: bytes,
    consistency: int = _DEFAULT_CONSISTENCY,
) -> ExecuteResult:
    """
    Send EXECUTE carrying *result_metadata_id* on the wire.

    With SCYLLA_USE_METADATA_ID active the server reads result_metadata_id
    immediately after query_id (before the options block), mirroring CQL v5
    wire format. SKIP_METADATA is set so a normal response returns no column
    specs; only the METADATA_CHANGED promotion path returns actual metadata.

    Returns an ExecuteResult holding the METADATA_CHANGED / NO_METADATA flags,
    the column count and, when METADATA_CHANGED is set, the metadata id sent
    by the server. Asserts that the reply is a RESULT frame of kind ROWS.
    """
    # Rows metadata flag announcing a [bytes] paging_state. In the v5 rows
    # metadata layout the paging state precedes the new metadata id, so it has
    # to be skipped before the id can be read.
    has_more_pages_flag = 1 << 1

    # options block: [consistency: uint16][flags: byte]
    options = struct.pack(">HB", consistency, _FLAG_SKIP_METADATA)
    body = _short_bytes(query_id) + _short_bytes(result_metadata_id) + options
    sock.sendall(_frame(_OP_EXECUTE, body, stream))
    _, opcode, payload = _recv_frame(sock)
    assert opcode == _OP_RESULT, f"Expected RESULT, got 0x{opcode:02x}"

    pos = 0
    # The kind is checked before anything else is read so that a short
    # non-ROWS result is reported as a kind mismatch.
    (kind,) = struct.unpack_from(">I", payload, pos)
    pos += 4
    assert kind == _RESULT_KIND_ROWS, f"Expected ROWS kind, got {kind}"
    meta_flags, column_count = struct.unpack_from(">II", payload, pos)
    pos += 8

    if meta_flags & has_more_pages_flag:
        # [bytes]: signed int32 length (negative means null) + that many bytes.
        (paging_state_len,) = struct.unpack_from(">i", payload, pos)
        pos += 4 + max(paging_state_len, 0)

    metadata_changed = bool(meta_flags & _META_METADATA_CHANGED)
    no_metadata = bool(meta_flags & _META_NO_METADATA)
    response_metadata_id: bytes | None = None
    if metadata_changed:
        (id_len,) = struct.unpack_from(">H", payload, pos)
        pos += 2
        response_metadata_id = bytes(payload[pos : pos + id_len])
        # Slicing silently truncates; make a short payload an explicit failure.
        assert len(response_metadata_id) == id_len, (
            "Truncated result_metadata_id in ROWS response"
        )
    return ExecuteResult(
        metadata_changed=metadata_changed,
        no_metadata=no_metadata,
        column_count=column_count,
        result_metadata_id=response_metadata_id,
    )
def _prepare_and_execute(
    host: str, query: str, stale_metadata_id: bytes
) -> ExecuteResult:
    """
    Connect to *host* on port 9042 as cassandra/cassandra (negotiating
    SCYLLA_USE_METADATA_ID), PREPARE *query*, EXECUTE it while sending
    *stale_metadata_id*, and return the parsed EXECUTE result.

    The socket is closed on every exit path. The function blocks, so it is
    meant to be run through ``asyncio.to_thread`` rather than called directly
    from the event loop.
    """
    # The socket's context manager closes it when the block is left.
    with _cql_connect(host, 9042, "cassandra", "cassandra") as sock:
        prepare_stream = 2
        execute_stream = prepare_stream + 1
        query_id = _cql_prepare(sock, prepare_stream, query)
        return _cql_execute_with_metadata_id(
            sock, execute_stream, query_id, stale_metadata_id
        )
@pytest.mark.asyncio
async def test_list_roles_of_prepared_metadata_promotion(
    manager: ManagerClient,
) -> None:
    """Check that EXECUTE of ``LIST ROLES OF <role>`` reports METADATA_CHANGED.

    ``LIST ROLES OF <role>`` builds its result metadata at execute time, so
    its PREPARE response carries empty result metadata (NO_METADATA) and the
    metadata_id of empty metadata.

    The test creates a role, prepares the statement over a raw CQL v4
    connection that negotiated SCYLLA_USE_METADATA_ID, and executes it with
    SKIP_METADATA and a stale metadata_id. The response must have
    METADATA_CHANGED set, NO_METADATA clear, and must carry a metadata_id the
    client could use to refresh its cache.
    """
    server = await manager.server_add(config=auth_config)
    cql, _ = await manager.get_ready_cql([server])

    role = "r" + unique_name()
    await cql.run_async(f"CREATE ROLE {role}")

    # Any non-empty id that differs from the real rows metadata_id can play
    # the part of the stale value a client cached after PREPARE.
    stale_metadata_id = hashlib.sha256(b"").digest()[:16]
    query = f"LIST ROLES OF {role}"

    result = await asyncio.to_thread(
        _prepare_and_execute, server.ip_addr, query, stale_metadata_id
    )

    assert result.metadata_changed, (
        f"expected EXECUTE for LIST ROLES OF {role} to return METADATA_CHANGED "
        f"after PREPARE returned an empty result_metadata_id"
    )
    assert not result.no_metadata, (
        f"expected EXECUTE for LIST ROLES OF {role} to not have NO_METADATA flag "
        f"when METADATA_CHANGED is set"
    )
    assert result.result_metadata_id is not None, (
        f"expected EXECUTE for LIST ROLES OF {role} to return a result_metadata_id "
        f"alongside METADATA_CHANGED"
    )
@pytest.mark.asyncio
@pytest.mark.skip_mode(
    mode="release", reason="error injection is disabled in release mode"
)
async def test_list_roles_of_prepared_metadata_promotion_suppressed_by_injection(
    manager: ManagerClient,
) -> None:
    """Negative counterpart of ``test_list_roles_of_prepared_metadata_promotion``.

    With both the ``skip_prepared_result_metadata_promotion`` and the
    ``skip_rows_metadata_changed_response`` error injections active, the same
    PREPARE + EXECUTE sequence must come back with NO_METADATA set and
    METADATA_CHANGED clear. This shows that the happy-path test is not a
    false positive and that the injection points take effect.
    """
    server = await manager.server_add(config=auth_config)
    cql, _ = await manager.get_ready_cql([server])

    role = "r" + unique_name()
    await cql.run_async(f"CREATE ROLE {role}")

    stale_metadata_id = hashlib.sha256(b"").digest()[:16]
    query = f"LIST ROLES OF {role}"

    # Both injections stay enabled for the whole PREPARE + EXECUTE exchange.
    async with inject_error(
        manager.api, server.ip_addr, "skip_prepared_result_metadata_promotion"
    ), inject_error(
        manager.api, server.ip_addr, "skip_rows_metadata_changed_response"
    ):
        result = await asyncio.to_thread(
            _prepare_and_execute, server.ip_addr, query, stale_metadata_id
        )

    assert not result.metadata_changed, (
        f"expected injected EXECUTE for LIST ROLES OF {role} to suppress "
        f"METADATA_CHANGED, but the flag was set"
    )
    assert result.no_metadata, (
        f"expected injected EXECUTE for LIST ROLES OF {role} to keep the "
        f"stale NO_METADATA path, but no_metadata flag was not set"
    )

View File

@@ -69,6 +69,7 @@
#include "message/messaging_service.hh"
#include "idl/forward_cql.dist.hh"
#include "utils/bit_cast.hh"
#include "utils/error_injection.hh"
#include "utils/labels.hh"
#include "utils/result.hh"
#include "utils/reusable_buffer.hh"
@@ -1633,13 +1634,26 @@ process_execute_internal(service::client_state& client_state, sharded<cql3::quer
}
tracing::trace(trace_state, "Processing a statement");
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization)
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id)] (auto msg) mutable {
auto cache_key_for_metadata = cache_key;
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, prepared, std::move(cache_key), needs_authorization)
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id), &qp, cache_key = std::move(cache_key_for_metadata), prepared = std::move(prepared)] (auto msg) mutable {
if (msg->move_to_shard()) {
return cql_server::process_fn_return_type(make_foreign(dynamic_pointer_cast<messages::result_message::bounce>(msg)));
} else if (msg->is_exception()) {
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
} else {
if (prepared->result_metadata_is_empty()
&& metadata_id.has_request_metadata_id()
&& !utils::get_local_injector().enter("skip_prepared_result_metadata_promotion")) {
if (auto rows = dynamic_pointer_cast<messages::result_message::rows>(msg)) {
auto rows_metadata_id = rows->rs().get_metadata().calculate_metadata_id();
clogger.debug("prepared result metadata promotion: request_metadata_id_present={}, calculated_rows_metadata_id_size={}",
metadata_id.has_request_metadata_id(), rows_metadata_id._metadata_id.size());
qp.local().update_prepared_result_metadata_id(cache_key, rows_metadata_id);
auto request_metadata_id = metadata_id.get_request_metadata_id();
metadata_id = cql_metadata_id_wrapper(std::move(request_metadata_id), std::move(rows_metadata_id));
}
}
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)));
}
@@ -2507,9 +2521,16 @@ void cql_server::response::write(const cql3::metadata& m, const cql_metadata_id_
cql3::cql_metadata_id_type calculated_metadata_id{bytes{}};
if (metadata_id.has_request_metadata_id() && metadata_id.has_response_metadata_id()) {
if (metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id()) {
flags.remove<cql3::metadata::flag::NO_METADATA>();
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
no_metadata = false;
const bool skip_rows_metadata_changed_response = utils::get_local_injector().enter("skip_rows_metadata_changed_response");
clogger.debug("rows metadata changed response: request_metadata_id_present={}, response_metadata_id_present={}, metadata_changed={}, no_metadata_before={}, injection_fired={}",
metadata_id.has_request_metadata_id(), metadata_id.has_response_metadata_id(),
metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id(),
no_metadata, skip_rows_metadata_changed_response);
if (!skip_rows_metadata_changed_response) {
flags.remove<cql3::metadata::flag::NO_METADATA>();
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
no_metadata = false;
}
}
}