mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-19 16:15:07 +00:00
Compare commits
2 Commits
copilot/fi
...
SCYLLADB-1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0313d3e291 | ||
|
|
4a06935bb0 |
@@ -137,6 +137,15 @@ public:
|
||||
return value_type();
|
||||
}
|
||||
|
||||
bool update_result_metadata_id(const key_type& key, cql3::cql_metadata_id_type metadata_id) {
|
||||
cache_value_ptr vp = _cache.find(key.key());
|
||||
if (!vp) {
|
||||
return false;
|
||||
}
|
||||
(*vp)->update_result_metadata_id(std::move(metadata_id));
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename Pred>
|
||||
requires std::is_invocable_r_v<bool, Pred, ::shared_ptr<cql_statement>>
|
||||
void remove_if(Pred&& pred) {
|
||||
|
||||
@@ -481,6 +481,12 @@ public:
|
||||
|
||||
void update_authorized_prepared_cache_config();
|
||||
|
||||
/// Update the result metadata_id of a cached prepared statement.
|
||||
/// Returns true if the entry was found and updated, false if it was evicted.
|
||||
bool update_prepared_result_metadata_id(const cql3::prepared_cache_key_type& cache_key, cql3::cql_metadata_id_type metadata_id) {
|
||||
return _prepared_cache.update_result_metadata_id(cache_key, std::move(metadata_id));
|
||||
}
|
||||
|
||||
void reset_cache();
|
||||
|
||||
bool topology_global_queue_empty();
|
||||
|
||||
@@ -52,6 +52,7 @@ public:
|
||||
std::vector<sstring> warnings;
|
||||
private:
|
||||
cql_metadata_id_type _metadata_id;
|
||||
bool _result_metadata_is_empty;
|
||||
|
||||
public:
|
||||
prepared_statement(audit::audit_info_ptr&& audit_info, seastar::shared_ptr<cql_statement> statement_, std::vector<seastar::lw_shared_ptr<column_specification>> bound_names_,
|
||||
@@ -71,6 +72,15 @@ public:
|
||||
void calculate_metadata_id();
|
||||
|
||||
cql_metadata_id_type get_metadata_id() const;
|
||||
|
||||
bool result_metadata_is_empty() const {
|
||||
return _result_metadata_is_empty;
|
||||
}
|
||||
|
||||
void update_result_metadata_id(cql_metadata_id_type metadata_id) {
|
||||
_metadata_id = std::move(metadata_id);
|
||||
_result_metadata_is_empty = false;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -49,6 +49,7 @@ prepared_statement::prepared_statement(
|
||||
, partition_key_bind_indices(std::move(partition_key_bind_indices))
|
||||
, warnings(std::move(warnings))
|
||||
, _metadata_id(bytes{})
|
||||
, _result_metadata_is_empty(statement->get_result_metadata()->flags().contains<metadata::flag::NO_METADATA>())
|
||||
{
|
||||
statement->set_audit_info(std::move(audit_info));
|
||||
}
|
||||
|
||||
@@ -34,6 +34,9 @@
|
||||
#include "test/lib/log.hh"
|
||||
#include "cdc/cdc_extension.hh"
|
||||
#include "test/lib/test_utils.hh"
|
||||
#include "transport/request.hh"
|
||||
#include "transport/response.hh"
|
||||
#include "utils/memory_data_sink.hh"
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(schema_change_test)
|
||||
|
||||
@@ -1181,6 +1184,16 @@ cql3::cql_metadata_id_type compute_metadata_id(std::vector<std::pair<sstring, sh
|
||||
return cql3::metadata{columns_specification}.calculate_metadata_id();
|
||||
}
|
||||
|
||||
std::vector<lw_shared_ptr<cql3::column_specification>> make_columns_specification(
|
||||
const std::vector<std::pair<sstring, shared_ptr<const abstract_type>>>& columns, sstring ks = "ks", sstring cf = "cf") {
|
||||
std::vector<lw_shared_ptr<cql3::column_specification>> columns_specification;
|
||||
columns_specification.reserve(columns.size());
|
||||
for (const auto& column : columns) {
|
||||
columns_specification.push_back(make_lw_shared(cql3::column_specification(ks, cf, make_shared<cql3::column_identifier>(column.first, false), column.second)));
|
||||
}
|
||||
return columns_specification;
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(metadata_id_with_different_keyspace_and_table) {
|
||||
const auto c = std::make_pair("id", uuid_type);
|
||||
auto h1 = compute_metadata_id({c}, "ks1", "cf1");
|
||||
@@ -1231,6 +1244,55 @@ BOOST_AUTO_TEST_CASE(metadata_id_with_different_column_order) {
|
||||
verify_metadata_id_is_stable(h2, "b52512f2b76d3e0695dcaf7b0a71efac");
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(metadata_id_changed_rows_response_overrides_no_metadata) {
|
||||
auto empty_metadata_id = cql3::metadata{std::vector<lw_shared_ptr<cql3::column_specification>>{}}.calculate_metadata_id();
|
||||
auto columns_specification = make_columns_specification({{"role", utf8_type}});
|
||||
cql3::metadata rows_metadata(columns_specification);
|
||||
auto rows_metadata_id = rows_metadata.calculate_metadata_id();
|
||||
|
||||
cql_transport::response resp{0, cql_transport::cql_binary_opcode::RESULT, tracing::trace_state_ptr{}};
|
||||
resp.write(rows_metadata, cql_transport::cql_metadata_id_wrapper(
|
||||
std::move(empty_metadata_id),
|
||||
cql3::cql_metadata_id_type(bytes(rows_metadata_id._metadata_id))), true);
|
||||
|
||||
memory_data_sink_buffers buffers;
|
||||
{
|
||||
output_stream<char> out(data_sink(std::make_unique<memory_data_sink>(buffers)));
|
||||
co_await resp.write_message(out, 4, cql_transport::cql_compression::none, deleter());
|
||||
co_await out.close();
|
||||
}
|
||||
auto total_length = buffers.size();
|
||||
auto fbufs = fragmented_temporary_buffer(buffers.buffers() | std::views::as_rvalue | std::ranges::to<std::vector>(), total_length);
|
||||
|
||||
bytes_ostream linearization_buffer;
|
||||
auto req = cql_transport::request_reader(fbufs.get_istream(), linearization_buffer);
|
||||
BOOST_REQUIRE_EQUAL(unsigned(uint8_t(req.read_byte().value())), 4 | 0x80);
|
||||
BOOST_REQUIRE_EQUAL(unsigned(req.read_byte().value()), 0);
|
||||
BOOST_REQUIRE_EQUAL(req.read_short().value(), 0);
|
||||
BOOST_REQUIRE_EQUAL(unsigned(req.read_byte().value()), unsigned(uint8_t(cql_transport::cql_binary_opcode::RESULT)));
|
||||
BOOST_REQUIRE_EQUAL(req.read_int().value(), total_length - 9);
|
||||
|
||||
auto body = req.read_raw_bytes_view(req.bytes_left()).value();
|
||||
const auto* ptr = reinterpret_cast<const char*>(body.begin());
|
||||
|
||||
const auto flags_mask = read_be<int32_t>(ptr);
|
||||
ptr += sizeof(int32_t);
|
||||
const auto flags = cql3::metadata::flag_enum_set::from_mask(flags_mask);
|
||||
BOOST_REQUIRE(flags.contains<cql3::metadata::flag::METADATA_CHANGED>());
|
||||
BOOST_REQUIRE(!flags.contains<cql3::metadata::flag::NO_METADATA>());
|
||||
|
||||
const auto column_count = read_be<int32_t>(ptr);
|
||||
ptr += sizeof(int32_t);
|
||||
BOOST_REQUIRE_EQUAL(column_count, 1);
|
||||
|
||||
const auto metadata_id_length = read_be<uint16_t>(ptr);
|
||||
ptr += sizeof(uint16_t);
|
||||
BOOST_REQUIRE_EQUAL(metadata_id_length, rows_metadata_id._metadata_id.size());
|
||||
BOOST_REQUIRE(std::equal(rows_metadata_id._metadata_id.begin(), rows_metadata_id._metadata_id.end(),
|
||||
reinterpret_cast<const bytes::value_type*>(ptr)));
|
||||
co_return;
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(metadata_id_with_udt) {
|
||||
|
||||
auto compute_metadata_id_for_type = [&](
|
||||
|
||||
267
test/cluster/auth_cluster/test_prepared_metadata_promotion.py
Normal file
267
test/cluster/auth_cluster/test_prepared_metadata_promotion.py
Normal file
@@ -0,0 +1,267 @@
|
||||
#
|
||||
# Copyright (C) 2026-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import dataclasses
|
||||
from typing import Optional
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from cassandra import ProtocolVersion
|
||||
from cassandra.application_info import ApplicationInfoBase
|
||||
from cassandra.auth import PlainTextAuthProvider
|
||||
from cassandra.cluster import Cluster
|
||||
from cassandra.policies import WhiteListRoundRobinPolicy
|
||||
from cassandra.protocol import ResultMessage
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import inject_error
|
||||
from test.pylib.util import unique_name
|
||||
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Driver helpers for SCYLLA_USE_METADATA_ID / result_metadata_id exchange.
|
||||
#
|
||||
# The standard Python driver gates result_metadata_id exchange on protocol v5+
|
||||
# (ProtocolVersion.uses_prepared_metadata). ScyllaDB does not implement v5
|
||||
# but exposes the same semantics on v4 via the SCYLLA_USE_METADATA_ID startup
|
||||
# extension. Two lightweight patches make the driver exercise this path:
|
||||
#
|
||||
# 1. _UseMetadataId — ApplicationInfoBase subclass that injects
|
||||
# SCYLLA_USE_METADATA_ID into the STARTUP options dict. Passed to
|
||||
# Cluster(application_info=...). The driver merges these options into
|
||||
# the STARTUP frame without any filtering.
|
||||
#
|
||||
# 2. mock.patch.object(ProtocolVersion, "uses_prepared_metadata", ...) —
|
||||
# makes the driver write result_metadata_id in EXECUTE frames and read
|
||||
# it back from PREPARE/ROWS responses, which is exactly the v4 extension
|
||||
# wire format.
|
||||
#
|
||||
# Note: the driver does not send the SKIP_METADATA flag in EXECUTE even with
|
||||
# these patches (it never ORs it into the options flags byte for prepared
|
||||
# statements). The server does not require SKIP_METADATA to trigger
|
||||
# promotion; without it, it returns full column metadata alongside
|
||||
# METADATA_CHANGED.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _UseMetadataId(ApplicationInfoBase):
|
||||
"""Inject SCYLLA_USE_METADATA_ID into the CQL STARTUP options."""
|
||||
|
||||
def add_startup_options(self, options: dict) -> None:
|
||||
options["SCYLLA_USE_METADATA_ID"] = ""
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class _ExecuteResult:
|
||||
"""Parsed outcome of interest from a prepared-statement EXECUTE."""
|
||||
|
||||
initial_metadata_id: Optional[bytes]
|
||||
"""result_metadata_id returned by the PREPARE response."""
|
||||
|
||||
result_metadata_id: Optional[bytes]
|
||||
"""result_metadata_id embedded in the ROWS EXECUTE response, if any."""
|
||||
|
||||
metadata_changed: bool
|
||||
"""True when the ROWS response carried the METADATA_CHANGED result-metadata flag (i.e., promotion occurred)."""
|
||||
|
||||
row_count: int
|
||||
"""Number of rows returned by the EXECUTE response."""
|
||||
|
||||
|
||||
def _prepare_and_execute(host: str, query: str) -> _ExecuteResult:
|
||||
"""
|
||||
Connect via the Scylla Python driver with SCYLLA_USE_METADATA_ID negotiated,
|
||||
prepare *query*, execute it once, and return relevant metadata_id fields.
|
||||
|
||||
Intended to be called via ``asyncio.to_thread`` to avoid blocking the event loop.
|
||||
|
||||
The function uses two patches scoped to the connection lifetime:
|
||||
|
||||
* ``ProtocolVersion.uses_prepared_metadata`` is forced to return ``True``
|
||||
for all protocol versions so that the driver reads/writes result_metadata_id
|
||||
in PREPARE and EXECUTE frames on protocol v4.
|
||||
|
||||
* ``ResultMessage.recv_results_metadata`` is wrapped to capture
|
||||
result_metadata_id from the ROWS response (the driver parses it there but
|
||||
does not propagate it back to the PreparedStatement in the normal rows path).
|
||||
"""
|
||||
captured: dict = {"metadata_id": None, "metadata_changed": False}
|
||||
original_recv = ResultMessage.recv_results_metadata
|
||||
|
||||
def _capturing_recv(self: ResultMessage, f, user_type_map) -> None:
|
||||
original_recv(self, f, user_type_map)
|
||||
rmi = getattr(self, "result_metadata_id", None)
|
||||
if rmi is not None:
|
||||
captured["metadata_id"] = rmi
|
||||
captured["metadata_changed"] = True
|
||||
|
||||
with mock.patch.object(
|
||||
ProtocolVersion,
|
||||
"uses_prepared_metadata",
|
||||
staticmethod(lambda v: True),
|
||||
):
|
||||
cluster = Cluster(
|
||||
contact_points=[host],
|
||||
port=9042,
|
||||
protocol_version=4,
|
||||
auth_provider=PlainTextAuthProvider("cassandra", "cassandra"),
|
||||
application_info=_UseMetadataId(),
|
||||
load_balancing_policy=WhiteListRoundRobinPolicy([host]),
|
||||
)
|
||||
session = cluster.connect()
|
||||
try:
|
||||
ps = session.prepare(query)
|
||||
initial_metadata_id = ps.result_metadata_id
|
||||
with mock.patch.object(
|
||||
ResultMessage, "recv_results_metadata", _capturing_recv
|
||||
):
|
||||
rows = list(session.execute(ps))
|
||||
return _ExecuteResult(
|
||||
initial_metadata_id=initial_metadata_id,
|
||||
result_metadata_id=captured["metadata_id"],
|
||||
metadata_changed=captured["metadata_changed"],
|
||||
row_count=len(rows),
|
||||
)
|
||||
finally:
|
||||
session.shutdown()
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_roles_of_prepared_metadata_promotion(
|
||||
manager: ManagerClient,
|
||||
build_mode: str,
|
||||
) -> None:
|
||||
"""Verify that EXECUTE promotes a stale prepared metadata_id, and that
|
||||
disabling that promotion suppresses the resulting ``METADATA_CHANGED``.
|
||||
|
||||
``LIST ROLES OF <role>`` is such a statement: at PREPARE time the server
|
||||
does not know the result set schema because the statement implementation
|
||||
builds the metadata dynamically at execute time. The server therefore
|
||||
returns the metadata_id of empty metadata in the PREPARE response.
|
||||
|
||||
When the client later sends EXECUTE with the stale empty metadata_id, the
|
||||
server should detect the mismatch (the actual rows have real metadata) and
|
||||
respond with a ``METADATA_CHANGED`` result that carries the real
|
||||
metadata_id so the client can update its cache. This is the behaviour
|
||||
mandated by CQL v5; on CQL v4 it is exercised via the
|
||||
SCYLLA_USE_METADATA_ID Scylla protocol extension which enables the same
|
||||
wire-level exchange. The test repeats PREPARE/EXECUTE on the same query
|
||||
to show that the promoted metadata_id is cached, and in non-release modes
|
||||
it contrasts that with an injected execution where the cache update is
|
||||
suppressed.
|
||||
"""
|
||||
server = await manager.server_add(config=auth_config)
|
||||
cql, _ = await manager.get_ready_cql([server])
|
||||
|
||||
role = "r" + unique_name()
|
||||
await cql.run_async(f"CREATE ROLE {role}")
|
||||
|
||||
promoted = await asyncio.to_thread(
|
||||
_prepare_and_execute, server.ip_addr, f"LIST ROLES OF {role}"
|
||||
)
|
||||
|
||||
assert promoted.row_count > 0, (
|
||||
f"expected EXECUTE for 'LIST ROLES OF {role}' to return at least one row"
|
||||
)
|
||||
assert promoted.initial_metadata_id is not None, (
|
||||
f"expected PREPARE for 'LIST ROLES OF {role}' to return a result_metadata_id"
|
||||
)
|
||||
assert promoted.metadata_changed, (
|
||||
f"expected EXECUTE for 'LIST ROLES OF {role}' to return METADATA_CHANGED "
|
||||
f"after PREPARE returned an empty result_metadata_id"
|
||||
)
|
||||
assert promoted.result_metadata_id is not None, (
|
||||
f"expected EXECUTE for 'LIST ROLES OF {role}' to return a result_metadata_id "
|
||||
f"alongside METADATA_CHANGED"
|
||||
)
|
||||
assert promoted.initial_metadata_id != promoted.result_metadata_id, (
|
||||
f"expected promoted result_metadata_id to differ from the stale empty one "
|
||||
f"returned by PREPARE"
|
||||
)
|
||||
|
||||
cached = await asyncio.to_thread(
|
||||
_prepare_and_execute, server.ip_addr, f"LIST ROLES OF {role}"
|
||||
)
|
||||
|
||||
assert cached.row_count > 0, (
|
||||
f"expected second EXECUTE for 'LIST ROLES OF {role}' to return at least one row"
|
||||
)
|
||||
assert cached.initial_metadata_id == promoted.result_metadata_id, (
|
||||
f"expected second PREPARE for 'LIST ROLES OF {role}' to reuse the promoted "
|
||||
f"result_metadata_id from the first EXECUTE"
|
||||
)
|
||||
assert not cached.metadata_changed, (
|
||||
f"expected second EXECUTE for 'LIST ROLES OF {role}' not to return "
|
||||
f"METADATA_CHANGED after the cache had been promoted"
|
||||
)
|
||||
assert cached.result_metadata_id is None, (
|
||||
f"expected second EXECUTE for 'LIST ROLES OF {role}' not to return a new "
|
||||
f"result_metadata_id once the cache had been promoted"
|
||||
)
|
||||
|
||||
if build_mode == "release":
|
||||
return
|
||||
|
||||
# Use a fresh prepared statement key so the promotion above does not seed
|
||||
# the cache for the injected contrast case.
|
||||
injected_role = "r" + unique_name()
|
||||
await cql.run_async(f"CREATE ROLE {injected_role}")
|
||||
|
||||
async with inject_error(
|
||||
manager.api, server.ip_addr, "skip_prepared_result_metadata_promotion"
|
||||
):
|
||||
suppressed = await asyncio.to_thread(
|
||||
_prepare_and_execute,
|
||||
server.ip_addr,
|
||||
f"LIST ROLES OF {injected_role}",
|
||||
)
|
||||
|
||||
assert suppressed.row_count > 0, (
|
||||
f"expected injected EXECUTE for 'LIST ROLES OF {injected_role}' to return at least one row"
|
||||
)
|
||||
assert suppressed.initial_metadata_id is not None, (
|
||||
f"expected injected PREPARE for 'LIST ROLES OF {injected_role}' to return a result_metadata_id"
|
||||
)
|
||||
assert not suppressed.metadata_changed, (
|
||||
f"expected injected EXECUTE for 'LIST ROLES OF {injected_role}' to suppress "
|
||||
f"METADATA_CHANGED, but the flag was set"
|
||||
)
|
||||
assert suppressed.result_metadata_id is None, (
|
||||
f"expected injected EXECUTE for 'LIST ROLES OF {injected_role}' to omit "
|
||||
f"result_metadata_id when promotion is suppressed"
|
||||
)
|
||||
|
||||
promoted_after_suppression = await asyncio.to_thread(
|
||||
_prepare_and_execute, server.ip_addr, f"LIST ROLES OF {injected_role}"
|
||||
)
|
||||
|
||||
assert promoted_after_suppression.row_count > 0, (
|
||||
f"expected post-injection EXECUTE for 'LIST ROLES OF {injected_role}' to return at least one row"
|
||||
)
|
||||
assert (
|
||||
promoted_after_suppression.initial_metadata_id == suppressed.initial_metadata_id
|
||||
), (
|
||||
f"expected injected EXECUTE for 'LIST ROLES OF {injected_role}' not to update the cached "
|
||||
f"result_metadata_id"
|
||||
)
|
||||
assert promoted_after_suppression.metadata_changed, (
|
||||
f"expected first non-injected EXECUTE for 'LIST ROLES OF {injected_role}' to "
|
||||
f"return METADATA_CHANGED because the injected run left the cache stale"
|
||||
)
|
||||
assert promoted_after_suppression.result_metadata_id is not None, (
|
||||
f"expected first non-injected EXECUTE for 'LIST ROLES OF {injected_role}' to "
|
||||
f"return a promoted result_metadata_id"
|
||||
)
|
||||
assert (
|
||||
promoted_after_suppression.initial_metadata_id
|
||||
!= promoted_after_suppression.result_metadata_id
|
||||
), (
|
||||
f"expected first non-injected EXECUTE for 'LIST ROLES OF {injected_role}' to "
|
||||
f"promote the stale result_metadata_id left by the injected run"
|
||||
)
|
||||
@@ -65,6 +65,7 @@
|
||||
|
||||
#include "transport/cql_protocol_extension.hh"
|
||||
#include "utils/bit_cast.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "utils/labels.hh"
|
||||
#include "utils/result.hh"
|
||||
#include "utils/reusable_buffer.hh"
|
||||
@@ -1304,14 +1305,27 @@ process_execute_internal(service::client_state& client_state, sharded<cql3::quer
|
||||
}
|
||||
|
||||
tracing::trace(trace_state, "Processing a statement");
|
||||
// Evaluated once here: drives both the conditional cache_key copy below and the promotion block in the lambda.
|
||||
const bool should_promote_metadata_id = prepared->result_metadata_is_empty() && metadata_id.has_request_metadata_id() &&
|
||||
!utils::get_local_injector().enter("skip_prepared_result_metadata_promotion");
|
||||
auto maybe_copied_cache_key = should_promote_metadata_id ? std::optional(cache_key) : std::nullopt;
|
||||
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization)
|
||||
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id)] (auto msg) mutable {
|
||||
.then([&qp, trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version,
|
||||
metadata_id = std::move(metadata_id), maybe_cache_key = std::move(maybe_copied_cache_key), should_promote_metadata_id] (auto msg) mutable {
|
||||
if (msg->move_to_shard()) {
|
||||
return cql_server::process_fn_return_type(make_foreign(dynamic_pointer_cast<messages::result_message::bounce_to_shard>(msg)));
|
||||
} else if (msg->is_exception()) {
|
||||
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
|
||||
} else {
|
||||
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
|
||||
if (should_promote_metadata_id) {
|
||||
if (auto rows = dynamic_pointer_cast<messages::result_message::rows>(msg)) {
|
||||
auto real_id = rows->rs().get_metadata().calculate_metadata_id();
|
||||
qp.local().update_prepared_result_metadata_id(*maybe_cache_key, real_id);
|
||||
auto req = metadata_id.get_request_metadata_id();
|
||||
metadata_id = cql_metadata_id_wrapper(std::move(req), std::move(real_id));
|
||||
}
|
||||
}
|
||||
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)));
|
||||
}
|
||||
});
|
||||
@@ -2222,9 +2236,16 @@ void cql_server::response::write(const cql3::metadata& m, const cql_metadata_id_
|
||||
cql3::cql_metadata_id_type calculated_metadata_id{bytes{}};
|
||||
if (metadata_id.has_request_metadata_id() && metadata_id.has_response_metadata_id()) {
|
||||
if (metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id()) {
|
||||
flags.remove<cql3::metadata::flag::NO_METADATA>();
|
||||
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
|
||||
no_metadata = false;
|
||||
const bool skip_rows_metadata_changed_response = utils::get_local_injector().enter("skip_rows_metadata_changed_response");
|
||||
clogger.debug("rows metadata changed response: request_metadata_id_present={}, response_metadata_id_present={}, metadata_changed={}, no_metadata_before={}, injection_fired={}",
|
||||
metadata_id.has_request_metadata_id(), metadata_id.has_response_metadata_id(),
|
||||
metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id(),
|
||||
no_metadata, skip_rows_metadata_changed_response);
|
||||
if (!skip_rows_metadata_changed_response) {
|
||||
flags.remove<cql3::metadata::flag::NO_METADATA>();
|
||||
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
|
||||
no_metadata = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user