mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-20 00:20:47 +00:00
Compare commits
32 Commits
debug_form
...
SCYLLADB-1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3ce2a2a479 | ||
|
|
2cdd178379 | ||
|
|
9dad68e58d | ||
|
|
181ad9f476 | ||
|
|
854c374ebf | ||
|
|
b708e5d7c9 | ||
|
|
c38e312321 | ||
|
|
627a8294ed | ||
|
|
5a086ae9b7 | ||
|
|
c575bbf1e8 | ||
|
|
7fdd650009 | ||
|
|
552a2d0995 | ||
|
|
73de865ca3 | ||
|
|
f988ec18cb | ||
|
|
cd1679934c | ||
|
|
d52fbf7ada | ||
|
|
141aa2d696 | ||
|
|
c670183be8 | ||
|
|
e639dcda0b | ||
|
|
503a6e2d7e | ||
|
|
0f02c0d6fa | ||
|
|
4fead4baae | ||
|
|
ffd58ca1f0 | ||
|
|
f6fd3bbea0 | ||
|
|
148217bed6 | ||
|
|
2b472fe7fd | ||
|
|
ae12c712ce | ||
|
|
dd446aa442 | ||
|
|
dea79b09a9 | ||
|
|
3d04fd1d13 | ||
|
|
f5438e0587 | ||
|
|
f6ab576ed9 |
@@ -583,8 +583,7 @@ sstable_format: ms
|
|||||||
audit: "table"
|
audit: "table"
|
||||||
#
|
#
|
||||||
# List of statement categories that should be audited.
|
# List of statement categories that should be audited.
|
||||||
# Possible categories are: QUERY, DML, DCL, DDL, AUTH, ADMIN
|
audit_categories: "DCL,DDL,AUTH,ADMIN"
|
||||||
audit_categories: "DCL,AUTH,ADMIN"
|
|
||||||
#
|
#
|
||||||
# List of tables that should be audited.
|
# List of tables that should be audited.
|
||||||
# audit_tables: "<keyspace_name>.<table_name>,<keyspace_name>.<table_name>"
|
# audit_tables: "<keyspace_name>.<table_name>,<keyspace_name>.<table_name>"
|
||||||
|
|||||||
@@ -143,6 +143,15 @@ public:
|
|||||||
return value_type();
|
return value_type();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool update_result_metadata_id(const key_type& key, cql3::cql_metadata_id_type metadata_id) {
|
||||||
|
cache_value_ptr vp = _cache.find(key.key());
|
||||||
|
if (!vp) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
(*vp)->update_result_metadata_id(std::move(metadata_id));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
template <typename Pred>
|
template <typename Pred>
|
||||||
requires std::is_invocable_r_v<bool, Pred, ::shared_ptr<cql_statement>>
|
requires std::is_invocable_r_v<bool, Pred, ::shared_ptr<cql_statement>>
|
||||||
void remove_if(Pred&& pred) {
|
void remove_if(Pred&& pred) {
|
||||||
|
|||||||
@@ -260,6 +260,10 @@ public:
|
|||||||
return _prepared_cache.find(key);
|
return _prepared_cache.find(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool update_prepared_result_metadata_id(const prepared_cache_key_type& key, cql_metadata_id_type metadata_id) {
|
||||||
|
return _prepared_cache.update_result_metadata_id(key, std::move(metadata_id));
|
||||||
|
}
|
||||||
|
|
||||||
inline
|
inline
|
||||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||||
execute_prepared(
|
execute_prepared(
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ public:
|
|||||||
std::vector<sstring> warnings;
|
std::vector<sstring> warnings;
|
||||||
private:
|
private:
|
||||||
cql_metadata_id_type _metadata_id;
|
cql_metadata_id_type _metadata_id;
|
||||||
|
bool _result_metadata_is_empty;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
prepared_statement(audit::audit_info_ptr&& audit_info, seastar::shared_ptr<cql_statement> statement_, std::vector<seastar::lw_shared_ptr<column_specification>> bound_names_,
|
prepared_statement(audit::audit_info_ptr&& audit_info, seastar::shared_ptr<cql_statement> statement_, std::vector<seastar::lw_shared_ptr<column_specification>> bound_names_,
|
||||||
@@ -71,6 +72,15 @@ public:
|
|||||||
void calculate_metadata_id();
|
void calculate_metadata_id();
|
||||||
|
|
||||||
cql_metadata_id_type get_metadata_id() const;
|
cql_metadata_id_type get_metadata_id() const;
|
||||||
|
|
||||||
|
bool result_metadata_is_empty() const {
|
||||||
|
return _result_metadata_is_empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Installs `metadata_id` as this statement's metadata id and marks the
/// result metadata as no longer empty.
void update_result_metadata_id(cql_metadata_id_type metadata_id) {
    // The two assignments are independent; the flag is cleared first here.
    _result_metadata_is_empty = false;
    _metadata_id = std::move(metadata_id);
}
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -49,6 +49,7 @@ prepared_statement::prepared_statement(
|
|||||||
, partition_key_bind_indices(std::move(partition_key_bind_indices))
|
, partition_key_bind_indices(std::move(partition_key_bind_indices))
|
||||||
, warnings(std::move(warnings))
|
, warnings(std::move(warnings))
|
||||||
, _metadata_id(bytes{})
|
, _metadata_id(bytes{})
|
||||||
|
, _result_metadata_is_empty(statement->get_result_metadata()->flags().contains<metadata::flag::NO_METADATA>())
|
||||||
{
|
{
|
||||||
statement->set_audit_info(std::move(audit_info));
|
statement->set_audit_info(std::move(audit_info));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1582,7 +1582,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
|||||||
"\tnone : No auditing enabled.\n"
|
"\tnone : No auditing enabled.\n"
|
||||||
"\tsyslog : Audit messages sent to Syslog.\n"
|
"\tsyslog : Audit messages sent to Syslog.\n"
|
||||||
"\ttable : Audit messages written to column family named audit.audit_log.\n")
|
"\ttable : Audit messages written to column family named audit.audit_log.\n")
|
||||||
, audit_categories(this, "audit_categories", liveness::LiveUpdate, value_status::Used, "DCL,AUTH,ADMIN", "Comma separated list of operation categories that should be audited.")
|
, audit_categories(this, "audit_categories", liveness::LiveUpdate, value_status::Used, "DCL,DDL,AUTH,ADMIN", "Comma separated list of operation categories that should be audited.")
|
||||||
, audit_tables(this, "audit_tables", liveness::LiveUpdate, value_status::Used, "", "Comma separated list of table names (<keyspace>.<table>) that will be audited.")
|
, audit_tables(this, "audit_tables", liveness::LiveUpdate, value_status::Used, "", "Comma separated list of table names (<keyspace>.<table>) that will be audited.")
|
||||||
, audit_keyspaces(this, "audit_keyspaces", liveness::LiveUpdate, value_status::Used, "", "Comma separated list of keyspaces that will be audited. All tables in those keyspaces will be audited")
|
, audit_keyspaces(this, "audit_keyspaces", liveness::LiveUpdate, value_status::Used, "", "Comma separated list of keyspaces that will be audited. All tables in those keyspaces will be audited")
|
||||||
, audit_unix_socket_path(this, "audit_unix_socket_path", value_status::Used, "/dev/log", "The path to the unix socket used for writing to syslog. Only applicable when audit is set to syslog.")
|
, audit_unix_socket_path(this, "audit_unix_socket_path", value_status::Used, "/dev/log", "The path to the unix socket used for writing to syslog. Only applicable when audit is set to syslog.")
|
||||||
|
|||||||
@@ -42,7 +42,14 @@ void everywhere_replication_strategy::validate_options(const gms::feature_servic
|
|||||||
|
|
||||||
sstring everywhere_replication_strategy::sanity_check_read_replicas(const effective_replication_map& erm, const host_id_vector_replica_set& read_replicas) const {
|
sstring everywhere_replication_strategy::sanity_check_read_replicas(const effective_replication_map& erm, const host_id_vector_replica_set& read_replicas) const {
|
||||||
const auto replication_factor = erm.get_replication_factor();
|
const auto replication_factor = erm.get_replication_factor();
|
||||||
if (read_replicas.size() > replication_factor) {
|
if (const auto& topo_info = erm.get_token_metadata().get_topology_change_info(); topo_info && topo_info->read_new) {
|
||||||
|
if (read_replicas.size() > replication_factor + 1) {
|
||||||
|
return seastar::format(
|
||||||
|
"everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, "
|
||||||
|
"cannot be higher than replication factor {} + 1 during the 'read from new replicas' stage of a topology change",
|
||||||
|
read_replicas.size(), replication_factor);
|
||||||
|
}
|
||||||
|
} else if (read_replicas.size() > replication_factor) {
|
||||||
return seastar::format("everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, cannot be higher than replication factor {}", read_replicas.size(), replication_factor);
|
return seastar::format("everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, cannot be higher than replication factor {}", read_replicas.size(), replication_factor);
|
||||||
}
|
}
|
||||||
return {};
|
return {};
|
||||||
|
|||||||
@@ -1,3 +1,3 @@
|
|||||||
version https://git-lfs.github.com/spec/v1
|
version https://git-lfs.github.com/spec/v1
|
||||||
oid sha256:34a0955d2c5a88e18ddab0f1df085e10a17e14129c3e21de91e4f27ef949b6c4
|
oid sha256:d424ce6cc7f65338c34dd35881d23f5ad3425651d66e47dc2c3a20dc798848d4
|
||||||
size 6502668
|
size 6598648
|
||||||
|
|||||||
@@ -1109,6 +1109,18 @@ future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batc
|
|||||||
// case.
|
// case.
|
||||||
co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second);
|
co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second);
|
||||||
_stats.store_term_and_vote++;
|
_stats.store_term_and_vote++;
|
||||||
|
|
||||||
|
// When the term advances, any in-flight snapshot transfers
|
||||||
|
// belong to an outdated term: the progress tracker has been
|
||||||
|
// reset in become_leader() or we are now a follower.
|
||||||
|
// Abort them before we dispatch this batch's messages, which
|
||||||
|
// may start fresh transfers for the new term.
|
||||||
|
//
|
||||||
|
// A vote may also change independently of the term (e.g. a
|
||||||
|
// follower voting for a candidate at the same term), but in
|
||||||
|
// that case there are no in-flight transfers and the abort
|
||||||
|
// is a no-op.
|
||||||
|
abort_snapshot_transfers();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (batch.snp) {
|
if (batch.snp) {
|
||||||
@@ -1218,8 +1230,6 @@ future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batc
|
|||||||
// quickly) stop happening (we're outside the config after all).
|
// quickly) stop happening (we're outside the config after all).
|
||||||
co_await _apply_entries.push_eventually(removed_from_config{});
|
co_await _apply_entries.push_eventually(removed_from_config{});
|
||||||
}
|
}
|
||||||
// request aborts of snapshot transfers
|
|
||||||
abort_snapshot_transfers();
|
|
||||||
// abort all read barriers
|
// abort all read barriers
|
||||||
for (auto& r : _reads) {
|
for (auto& r : _reads) {
|
||||||
r.promise.set_value(not_a_leader{_fsm->current_leader()});
|
r.promise.set_value(not_a_leader{_fsm->current_leader()});
|
||||||
|
|||||||
@@ -87,6 +87,11 @@ target_include_directories(wasmtime_bindings
|
|||||||
target_link_libraries(wasmtime_bindings
|
target_link_libraries(wasmtime_bindings
|
||||||
INTERFACE Rust::rust_combined)
|
INTERFACE Rust::rust_combined)
|
||||||
if (Scylla_USE_PRECOMPILED_HEADER_USE)
|
if (Scylla_USE_PRECOMPILED_HEADER_USE)
|
||||||
|
# The PCH from scylla-precompiled-header is compiled with Seastar's compile
|
||||||
|
# flags, including sanitizer flags in Debug/Sanitize modes. Any target reusing
|
||||||
|
# this PCH must have matching compile options, otherwise the compiler rejects
|
||||||
|
# the PCH due to flag mismatch (e.g., -fsanitize=address).
|
||||||
|
target_link_libraries(wasmtime_bindings PRIVATE Seastar::seastar)
|
||||||
target_precompile_headers(wasmtime_bindings REUSE_FROM scylla-precompiled-header)
|
target_precompile_headers(wasmtime_bindings REUSE_FROM scylla-precompiled-header)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
@@ -108,5 +113,6 @@ target_include_directories(inc
|
|||||||
target_link_libraries(inc
|
target_link_libraries(inc
|
||||||
INTERFACE Rust::rust_combined)
|
INTERFACE Rust::rust_combined)
|
||||||
if (Scylla_USE_PRECOMPILED_HEADER_USE)
|
if (Scylla_USE_PRECOMPILED_HEADER_USE)
|
||||||
|
target_link_libraries(inc PRIVATE Seastar::seastar)
|
||||||
target_precompile_headers(inc REUSE_FROM scylla-precompiled-header)
|
target_precompile_headers(inc REUSE_FROM scylla-precompiled-header)
|
||||||
endif()
|
endif()
|
||||||
|
|||||||
2
test.py
2
test.py
@@ -181,7 +181,7 @@ def parse_cmd_line() -> argparse.Namespace:
|
|||||||
help="Run only tests for given build mode(s)")
|
help="Run only tests for given build mode(s)")
|
||||||
parser.add_argument('--repeat', action="store", default="1", type=int,
|
parser.add_argument('--repeat', action="store", default="1", type=int,
|
||||||
help="number of times to repeat test execution")
|
help="number of times to repeat test execution")
|
||||||
parser.add_argument('--timeout', action="store", default="24000", type=int,
|
parser.add_argument('--timeout', action="store", default="3600", type=int,
|
||||||
help="timeout value for single test execution")
|
help="timeout value for single test execution")
|
||||||
parser.add_argument('--session-timeout', action="store", default="24000", type=int,
|
parser.add_argument('--session-timeout', action="store", default="24000", type=int,
|
||||||
help="timeout value for test.py/pytest session execution")
|
help="timeout value for test.py/pytest session execution")
|
||||||
|
|||||||
@@ -29,6 +29,7 @@
|
|||||||
#include "test/lib/exception_utils.hh"
|
#include "test/lib/exception_utils.hh"
|
||||||
#include "test/lib/log.hh"
|
#include "test/lib/log.hh"
|
||||||
#include "test/lib/test_utils.hh"
|
#include "test/lib/test_utils.hh"
|
||||||
|
#include "transport/response.hh"
|
||||||
|
|
||||||
BOOST_AUTO_TEST_SUITE(schema_change_test)
|
BOOST_AUTO_TEST_SUITE(schema_change_test)
|
||||||
|
|
||||||
@@ -701,6 +702,16 @@ cql3::cql_metadata_id_type compute_metadata_id(std::vector<std::pair<sstring, sh
|
|||||||
return cql3::metadata{columns_specification}.calculate_metadata_id();
|
return cql3::metadata{columns_specification}.calculate_metadata_id();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::vector<lw_shared_ptr<cql3::column_specification>> make_columns_specification(
|
||||||
|
const std::vector<std::pair<sstring, shared_ptr<const abstract_type>>>& columns, sstring ks = "ks", sstring cf = "cf") {
|
||||||
|
std::vector<lw_shared_ptr<cql3::column_specification>> columns_specification;
|
||||||
|
columns_specification.reserve(columns.size());
|
||||||
|
for (const auto& column : columns) {
|
||||||
|
columns_specification.push_back(make_lw_shared(cql3::column_specification(ks, cf, make_shared<cql3::column_identifier>(column.first, false), column.second)));
|
||||||
|
}
|
||||||
|
return columns_specification;
|
||||||
|
}
|
||||||
|
|
||||||
BOOST_AUTO_TEST_CASE(metadata_id_with_different_keyspace_and_table) {
|
BOOST_AUTO_TEST_CASE(metadata_id_with_different_keyspace_and_table) {
|
||||||
const auto c = std::make_pair("id", uuid_type);
|
const auto c = std::make_pair("id", uuid_type);
|
||||||
auto h1 = compute_metadata_id({c}, "ks1", "cf1");
|
auto h1 = compute_metadata_id({c}, "ks1", "cf1");
|
||||||
@@ -751,6 +762,39 @@ BOOST_AUTO_TEST_CASE(metadata_id_with_different_column_order) {
|
|||||||
verify_metadata_id_is_stable(h2, "b52512f2b76d3e0695dcaf7b0a71efac");
|
verify_metadata_id_is_stable(h2, "b52512f2b76d3e0695dcaf7b0a71efac");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Serializes rows metadata with one column while both ids handed to
// cql_metadata_id_wrapper are the id of empty metadata, then checks the wire
// image: METADATA_CHANGED set, NO_METADATA clear, one column, and the id that
// follows equal to the id computed from the rows metadata.
BOOST_AUTO_TEST_CASE(metadata_id_changed_rows_response_overrides_no_metadata) {
    auto empty_id = cql3::metadata{std::vector<lw_shared_ptr<cql3::column_specification>>{}}.calculate_metadata_id();
    auto stale_response_id = empty_id;

    auto role_columns = make_columns_specification({{"role", utf8_type}});
    cql3::metadata rows_metadata(role_columns);
    auto expected_id = rows_metadata.calculate_metadata_id();

    cql_transport::response resp{0, cql_transport::cql_binary_opcode::RESULT, tracing::trace_state_ptr{}};
    resp.write(rows_metadata,
            cql_transport::cql_metadata_id_wrapper(std::move(empty_id), std::move(stale_response_id)),
            true);

    // Keep the extracted stream alive in a named local while `body` is read.
    auto body_stream = std::move(resp).extract_body();
    auto body = body_stream.linearize();
    const auto* cursor = reinterpret_cast<const char*>(body.begin());

    // int32 flags
    const auto flags = cql3::metadata::flag_enum_set::from_mask(read_be<int32_t>(cursor));
    cursor += sizeof(int32_t);
    BOOST_REQUIRE(flags.contains<cql3::metadata::flag::METADATA_CHANGED>());
    BOOST_REQUIRE(!flags.contains<cql3::metadata::flag::NO_METADATA>());

    // int32 column count
    const auto column_count = read_be<int32_t>(cursor);
    cursor += sizeof(int32_t);
    BOOST_REQUIRE_EQUAL(column_count, 1);

    // uint16 length followed by the metadata id bytes
    const auto metadata_id_length = read_be<uint16_t>(cursor);
    cursor += sizeof(uint16_t);
    const auto& expected_bytes = expected_id._metadata_id;
    BOOST_REQUIRE_EQUAL(metadata_id_length, expected_bytes.size());
    BOOST_REQUIRE(std::equal(expected_bytes.begin(), expected_bytes.end(),
            reinterpret_cast<const bytes::value_type*>(cursor)));
}
|
||||||
|
|
||||||
BOOST_AUTO_TEST_CASE(metadata_id_with_udt) {
|
BOOST_AUTO_TEST_CASE(metadata_id_with_udt) {
|
||||||
|
|
||||||
auto compute_metadata_id_for_type = [&](
|
auto compute_metadata_id_for_type = [&](
|
||||||
|
|||||||
328
test/cluster/auth_cluster/test_prepared_metadata_promotion.py
Normal file
328
test/cluster/auth_cluster/test_prepared_metadata_promotion.py
Normal file
@@ -0,0 +1,328 @@
|
|||||||
|
#
|
||||||
|
# Copyright (C) 2026-present ScyllaDB
|
||||||
|
#
|
||||||
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||||
|
#
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import dataclasses
|
||||||
|
import hashlib
|
||||||
|
import socket
|
||||||
|
import struct
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from test.pylib.manager_client import ManagerClient
|
||||||
|
from test.pylib.rest_client import inject_error
|
||||||
|
from test.pylib.util import unique_name
|
||||||
|
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Minimal raw CQL v4 socket helpers with SCYLLA_USE_METADATA_ID extension.
|
||||||
|
#
|
||||||
|
# The standard Python driver never negotiates SCYLLA_USE_METADATA_ID and
|
||||||
|
# therefore never includes result_metadata_id in EXECUTE requests for
|
||||||
|
# protocol v4. In CQL v5 result_metadata_id exchange is mandatory and
|
||||||
|
# built into the wire format; until Scylla implements v5, this extension
|
||||||
|
# provides the same semantics on v4. The helpers below implement just
|
||||||
|
# enough of the CQL wire protocol to exercise the server-side prepared
|
||||||
|
# metadata promotion path introduced for v5 compatibility.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# CQL opcodes
|
||||||
|
_OP_STARTUP = 0x01
|
||||||
|
_OP_AUTH_RESPONSE = 0x0F
|
||||||
|
_OP_PREPARE = 0x09
|
||||||
|
_OP_EXECUTE = 0x0A
|
||||||
|
_OP_READY = 0x02
|
||||||
|
_OP_AUTHENTICATE = 0x03
|
||||||
|
_OP_RESULT = 0x08
|
||||||
|
_OP_AUTH_SUCCESS = 0x10
|
||||||
|
|
||||||
|
# RESULT kind codes
|
||||||
|
_RESULT_KIND_ROWS = 0x00000002
|
||||||
|
_RESULT_KIND_PREPARED = 0x00000004
|
||||||
|
|
||||||
|
# Rows metadata flags (bit positions in the uint32 flags field)
|
||||||
|
_META_NO_METADATA = 1 << 2
|
||||||
|
_META_METADATA_CHANGED = 1 << 3
|
||||||
|
|
||||||
|
# EXECUTE options flags (1-byte field in CQL v4)
|
||||||
|
_FLAG_SKIP_METADATA = 0x02
|
||||||
|
|
||||||
|
_FRAME_HEADER_SIZE = 9 # version(1)+flags(1)+stream(2)+opcode(1)+length(4)
|
||||||
|
_CQL_VERSION = "3.0.0"
|
||||||
|
_DEFAULT_CONSISTENCY = 0x0006 # LOCAL_QUORUM
|
||||||
|
|
||||||
|
|
||||||
|
def _pack_short(v: int) -> bytes:
|
||||||
|
return struct.pack(">H", v)
|
||||||
|
|
||||||
|
|
||||||
|
def _pack_int(v: int) -> bytes:
|
||||||
|
return struct.pack(">I", v)
|
||||||
|
|
||||||
|
|
||||||
|
def _short_bytes(b: bytes) -> bytes:
    """Serialize *b* as CQL [short bytes]: a uint16 length followed by the payload."""
    length_prefix = _pack_short(len(b))
    return length_prefix + b
|
||||||
|
|
||||||
|
|
||||||
|
def _long_string(s: str) -> bytes:
    """Serialize *s* as CQL [long string]: a uint32 length followed by its UTF-8 bytes."""
    encoded = s.encode()
    length_prefix = _pack_int(len(encoded))
    return length_prefix + encoded
|
||||||
|
|
||||||
|
|
||||||
|
def _string_map(d: dict[str, str]) -> bytes:
    """Serialize *d* as CQL [string map].

    The output is a uint16 entry count followed, for every entry in the
    dict's iteration order, by the key and then the value, each written as a
    uint16-length-prefixed UTF-8 string.
    """
    pieces = [_pack_short(len(d))]
    for key, value in d.items():
        pieces.append(_short_bytes(key.encode()))
        pieces.append(_short_bytes(value.encode()))
    return b"".join(pieces)
|
||||||
|
|
||||||
|
|
||||||
|
def _frame(opcode: int, body: bytes, stream: int) -> bytes:
|
||||||
|
"""Build a CQL v4 request frame."""
|
||||||
|
return struct.pack(">BBHBI", 0x04, 0x00, stream, opcode, len(body)) + body
|
||||||
|
|
||||||
|
|
||||||
|
def _recv_exact(sock: socket.socket, size: int, what: str) -> bytes:
    """Read exactly *size* bytes from *sock*.

    *what* names the part being read and is used in the failure message.
    Fails with an AssertionError if the peer closes the connection before
    *size* bytes have arrived.
    """
    # A bytearray grows in place, avoiding a fresh bytes copy per chunk.
    buf = bytearray()
    while len(buf) < size:
        chunk = sock.recv(size - len(buf))
        assert chunk, f"Connection closed while reading {what}"
        buf += chunk
    return bytes(buf)


def _recv_frame(sock: socket.socket) -> tuple[int, int, bytes]:
    """Read one CQL v4 response frame; return (stream, opcode, body).

    The header's version and flags bytes are read but not interpreted.
    """
    header = _recv_exact(sock, _FRAME_HEADER_SIZE, "frame header")
    # Header bytes 2..8: uint16 stream id, opcode byte, uint32 body length.
    stream, opcode, length = struct.unpack(">HBI", header[2:9])
    body = _recv_exact(sock, length, "frame body")
    return stream, opcode, body
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class ExecuteResult:
|
||||||
|
"""Parsed outcome of a ROWS EXECUTE response."""
|
||||||
|
|
||||||
|
metadata_changed: bool
|
||||||
|
no_metadata: bool
|
||||||
|
column_count: int
|
||||||
|
result_metadata_id: bytes | None
|
||||||
|
|
||||||
|
|
||||||
|
def _cql_connect(host: str, port: int, username: str, password: str) -> socket.socket:
    """
    Open a raw TCP socket to *host*:*port* and perform the CQL v4 handshake,
    negotiating the SCYLLA_USE_METADATA_ID extension so that result_metadata_id
    is exchanged on the wire — identical to the mandatory CQL v5 behaviour.

    If the server answers STARTUP with READY the socket is returned right
    away. Otherwise AUTHENTICATE is expected and *username*/*password* are
    sent as a PlainText SASL token. The caller owns the returned socket and
    must close it; if the handshake fails the socket is closed here before
    the error propagates.
    """
    sock = socket.create_connection((host, port))
    try:
        stream = 1

        # STARTUP with SCYLLA_USE_METADATA_ID enables the v5-style metadata_id
        # exchange for this v4 connection.
        startup_opts = {"CQL_VERSION": _CQL_VERSION, "SCYLLA_USE_METADATA_ID": ""}
        sock.sendall(_frame(_OP_STARTUP, _string_map(startup_opts), stream))
        _, opcode, _ = _recv_frame(sock)

        if opcode == _OP_READY:
            return sock

        assert opcode == _OP_AUTHENTICATE, (
            f"Expected AUTHENTICATE(0x{_OP_AUTHENTICATE:02x}), got 0x{opcode:02x}"
        )

        # PlainText SASL token: NUL + username + NUL + password
        creds = b"\x00" + username.encode() + b"\x00" + password.encode()
        stream += 1
        # The AUTH_RESPONSE body is a CQL [bytes] value: a 4-byte length
        # followed by the token (not [short bytes] with a 2-byte length).
        token = _pack_int(len(creds)) + creds
        sock.sendall(_frame(_OP_AUTH_RESPONSE, token, stream))
        _, auth_op, _ = _recv_frame(sock)
        assert auth_op == _OP_AUTH_SUCCESS, f"Authentication failed: opcode=0x{auth_op:02x}"
        return sock
    except BaseException:
        # Do not leak the socket when the handshake does not complete.
        sock.close()
        raise
|
||||||
|
|
||||||
|
|
||||||
|
def _cql_prepare(sock: socket.socket, stream: int, query: str) -> bytes:
    """PREPARE *query* and return the server-assigned query_id.

    Only the leading part of the RESULT body is parsed: the uint32 kind
    (which must be PREPARED) and the [short bytes] query id that follows.
    """
    request = _frame(_OP_PREPARE, _long_string(query), stream)
    sock.sendall(request)
    _, opcode, payload = _recv_frame(sock)
    assert opcode == _OP_RESULT, f"Expected RESULT, got 0x{opcode:02x}"

    (kind,) = struct.unpack_from(">I", payload, 0)
    assert kind == _RESULT_KIND_PREPARED, f"Expected PREPARED kind, got {kind}"

    # The id's uint16 length sits right after the 4-byte kind.
    (id_len,) = struct.unpack_from(">H", payload, 4)
    id_start = 6
    return bytes(payload[id_start : id_start + id_len])
|
||||||
|
|
||||||
|
|
||||||
|
def _cql_execute_with_metadata_id(
    sock: socket.socket,
    stream: int,
    query_id: bytes,
    result_metadata_id: bytes,
    consistency: int = _DEFAULT_CONSISTENCY,
) -> ExecuteResult:
    """
    Send EXECUTE carrying *result_metadata_id* on the wire.

    With SCYLLA_USE_METADATA_ID active the server reads result_metadata_id
    immediately after query_id (before the options block), mirroring CQL v5
    wire format.  SKIP_METADATA is set so a normal response returns no column
    specs; only the METADATA_CHANGED promotion path returns actual metadata.

    Returns an ExecuteResult with the metadata flags, the column count and,
    when METADATA_CHANGED is set, the metadata id carried by the response.
    The response must be a RESULT frame of kind ROWS.
    """
    # Rows metadata flag: a paging state follows the column count.
    has_more_pages = 1 << 1

    # options block: [consistency: uint16][flags: byte]
    options = struct.pack(">HB", consistency, _FLAG_SKIP_METADATA)
    body = _short_bytes(query_id) + _short_bytes(result_metadata_id) + options
    sock.sendall(_frame(_OP_EXECUTE, body, stream))
    _, opcode, payload = _recv_frame(sock)
    assert opcode == _OP_RESULT, f"Expected RESULT, got 0x{opcode:02x}"

    pos = 0
    (kind,) = struct.unpack_from(">I", payload, pos)
    pos += 4
    assert kind == _RESULT_KIND_ROWS, f"Expected ROWS kind, got {kind}"

    meta_flags, column_count = struct.unpack_from(">II", payload, pos)
    pos += 8

    metadata_changed = bool(meta_flags & _META_METADATA_CHANGED)
    no_metadata = bool(meta_flags & _META_NO_METADATA)

    if meta_flags & has_more_pages:
        # The paging state is a [bytes] value (int32 length, negative means
        # null) placed before the new metadata id; skip it so the id is not
        # misread.
        (paging_len,) = struct.unpack_from(">i", payload, pos)
        pos += 4 + max(paging_len, 0)

    response_metadata_id: bytes | None = None
    if metadata_changed:
        (id_len,) = struct.unpack_from(">H", payload, pos)
        pos += 2
        response_metadata_id = bytes(payload[pos : pos + id_len])

    return ExecuteResult(
        metadata_changed=metadata_changed,
        no_metadata=no_metadata,
        column_count=column_count,
        result_metadata_id=response_metadata_id,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def _prepare_and_execute(
    host: str,
    query: str,
    stale_metadata_id: bytes,
    port: int = 9042,
    username: str = "cassandra",
    password: str = "cassandra",
) -> ExecuteResult:
    """
    Open a raw socket connection (negotiating SCYLLA_USE_METADATA_ID), prepare
    *query*, execute it with *stale_metadata_id*, and return the parsed result.
    Intended to be called via ``asyncio.to_thread`` to avoid blocking the event loop.

    *port*, *username* and *password* default to the values that were
    previously hard-coded (9042 and cassandra/cassandra). The socket is
    always closed before returning.
    """
    # Stream ids used for the two requests sent after the handshake.
    prepare_stream = 2
    execute_stream = 3

    sock = _cql_connect(host, port, username, password)
    try:
        query_id = _cql_prepare(sock, prepare_stream, query)
        return _cql_execute_with_metadata_id(sock, execute_stream, query_id, stale_metadata_id)
    finally:
        sock.close()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_list_roles_of_prepared_metadata_promotion(
    manager: ManagerClient,
) -> None:
    """Check that EXECUTE of ``LIST ROLES OF <role>`` reports METADATA_CHANGED.

    ``LIST ROLES OF <role>`` builds its result metadata only at execute time,
    so the PREPARE response carries the metadata_id of empty metadata
    (NO_METADATA).

    The test sends EXECUTE with SKIP_METADATA and a stale metadata_id over a
    connection that negotiated the SCYLLA_USE_METADATA_ID extension (the v4
    equivalent of the exchange CQL v5 mandates). The server is expected to
    notice that the rows have real metadata and to answer with
    ``METADATA_CHANGED``, no ``NO_METADATA`` flag, and the real metadata_id
    so that a client could refresh its cache.
    """
    server = await manager.server_add(config=auth_config)
    cql, _ = await manager.get_ready_cql([server])

    role = "r" + unique_name()
    await cql.run_async(f"CREATE ROLE {role}")

    # 16 bytes standing in for the "stale" id a client would have cached after
    # a PREPARE that returned empty metadata. Any value that differs from the
    # real metadata_id serves the purpose.
    # NOTE(review): presumably this equals the id of empty metadata — confirm
    # against the server's metadata id calculation.
    stale_metadata_id = hashlib.sha256(b"").digest()[:16]

    query = f"LIST ROLES OF {role}"
    result = await asyncio.to_thread(
        _prepare_and_execute, server.ip_addr, query, stale_metadata_id
    )

    assert result.metadata_changed, (
        f"expected EXECUTE for LIST ROLES OF {role} to return METADATA_CHANGED "
        "after PREPARE returned an empty result_metadata_id"
    )
    assert not result.no_metadata, (
        f"expected EXECUTE for LIST ROLES OF {role} to not have NO_METADATA flag "
        "when METADATA_CHANGED is set"
    )
    assert result.result_metadata_id is not None, (
        f"expected EXECUTE for LIST ROLES OF {role} to return a result_metadata_id "
        "alongside METADATA_CHANGED"
    )
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(
    mode="release", reason="error injection is disabled in release mode"
)
async def test_list_roles_of_prepared_metadata_promotion_suppressed_by_injection(
    manager: ManagerClient,
) -> None:
    """Negative counterpart of ``test_list_roles_of_prepared_metadata_promotion``.

    Two error injections are enabled while the statement is prepared and
    executed: ``skip_prepared_result_metadata_promotion`` and
    ``skip_rows_metadata_changed_response``. With both active the response is
    expected to keep NO_METADATA and not to carry METADATA_CHANGED, which
    shows that the happy-path test is not a false positive and that the
    injection points take effect.
    """
    server = await manager.server_add(config=auth_config)
    cql, _ = await manager.get_ready_cql([server])

    role = "r" + unique_name()
    await cql.run_async(f"CREATE ROLE {role}")

    stale_metadata_id = hashlib.sha256(b"").digest()[:16]
    query = f"LIST ROLES OF {role}"

    # Entered in this order and exited in reverse, like nested ``async with``.
    async with (
        inject_error(manager.api, server.ip_addr, "skip_prepared_result_metadata_promotion"),
        inject_error(manager.api, server.ip_addr, "skip_rows_metadata_changed_response"),
    ):
        result = await asyncio.to_thread(
            _prepare_and_execute, server.ip_addr, query, stale_metadata_id
        )

    assert not result.metadata_changed, (
        f"expected injected EXECUTE for LIST ROLES OF {role} to suppress "
        "METADATA_CHANGED, but the flag was set"
    )
    assert result.no_metadata, (
        f"expected injected EXECUTE for LIST ROLES OF {role} to keep the "
        "stale NO_METADATA path, but no_metadata flag was not set"
    )
|
||||||
@@ -511,8 +511,7 @@ class AuditBackendComposite(AuditBackend):
|
|||||||
return rows_dict
|
return rows_dict
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.single_node
|
class CQLAuditTester(AuditTester):
|
||||||
class TestCQLAudit(AuditTester):
|
|
||||||
"""
|
"""
|
||||||
Make sure CQL statements are audited
|
Make sure CQL statements are audited
|
||||||
"""
|
"""
|
||||||
@@ -1763,7 +1762,7 @@ class TestCQLAudit(AuditTester):
|
|||||||
|
|
||||||
async def test_audit_table_noauth(manager: ManagerClient):
|
async def test_audit_table_noauth(manager: ManagerClient):
|
||||||
"""Table backend, no auth, single node — groups all tests that share this config."""
|
"""Table backend, no auth, single node — groups all tests that share this config."""
|
||||||
t = TestCQLAudit(manager)
|
t = CQLAuditTester(manager)
|
||||||
await t.test_using_non_existent_keyspace(AuditBackendTable)
|
await t.test_using_non_existent_keyspace(AuditBackendTable)
|
||||||
await t.test_audit_keyspace(AuditBackendTable)
|
await t.test_audit_keyspace(AuditBackendTable)
|
||||||
await t.test_audit_keyspace_extra_parameter(AuditBackendTable)
|
await t.test_audit_keyspace_extra_parameter(AuditBackendTable)
|
||||||
@@ -1787,7 +1786,7 @@ async def test_audit_table_noauth(manager: ManagerClient):
|
|||||||
|
|
||||||
async def test_audit_table_auth(manager: ManagerClient):
|
async def test_audit_table_auth(manager: ManagerClient):
|
||||||
"""Table backend, auth enabled, single node."""
|
"""Table backend, auth enabled, single node."""
|
||||||
t = TestCQLAudit(manager)
|
t = CQLAuditTester(manager)
|
||||||
await t.test_user_password_masking(AuditBackendTable)
|
await t.test_user_password_masking(AuditBackendTable)
|
||||||
await t.test_negative_audit_records_auth()
|
await t.test_negative_audit_records_auth()
|
||||||
await t.test_negative_audit_records_admin()
|
await t.test_negative_audit_records_admin()
|
||||||
@@ -1803,7 +1802,7 @@ async def test_audit_table_auth(manager: ManagerClient):
|
|||||||
|
|
||||||
async def test_audit_table_auth_multinode(manager: ManagerClient):
|
async def test_audit_table_auth_multinode(manager: ManagerClient):
|
||||||
"""Table backend, auth enabled, multi-node (rf=3)."""
|
"""Table backend, auth enabled, multi-node (rf=3)."""
|
||||||
t = TestCQLAudit(manager)
|
t = CQLAuditTester(manager)
|
||||||
await t.test_negative_audit_records_ddl()
|
await t.test_negative_audit_records_ddl()
|
||||||
|
|
||||||
|
|
||||||
@@ -1811,49 +1810,49 @@ async def test_audit_table_auth_multinode(manager: ManagerClient):
|
|||||||
|
|
||||||
async def test_audit_type_none_standalone(manager: ManagerClient):
|
async def test_audit_type_none_standalone(manager: ManagerClient):
|
||||||
"""audit=None — verify no auditing occurs."""
|
"""audit=None — verify no auditing occurs."""
|
||||||
await TestCQLAudit(manager).test_audit_type_none()
|
await CQLAuditTester(manager).test_audit_type_none()
|
||||||
|
|
||||||
|
|
||||||
async def test_audit_type_invalid_standalone(manager: ManagerClient):
|
async def test_audit_type_invalid_standalone(manager: ManagerClient):
|
||||||
"""audit=invalid — server should fail to start."""
|
"""audit=invalid — server should fail to start."""
|
||||||
await TestCQLAudit(manager).test_audit_type_invalid()
|
await CQLAuditTester(manager).test_audit_type_invalid()
|
||||||
|
|
||||||
|
|
||||||
async def test_composite_audit_type_invalid_standalone(manager: ManagerClient):
|
async def test_composite_audit_type_invalid_standalone(manager: ManagerClient):
|
||||||
"""audit=table,syslog,invalid — server should fail to start."""
|
"""audit=table,syslog,invalid — server should fail to start."""
|
||||||
await TestCQLAudit(manager).test_composite_audit_type_invalid()
|
await CQLAuditTester(manager).test_composite_audit_type_invalid()
|
||||||
|
|
||||||
|
|
||||||
async def test_audit_empty_settings_standalone(manager: ManagerClient):
|
async def test_audit_empty_settings_standalone(manager: ManagerClient):
|
||||||
"""audit=none — verify no auditing occurs."""
|
"""audit=none — verify no auditing occurs."""
|
||||||
await TestCQLAudit(manager).test_audit_empty_settings()
|
await CQLAuditTester(manager).test_audit_empty_settings()
|
||||||
|
|
||||||
|
|
||||||
async def test_composite_audit_empty_settings_standalone(manager: ManagerClient):
|
async def test_composite_audit_empty_settings_standalone(manager: ManagerClient):
|
||||||
"""audit=table,syslog,none — verify no auditing occurs."""
|
"""audit=table,syslog,none — verify no auditing occurs."""
|
||||||
await TestCQLAudit(manager).test_composite_audit_empty_settings()
|
await CQLAuditTester(manager).test_composite_audit_empty_settings()
|
||||||
|
|
||||||
|
|
||||||
async def test_audit_categories_invalid_standalone(manager: ManagerClient):
|
async def test_audit_categories_invalid_standalone(manager: ManagerClient):
|
||||||
"""Invalid audit_categories — server should fail to start."""
|
"""Invalid audit_categories — server should fail to start."""
|
||||||
await TestCQLAudit(manager).test_audit_categories_invalid()
|
await CQLAuditTester(manager).test_audit_categories_invalid()
|
||||||
|
|
||||||
|
|
||||||
async def test_insert_failure_standalone(manager: ManagerClient):
|
async def test_insert_failure_standalone(manager: ManagerClient):
|
||||||
"""7-node topology, audit=table, no auth — standalone due to unique topology."""
|
"""7-node topology, audit=table, no auth — standalone due to unique topology."""
|
||||||
await TestCQLAudit(manager).test_insert_failure_doesnt_report_success()
|
await CQLAuditTester(manager).test_insert_failure_doesnt_report_success()
|
||||||
|
|
||||||
|
|
||||||
async def test_service_level_statements_standalone(manager: ManagerClient):
|
async def test_service_level_statements_standalone(manager: ManagerClient):
|
||||||
"""audit=table, auth, cmdline=--smp 1 — standalone due to special cmdline."""
|
"""audit=table, auth, cmdline=--smp 1 — standalone due to special cmdline."""
|
||||||
await TestCQLAudit(manager).test_service_level_statements()
|
await CQLAuditTester(manager).test_service_level_statements()
|
||||||
|
|
||||||
|
|
||||||
# AuditBackendSyslog, no auth, rf=1
|
# AuditBackendSyslog, no auth, rf=1
|
||||||
|
|
||||||
async def test_audit_syslog_noauth(manager: ManagerClient):
|
async def test_audit_syslog_noauth(manager: ManagerClient):
|
||||||
"""Syslog backend, no auth, single node."""
|
"""Syslog backend, no auth, single node."""
|
||||||
t = TestCQLAudit(manager)
|
t = CQLAuditTester(manager)
|
||||||
Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
|
Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
|
||||||
await t.test_using_non_existent_keyspace(Syslog)
|
await t.test_using_non_existent_keyspace(Syslog)
|
||||||
await t.test_audit_keyspace(Syslog)
|
await t.test_audit_keyspace(Syslog)
|
||||||
@@ -1870,7 +1869,7 @@ async def test_audit_syslog_noauth(manager: ManagerClient):
|
|||||||
|
|
||||||
async def test_audit_syslog_auth(manager: ManagerClient):
|
async def test_audit_syslog_auth(manager: ManagerClient):
|
||||||
"""Syslog backend, auth enabled, single node."""
|
"""Syslog backend, auth enabled, single node."""
|
||||||
t = TestCQLAudit(manager)
|
t = CQLAuditTester(manager)
|
||||||
Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
|
Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
|
||||||
await t.test_user_password_masking(Syslog)
|
await t.test_user_password_masking(Syslog)
|
||||||
await t.test_role_password_masking(Syslog)
|
await t.test_role_password_masking(Syslog)
|
||||||
@@ -1881,7 +1880,7 @@ async def test_audit_syslog_auth(manager: ManagerClient):
|
|||||||
|
|
||||||
async def test_audit_composite_noauth(manager: ManagerClient):
|
async def test_audit_composite_noauth(manager: ManagerClient):
|
||||||
"""Composite backend (table+syslog), no auth, single node."""
|
"""Composite backend (table+syslog), no auth, single node."""
|
||||||
t = TestCQLAudit(manager)
|
t = CQLAuditTester(manager)
|
||||||
Composite = functools.partial(AuditBackendComposite, socket_path=syslog_socket_path)
|
Composite = functools.partial(AuditBackendComposite, socket_path=syslog_socket_path)
|
||||||
await t.test_using_non_existent_keyspace(Composite)
|
await t.test_using_non_existent_keyspace(Composite)
|
||||||
await t.test_audit_keyspace(Composite)
|
await t.test_audit_keyspace(Composite)
|
||||||
@@ -1898,7 +1897,7 @@ async def test_audit_composite_noauth(manager: ManagerClient):
|
|||||||
|
|
||||||
async def test_audit_composite_auth(manager: ManagerClient):
|
async def test_audit_composite_auth(manager: ManagerClient):
|
||||||
"""Composite backend (table+syslog), auth enabled, single node."""
|
"""Composite backend (table+syslog), auth enabled, single node."""
|
||||||
t = TestCQLAudit(manager)
|
t = CQLAuditTester(manager)
|
||||||
Composite = functools.partial(AuditBackendComposite, socket_path=syslog_socket_path)
|
Composite = functools.partial(AuditBackendComposite, socket_path=syslog_socket_path)
|
||||||
await t.test_user_password_masking(Composite)
|
await t.test_user_password_masking(Composite)
|
||||||
await t.test_role_password_masking(Composite)
|
await t.test_role_password_masking(Composite)
|
||||||
@@ -1910,29 +1909,29 @@ _composite = functools.partial(AuditBackendComposite, socket_path=syslog_socket_
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("helper_class,config_changer", [
|
@pytest.mark.parametrize("helper_class,config_changer", [
|
||||||
pytest.param(AuditBackendTable, TestCQLAudit.AuditSighupConfigChanger, id="table-sighup"),
|
pytest.param(AuditBackendTable, CQLAuditTester.AuditSighupConfigChanger, id="table-sighup"),
|
||||||
pytest.param(AuditBackendTable, TestCQLAudit.AuditCqlConfigChanger, id="table-cql"),
|
pytest.param(AuditBackendTable, CQLAuditTester.AuditCqlConfigChanger, id="table-cql"),
|
||||||
pytest.param(_syslog, TestCQLAudit.AuditSighupConfigChanger, id="syslog-sighup"),
|
pytest.param(_syslog, CQLAuditTester.AuditSighupConfigChanger, id="syslog-sighup"),
|
||||||
pytest.param(_syslog, TestCQLAudit.AuditCqlConfigChanger, id="syslog-cql"),
|
pytest.param(_syslog, CQLAuditTester.AuditCqlConfigChanger, id="syslog-cql"),
|
||||||
pytest.param(_composite, TestCQLAudit.AuditSighupConfigChanger, id="composite-sighup"),
|
pytest.param(_composite, CQLAuditTester.AuditSighupConfigChanger, id="composite-sighup"),
|
||||||
pytest.param(_composite, TestCQLAudit.AuditCqlConfigChanger, id="composite-cql"),
|
pytest.param(_composite, CQLAuditTester.AuditCqlConfigChanger, id="composite-cql"),
|
||||||
])
|
])
|
||||||
async def test_config_no_liveupdate(manager: ManagerClient, helper_class, config_changer):
|
async def test_config_no_liveupdate(manager: ManagerClient, helper_class, config_changer):
|
||||||
"""Non-live audit config params (audit, audit_unix_socket_path, audit_syslog_write_buffer_size) must be unmodifiable."""
|
"""Non-live audit config params (audit, audit_unix_socket_path, audit_syslog_write_buffer_size) must be unmodifiable."""
|
||||||
await TestCQLAudit(manager).test_config_no_liveupdate(helper_class, config_changer)
|
await CQLAuditTester(manager).test_config_no_liveupdate(helper_class, config_changer)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("helper_class,config_changer", [
|
@pytest.mark.parametrize("helper_class,config_changer", [
|
||||||
pytest.param(AuditBackendTable, TestCQLAudit.AuditSighupConfigChanger, id="table-sighup"),
|
pytest.param(AuditBackendTable, CQLAuditTester.AuditSighupConfigChanger, id="table-sighup"),
|
||||||
pytest.param(AuditBackendTable, TestCQLAudit.AuditCqlConfigChanger, id="table-cql"),
|
pytest.param(AuditBackendTable, CQLAuditTester.AuditCqlConfigChanger, id="table-cql"),
|
||||||
pytest.param(_syslog, TestCQLAudit.AuditSighupConfigChanger, id="syslog-sighup"),
|
pytest.param(_syslog, CQLAuditTester.AuditSighupConfigChanger, id="syslog-sighup"),
|
||||||
pytest.param(_syslog, TestCQLAudit.AuditCqlConfigChanger, id="syslog-cql"),
|
pytest.param(_syslog, CQLAuditTester.AuditCqlConfigChanger, id="syslog-cql"),
|
||||||
pytest.param(_composite, TestCQLAudit.AuditSighupConfigChanger, id="composite-sighup"),
|
pytest.param(_composite, CQLAuditTester.AuditSighupConfigChanger, id="composite-sighup"),
|
||||||
pytest.param(_composite, TestCQLAudit.AuditCqlConfigChanger, id="composite-cql"),
|
pytest.param(_composite, CQLAuditTester.AuditCqlConfigChanger, id="composite-cql"),
|
||||||
])
|
])
|
||||||
async def test_config_liveupdate(manager: ManagerClient, helper_class, config_changer):
|
async def test_config_liveupdate(manager: ManagerClient, helper_class, config_changer):
|
||||||
"""Live-updatable audit config params (categories, keyspaces, tables) must be modifiable at runtime."""
|
"""Live-updatable audit config params (categories, keyspaces, tables) must be modifiable at runtime."""
|
||||||
await TestCQLAudit(manager).test_config_liveupdate(helper_class, config_changer)
|
await CQLAuditTester(manager).test_config_liveupdate(helper_class, config_changer)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("helper_class", [
|
@pytest.mark.parametrize("helper_class", [
|
||||||
@@ -1942,4 +1941,4 @@ async def test_config_liveupdate(manager: ManagerClient, helper_class, config_ch
|
|||||||
])
|
])
|
||||||
async def test_parallel_syslog_audit(manager: ManagerClient, helper_class):
|
async def test_parallel_syslog_audit(manager: ManagerClient, helper_class):
|
||||||
"""Cluster must not fail when multiple queries are audited in parallel."""
|
"""Cluster must not fail when multiple queries are audited in parallel."""
|
||||||
await TestCQLAudit(manager).test_parallel_syslog_audit(helper_class)
|
await CQLAuditTester(manager).test_parallel_syslog_audit(helper_class)
|
||||||
|
|||||||
@@ -177,7 +177,7 @@ async def _smoke_test(manager: ManagerClient, key_provider: KeyProviderFactory,
|
|||||||
# restart the cluster
|
# restart the cluster
|
||||||
if restart:
|
if restart:
|
||||||
await restart(manager, servers, cfs)
|
await restart(manager, servers, cfs)
|
||||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
cql, _ = await manager.get_ready_cql(servers)
|
||||||
else:
|
else:
|
||||||
await manager.rolling_restart(servers)
|
await manager.rolling_restart(servers)
|
||||||
for table_name in cfs:
|
for table_name in cfs:
|
||||||
|
|||||||
@@ -438,6 +438,7 @@ async def test_lwt_fencing_upgrade(manager: ManagerClient, scylla_2025_1: Scylla
|
|||||||
await wait_for(all_hosts_are_alive, deadline=time.time() + 60, period=0.1)
|
await wait_for(all_hosts_are_alive, deadline=time.time() + 60, period=0.1)
|
||||||
logger.info(f"Upgrading {s.server_id}")
|
logger.info(f"Upgrading {s.server_id}")
|
||||||
await manager.server_change_version(s.server_id, scylla_binary)
|
await manager.server_change_version(s.server_id, scylla_binary)
|
||||||
|
await manager.server_sees_others(s.server_id, 2, interval=60.0)
|
||||||
|
|
||||||
logger.info("Done upgrading servers")
|
logger.info("Done upgrading servers")
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,10 @@ import asyncio
|
|||||||
import time
|
import time
|
||||||
import pytest
|
import pytest
|
||||||
import logging
|
import logging
|
||||||
|
from functools import partial
|
||||||
from test.pylib.manager_client import ManagerClient
|
from test.pylib.manager_client import ManagerClient
|
||||||
|
from test.pylib.util import wait_for
|
||||||
|
from test.pylib.internal_types import ServerInfo
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -16,6 +19,26 @@ logger = logging.getLogger(__name__)
|
|||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||||
async def test_crashed_node_substitution(manager: ManagerClient):
|
async def test_crashed_node_substitution(manager: ManagerClient):
|
||||||
|
"""Test that a node which crashed after starting gossip but before joining group0
|
||||||
|
(an 'orphan' node) is eventually removed from gossip by the gossiper_orphan_remover_fiber.
|
||||||
|
|
||||||
|
The scenario:
|
||||||
|
1. Start 3 nodes with the 'fast_orphan_removal_fiber' injection enabled. This freezes
|
||||||
|
the gossiper_orphan_remover_fiber on each node before it enters its polling loop,
|
||||||
|
so it cannot remove any orphan until explicitly unblocked.
|
||||||
|
2. Start a 4th node with the 'crash_before_group0_join' injection enabled. This node
|
||||||
|
starts gossip normally but blocks inside pre_server_start(), just before sending
|
||||||
|
the join RPC to the topology coordinator. It never joins group0.
|
||||||
|
3. Wait until the 4th node's gossip state has fully propagated to all 3 running peers,
|
||||||
|
then trigger its crash via the injection. At this point all peers see it as an orphan:
|
||||||
|
present in gossip but absent from the group0 topology.
|
||||||
|
4. Assert the orphan is visible in gossip (live or down) on the surviving nodes.
|
||||||
|
5. Unblock the gossiper_orphan_remover_fiber on all 3 nodes (via message_injection) and
|
||||||
|
enable the 'speedup_orphan_removal' injection so the fiber removes the orphan immediately
|
||||||
|
without waiting for the normal 60-second age threshold.
|
||||||
|
6. Wait for the 'Finished to force remove node' log line confirming removal, then assert
|
||||||
|
the orphan is no longer present in gossip.
|
||||||
|
"""
|
||||||
servers = await manager.servers_add(3, config={
|
servers = await manager.servers_add(3, config={
|
||||||
'error_injections_at_startup': ['fast_orphan_removal_fiber']
|
'error_injections_at_startup': ['fast_orphan_removal_fiber']
|
||||||
})
|
})
|
||||||
@@ -30,10 +53,24 @@ async def test_crashed_node_substitution(manager: ManagerClient):
|
|||||||
log = await manager.server_open_log(failed_server.server_id)
|
log = await manager.server_open_log(failed_server.server_id)
|
||||||
await log.wait_for("finished do_send_ack2_msg")
|
await log.wait_for("finished do_send_ack2_msg")
|
||||||
failed_id = await manager.get_host_id(failed_server.server_id)
|
failed_id = await manager.get_host_id(failed_server.server_id)
|
||||||
|
|
||||||
|
# Wait until the failed server's gossip state has propagated to all running peers.
|
||||||
|
# "finished do_send_ack2_msg" only guarantees that one peer completed a gossip round
|
||||||
|
# with the failed server; other nodes learn about it only in subsequent gossip rounds.
|
||||||
|
# Querying gossip before propagation completes would cause the assertion below to fail
|
||||||
|
# because the orphan node would not yet appear as live or down on every peer.
|
||||||
|
async def gossip_has_node(server: ServerInfo):
|
||||||
|
live = await manager.api.client.get_json("/gossiper/endpoint/live", host=server.ip_addr)
|
||||||
|
down = await manager.api.client.get_json("/gossiper/endpoint/down", host=server.ip_addr)
|
||||||
|
return True if failed_server.ip_addr in live + down else None
|
||||||
|
|
||||||
|
for s in servers:
|
||||||
|
await wait_for(partial(gossip_has_node, s), deadline=time.time() + 30)
|
||||||
|
|
||||||
await manager.api.message_injection(failed_server.ip_addr, 'crash_before_group0_join')
|
await manager.api.message_injection(failed_server.ip_addr, 'crash_before_group0_join')
|
||||||
|
|
||||||
await task
|
await task
|
||||||
|
|
||||||
live_eps = await manager.api.client.get_json("/gossiper/endpoint/live", host=servers[0].ip_addr)
|
live_eps = await manager.api.client.get_json("/gossiper/endpoint/live", host=servers[0].ip_addr)
|
||||||
down_eps = await manager.api.client.get_json("/gossiper/endpoint/down", host=servers[0].ip_addr)
|
down_eps = await manager.api.client.get_json("/gossiper/endpoint/down", host=servers[0].ip_addr)
|
||||||
|
|
||||||
|
|||||||
@@ -17,9 +17,9 @@ from test.pylib.manager_client import ManagerClient
|
|||||||
from test.pylib.rest_client import ScyllaMetricsClient, TCPRESTClient, inject_error
|
from test.pylib.rest_client import ScyllaMetricsClient, TCPRESTClient, inject_error
|
||||||
from test.pylib.tablets import get_tablet_replicas
|
from test.pylib.tablets import get_tablet_replicas
|
||||||
from test.pylib.scylla_cluster import ReplaceConfig
|
from test.pylib.scylla_cluster import ReplaceConfig
|
||||||
from test.pylib.util import wait_for
|
from test.pylib.util import gather_safely, wait_for
|
||||||
|
|
||||||
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace
|
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, keyspace_has_tablets, new_test_keyspace, new_test_table
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -51,28 +51,42 @@ async def await_sync_point(client: TCPRESTClient, server_ip: IPAddress, sync_poi
|
|||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_write_cl_any_to_dead_node_generates_hints(manager: ManagerClient):
|
async def test_write_cl_any_to_dead_node_generates_hints(manager: ManagerClient):
|
||||||
node_count = 2
|
node_count = 2
|
||||||
servers = await manager.servers_add(node_count)
|
cmdline = ["--logger-log-level", "hints_manager=trace"]
|
||||||
|
servers = await manager.servers_add(node_count, cmdline=cmdline)
|
||||||
|
|
||||||
|
async def wait_for_hints_written(min_hint_count: int, timeout: int):
|
||||||
|
async def aux():
|
||||||
|
hints_written = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
|
||||||
|
if hints_written >= min_hint_count:
|
||||||
|
return True
|
||||||
|
return None
|
||||||
|
assert await wait_for(aux, time.time() + timeout)
|
||||||
|
|
||||||
cql = manager.get_cql()
|
cql = manager.get_cql()
|
||||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
|
||||||
table = f"{ks}.t"
|
uses_tablets = await keyspace_has_tablets(manager, ks)
|
||||||
await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")
|
# If the keyspace uses tablets, let's explicitly require the table to use multiple tablets.
|
||||||
|
# Otherwise, it could happen that all mutations would target servers[0] only, which would
|
||||||
|
# ultimately lead to a test failure here. We rely on the assumption that mutations will be
|
||||||
|
# distributed more or less uniformly!
|
||||||
|
extra_opts = "WITH tablets = {'min_tablet_count': 16}" if uses_tablets else ""
|
||||||
|
async with new_test_table(manager, ks, "pk int PRIMARY KEY, v int", extra_opts) as table:
|
||||||
|
await manager.server_stop_gracefully(servers[1].server_id)
|
||||||
|
|
||||||
await manager.server_stop_gracefully(servers[1].server_id)
|
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
|
||||||
|
|
||||||
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
|
stmt = cql.prepare(f"INSERT INTO {table} (pk, v) VALUES (?, ?)")
|
||||||
|
stmt.consistency_level = ConsistencyLevel.ANY
|
||||||
|
|
||||||
# Some of the inserts will be targeted to the dead node.
|
# Some of the inserts will be targeted to the dead node.
|
||||||
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
|
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
|
||||||
for i in range(100):
|
await gather_safely(*[cql.run_async(stmt, (i, i + 1)) for i in range(100)])
|
||||||
await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES ({i}, {i+1})", consistency_level=ConsistencyLevel.ANY))
|
|
||||||
|
|
||||||
# Verify hints are written
|
# Verify hints are written
|
||||||
hints_after = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
|
await wait_for_hints_written(hints_before + 1, timeout=60)
|
||||||
assert hints_after > hints_before
|
|
||||||
|
|
||||||
# For dropping the keyspace
|
# For dropping the keyspace
|
||||||
await manager.server_start(servers[1].server_id)
|
await manager.server_start(servers[1].server_id)
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_limited_concurrency_of_writes(manager: ManagerClient):
|
async def test_limited_concurrency_of_writes(manager: ManagerClient):
|
||||||
|
|||||||
@@ -151,7 +151,7 @@ async def trigger_tablet_merge(manager, servers, logs):
|
|||||||
await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
|
await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
|
||||||
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
|
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
|
||||||
|
|
||||||
async def preapre_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
|
async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
|
||||||
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, nr_keys=nr_keys, cmdline=cmdline)
|
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, nr_keys=nr_keys, cmdline=cmdline)
|
||||||
repaired_keys = set(range(0, nr_keys))
|
repaired_keys = set(range(0, nr_keys))
|
||||||
unrepaired_keys = set()
|
unrepaired_keys = set()
|
||||||
@@ -164,7 +164,7 @@ async def preapre_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdlin
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient):
|
async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient):
|
||||||
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
|
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
|
||||||
|
|
||||||
await insert_keys(cql, ks, 0, 100)
|
await insert_keys(cql, ks, 0, 100)
|
||||||
|
|
||||||
@@ -274,7 +274,7 @@ async def test_tablet_incremental_repair_error(manager: ManagerClient):
|
|||||||
|
|
||||||
async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str):
|
async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str):
|
||||||
nr_keys = 100
|
nr_keys = 100
|
||||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline=['--logger-log-level', 'compaction=debug'])
|
||||||
token = -1
|
token = -1
|
||||||
|
|
||||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||||
@@ -335,7 +335,7 @@ async def test_tablet_incremental_repair_and_major(manager: ManagerClient):
|
|||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
|
async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
|
||||||
nr_keys = 100
|
nr_keys = 100
|
||||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
|
||||||
|
|
||||||
# Disable autocompaction
|
# Disable autocompaction
|
||||||
for server in servers:
|
for server in servers:
|
||||||
@@ -381,7 +381,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
|
|||||||
|
|
||||||
async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_split, do_merge):
|
async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_split, do_merge):
|
||||||
nr_keys = 100
|
nr_keys = 100
|
||||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
|
||||||
|
|
||||||
# First repair
|
# First repair
|
||||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||||
@@ -442,7 +442,7 @@ async def test_tablet_incremental_repair_with_merge(manager: ManagerClient):
|
|||||||
async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(manager: ManagerClient):
|
async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(manager: ManagerClient):
|
||||||
nr_keys = 100
|
nr_keys = 100
|
||||||
cmdline = ["--hinted-handoff-enabled", "0"]
|
cmdline = ["--hinted-handoff-enabled", "0"]
|
||||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
|
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
|
||||||
|
|
||||||
await manager.server_stop_gracefully(servers[1].server_id)
|
await manager.server_stop_gracefully(servers[1].server_id)
|
||||||
|
|
||||||
@@ -466,7 +466,7 @@ async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(ma
|
|||||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||||
async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager):
|
async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager):
|
||||||
nr_keys = 100
|
nr_keys = 100
|
||||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
|
||||||
|
|
||||||
# First repair
|
# First repair
|
||||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||||
@@ -507,7 +507,7 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
|
|||||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||||
async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_merge(manager):
|
async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_merge(manager):
|
||||||
nr_keys = 100
|
nr_keys = 100
|
||||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
|
||||||
|
|
||||||
# First repair
|
# First repair
|
||||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||||
@@ -541,7 +541,7 @@ async def do_test_tablet_incremental_repair_merge_error(manager, error):
|
|||||||
nr_keys = 100
|
nr_keys = 100
|
||||||
# Make sure no data commit log replay after force server stop
|
# Make sure no data commit log replay after force server stop
|
||||||
cmdline = ['--enable-commitlog', '0']
|
cmdline = ['--enable-commitlog', '0']
|
||||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
|
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
|
||||||
|
|
||||||
# First repair
|
# First repair
|
||||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||||
@@ -587,7 +587,7 @@ async def test_tablet_incremental_repair_merge_error_in_merge_completion_fiber(m
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
|
async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
|
||||||
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
|
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
|
||||||
token = -1
|
token = -1
|
||||||
|
|
||||||
sstables_repaired_at = 0
|
sstables_repaired_at = 0
|
||||||
@@ -632,7 +632,7 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
|
async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
|
||||||
servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
|
servers, _, _, ks, _, _, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
|
||||||
time1 = 0
|
time1 = 0
|
||||||
time2 = 0
|
time2 = 0
|
||||||
|
|
||||||
@@ -820,7 +820,7 @@ async def test_repair_sigsegv_with_diff_shard_count(manager: ManagerClient, use_
|
|||||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||||
async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manager: ManagerClient):
|
async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manager: ManagerClient):
|
||||||
cmdline = ['--logger-log-level', 'repair=debug']
|
cmdline = ['--logger-log-level', 'repair=debug']
|
||||||
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await preapre_cluster_for_incremental_repair(manager, cmdline=cmdline)
|
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await prepare_cluster_for_incremental_repair(manager, cmdline=cmdline)
|
||||||
|
|
||||||
coord = await get_topology_coordinator(manager)
|
coord = await get_topology_coordinator(manager)
|
||||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ from cassandra.query import SimpleStatement
|
|||||||
from test.pylib.async_cql import _wrap_future
|
from test.pylib.async_cql import _wrap_future
|
||||||
from test.pylib.manager_client import ManagerClient
|
from test.pylib.manager_client import ManagerClient
|
||||||
from test.pylib.random_tables import RandomTables, TextType, Column
|
from test.pylib.random_tables import RandomTables, TextType, Column
|
||||||
|
from test.pylib.rest_client import read_barrier
|
||||||
from test.pylib.util import unique_name
|
from test.pylib.util import unique_name
|
||||||
from test.cluster.conftest import cluster_con
|
from test.cluster.conftest import cluster_con
|
||||||
|
|
||||||
@@ -403,6 +404,7 @@ async def test_arbiter_dc_rf_rack_valid_keyspaces(manager: ManagerClient):
|
|||||||
for task in [*valid_keyspaces, *invalid_keyspaces]:
|
for task in [*valid_keyspaces, *invalid_keyspaces]:
|
||||||
_ = tg.create_task(task)
|
_ = tg.create_task(task)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager: ManagerClient):
|
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager: ManagerClient):
|
||||||
"""
|
"""
|
||||||
This test verifies that starting a Scylla node fails when there's an RF-rack-invalid keyspace.
|
This test verifies that starting a Scylla node fails when there's an RF-rack-invalid keyspace.
|
||||||
@@ -464,22 +466,50 @@ async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager:
|
|||||||
for rfs, tablets in valid_keyspaces:
|
for rfs, tablets in valid_keyspaces:
|
||||||
_ = tg.create_task(create_keyspace(rfs, tablets))
|
_ = tg.create_task(create_keyspace(rfs, tablets))
|
||||||
|
|
||||||
await manager.server_stop_gracefully(s1.server_id)
|
# Precondition: s1 has rf_rack_valid_keyspaces set to false.
|
||||||
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
|
# Postcondition: s1 still has rf_rack_valid_keyspaces set to false.
|
||||||
|
|
||||||
async def try_fail(rfs: List[int], dc: str, rf: int, rack_count: int):
|
async def try_fail(rfs: List[int], dc: str, rf: int, rack_count: int):
|
||||||
|
running_servers = await manager.running_servers()
|
||||||
|
should_start = s1.server_id not in [server.server_id for server in running_servers]
|
||||||
|
if should_start:
|
||||||
|
await manager.server_start(s1.server_id)
|
||||||
|
|
||||||
ks = await create_keyspace(rfs, True)
|
ks = await create_keyspace(rfs, True)
|
||||||
|
# We need to wait for the new schema to propagate.
|
||||||
|
# Otherwise, it's not clear when the mutation
|
||||||
|
# corresponding to the created keyspace will
|
||||||
|
# arrive at server 1.
|
||||||
|
# It could happen only after the node performs
|
||||||
|
# the check upon start-up, effectively leading
|
||||||
|
# to a successful start-up, which we don't want.
|
||||||
|
# For more context, see issue: SCYLLADB-1137.
|
||||||
|
await read_barrier(manager.api, s1.ip_addr)
|
||||||
|
|
||||||
|
await manager.server_stop_gracefully(s1.server_id)
|
||||||
|
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
|
||||||
|
|
||||||
err = f"The keyspace '{ks}' is required to be RF-rack-valid. " \
|
err = f"The keyspace '{ks}' is required to be RF-rack-valid. " \
|
||||||
f"That condition is violated for DC '{dc}': RF={rf} vs. rack count={rack_count}."
|
f"That condition is violated for DC '{dc}': RF={rf} vs. rack count={rack_count}."
|
||||||
_ = await manager.server_start(s1.server_id, expected_error=err)
|
await manager.server_start(s1.server_id, expected_error=err)
|
||||||
await cql.run_async(f"DROP KEYSPACE {ks}")
|
await cql.run_async(f"DROP KEYSPACE {ks}")
|
||||||
|
|
||||||
|
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "false")
|
||||||
|
|
||||||
# Test RF-rack-invalid keyspaces.
|
# Test RF-rack-invalid keyspaces.
|
||||||
await try_fail([2, 0], "dc1", 2, 3)
|
await try_fail([2, 0], "dc1", 2, 3)
|
||||||
await try_fail([3, 2], "dc2", 2, 1)
|
await try_fail([3, 2], "dc2", 2, 1)
|
||||||
await try_fail([4, 1], "dc1", 4, 3)
|
await try_fail([4, 1], "dc1", 4, 3)
|
||||||
|
|
||||||
_ = await manager.server_start(s1.server_id)
|
# We need to perform a read barrier on the node to make
|
||||||
|
# sure that it processes the last DROP KEYSPACE.
|
||||||
|
# Otherwise, the node could think the RF-rack-invalid
|
||||||
|
# keyspace still exists.
|
||||||
|
await manager.server_start(s1.server_id)
|
||||||
|
await read_barrier(manager.api, s1.ip_addr)
|
||||||
|
await manager.server_stop_gracefully(s1.server_id)
|
||||||
|
|
||||||
|
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
|
||||||
|
await manager.server_start(s1.server_id)
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces_but_not_enforced(manager: ManagerClient):
|
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces_but_not_enforced(manager: ManagerClient):
|
||||||
|
|||||||
@@ -23,10 +23,25 @@ from test.cluster.object_store.conftest import format_tuples
|
|||||||
from test.cluster.object_store.test_backup import topo, take_snapshot, do_test_streaming_scopes
|
from test.cluster.object_store.test_backup import topo, take_snapshot, do_test_streaming_scopes
|
||||||
from test.cluster.util import new_test_keyspace
|
from test.cluster.util import new_test_keyspace
|
||||||
from test.pylib.rest_client import read_barrier
|
from test.pylib.rest_client import read_barrier
|
||||||
from test.pylib.util import unique_name
|
from test.pylib.util import unique_name, wait_for
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def wait_for_upload_dir_empty(upload_dir, timeout=30):
|
||||||
|
'''
|
||||||
|
Wait until the upload directory is empty with a timeout.
|
||||||
|
SSTable unlinking is asynchronous and in rare situations, it can happen
|
||||||
|
that not all sstables are deleted from the upload dir immediately after refresh is done.
|
||||||
|
'''
|
||||||
|
deadline = time.time() + timeout
|
||||||
|
async def check_empty():
|
||||||
|
files = os.listdir(upload_dir)
|
||||||
|
if not files:
|
||||||
|
return True
|
||||||
|
return None
|
||||||
|
await wait_for(check_empty, deadline, period=0.5)
|
||||||
|
|
||||||
class SSTablesOnLocalStorage:
|
class SSTablesOnLocalStorage:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
|
self.tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
|
||||||
@@ -153,7 +168,8 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
|
|||||||
|
|
||||||
for s in servers:
|
for s in servers:
|
||||||
cf_dir = dirs[s.server_id]["cf_dir"]
|
cf_dir = dirs[s.server_id]["cf_dir"]
|
||||||
files = os.listdir(os.path.join(cf_dir, 'upload'))
|
upload_dir = os.path.join(cf_dir, 'upload')
|
||||||
assert files == [], f'Upload dir not empty on server {s.server_id}: {files}'
|
assert os.path.exists(upload_dir)
|
||||||
|
await wait_for_upload_dir_empty(upload_dir)
|
||||||
|
|
||||||
shutil.rmtree(tmpbackup)
|
shutil.rmtree(tmpbackup)
|
||||||
|
|||||||
@@ -196,7 +196,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
|
|||||||
tombstone_mark = datetime.now(timezone.utc)
|
tombstone_mark = datetime.now(timezone.utc)
|
||||||
|
|
||||||
# test #2: the tombstones are not cleaned up when one node is down
|
# test #2: the tombstones are not cleaned up when one node is down
|
||||||
with pytest.raises(AssertionError, match="Deadline exceeded"):
|
with pytest.raises(AssertionError, match="timed out"):
|
||||||
# waiting for shorter time (5s normally enough for a successful case, we expect the timeout here)
|
# waiting for shorter time (5s normally enough for a successful case, we expect the timeout here)
|
||||||
await verify_tombstone_gc(tombstone_mark, timeout=5)
|
await verify_tombstone_gc(tombstone_mark, timeout=5)
|
||||||
|
|
||||||
@@ -249,7 +249,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
|
|||||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||||
|
|
||||||
# test #4a: the tombstones are not cleaned up after both live nodes join the new group0
|
# test #4a: the tombstones are not cleaned up after both live nodes join the new group0
|
||||||
with pytest.raises(AssertionError, match="Deadline exceeded"):
|
with pytest.raises(AssertionError, match="timed out"):
|
||||||
await verify_tombstone_gc(tombstone_mark, timeout=5)
|
await verify_tombstone_gc(tombstone_mark, timeout=5)
|
||||||
|
|
||||||
await manager.remove_node(servers[0].server_id, down_server.server_id)
|
await manager.remove_node(servers[0].server_id, down_server.server_id)
|
||||||
|
|||||||
@@ -165,7 +165,7 @@ async def wait_for_cdc_generations_publishing(cql: Session, hosts: list[Host], d
|
|||||||
unpublished_generations = topo_res[0].unpublished_cdc_generations
|
unpublished_generations = topo_res[0].unpublished_cdc_generations
|
||||||
return unpublished_generations is None or len(unpublished_generations) == 0 or None
|
return unpublished_generations is None or len(unpublished_generations) == 0 or None
|
||||||
|
|
||||||
await wait_for(all_generations_published, deadline=deadline, period=1.0)
|
await wait_for(all_generations_published, deadline=deadline)
|
||||||
|
|
||||||
|
|
||||||
async def check_system_topology_and_cdc_generations_v3_consistency(manager: ManagerClient, live_hosts: list[Host], cqls: Optional[list[Session]] = None, ignored_hosts: list[Host] = []):
|
async def check_system_topology_and_cdc_generations_v3_consistency(manager: ManagerClient, live_hosts: list[Host], cqls: Optional[list[Session]] = None, ignored_hosts: list[Host] = []):
|
||||||
@@ -470,6 +470,17 @@ async def new_materialized_view(manager: ManagerClient, table, select, pk, where
|
|||||||
await manager.get_cql().run_async(f"DROP MATERIALIZED VIEW {mv}")
|
await manager.get_cql().run_async(f"DROP MATERIALIZED VIEW {mv}")
|
||||||
|
|
||||||
|
|
||||||
|
async def keyspace_has_tablets(manager: ManagerClient, keyspace: str) -> bool:
|
||||||
|
"""
|
||||||
|
Checks whether the given keyspace uses tablets.
|
||||||
|
Adapted from its counterpart in the cqlpy test: cqlpy/util.py::keyspace_has_tablets.
|
||||||
|
"""
|
||||||
|
cql = manager.get_cql()
|
||||||
|
rows_iter = await cql.run_async(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name='{keyspace}'")
|
||||||
|
rows = list(rows_iter)
|
||||||
|
return len(rows) > 0 and getattr(rows[0], "initial_tablets", None) is not None
|
||||||
|
|
||||||
|
|
||||||
async def get_raft_log_size(cql, host) -> int:
|
async def get_raft_log_size(cql, host) -> int:
|
||||||
query = "select count(\"index\") from system.raft"
|
query = "select count(\"index\") from system.raft"
|
||||||
return (await cql.run_async(query, host=host))[0][0]
|
return (await cql.run_async(query, host=host))[0][0]
|
||||||
|
|||||||
@@ -271,10 +271,21 @@ future<std::tuple<tests::proc::process_fixture, int>> tests::proc::start_docker_
|
|||||||
// arbitrary timeout of 120s for the server to make some output. Very generous.
|
// arbitrary timeout of 120s for the server to make some output. Very generous.
|
||||||
// but since we (maybe) run docker, and might need to pull image, this can take
|
// but since we (maybe) run docker, and might need to pull image, this can take
|
||||||
// some time if we're unlucky.
|
// some time if we're unlucky.
|
||||||
co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
|
auto [f1, f2] = co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
|
||||||
} catch (in_use&) {
|
for (auto* f : {&f1, &f2}) {
|
||||||
retry = true;
|
if (f->failed()) {
|
||||||
p = std::current_exception();
|
try {
|
||||||
|
f->get();
|
||||||
|
} catch (in_use&) {
|
||||||
|
retry = true;
|
||||||
|
p = std::current_exception();
|
||||||
|
} catch (...) {
|
||||||
|
if (!p) {
|
||||||
|
p = std::current_exception();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
p = std::current_exception();
|
p = std::current_exception();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,15 +56,25 @@ def unique_name(unique_name_prefix = 'test_'):
|
|||||||
async def wait_for(
|
async def wait_for(
|
||||||
pred: Callable[[], Awaitable[Optional[T]]],
|
pred: Callable[[], Awaitable[Optional[T]]],
|
||||||
deadline: float,
|
deadline: float,
|
||||||
period: float = 1,
|
period: float = 0.1,
|
||||||
before_retry: Optional[Callable[[], Any]] = None,
|
before_retry: Optional[Callable[[], Any]] = None,
|
||||||
backoff_factor: float = 1,
|
backoff_factor: float = 1.5,
|
||||||
max_period: float = None) -> T:
|
max_period: float = 1.0,
|
||||||
|
label: Optional[str] = None) -> T:
|
||||||
|
tag = label or getattr(pred, '__name__', 'unlabeled')
|
||||||
|
start = time.time()
|
||||||
|
retries = 0
|
||||||
while True:
|
while True:
|
||||||
assert(time.time() < deadline), "Deadline exceeded, failing test."
|
elapsed = time.time() - start
|
||||||
|
assert time.time() < deadline, \
|
||||||
|
f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
|
||||||
res = await pred()
|
res = await pred()
|
||||||
if res is not None:
|
if res is not None:
|
||||||
|
if retries > 0:
|
||||||
|
logger.debug(f"wait_for({tag}) completed "
|
||||||
|
f"in {elapsed:.2f}s ({retries} retries)")
|
||||||
return res
|
return res
|
||||||
|
retries += 1
|
||||||
await asyncio.sleep(period)
|
await asyncio.sleep(period)
|
||||||
period *= backoff_factor
|
period *= backoff_factor
|
||||||
if max_period is not None:
|
if max_period is not None:
|
||||||
@@ -273,14 +283,14 @@ async def wait_for_view_v1(cql: Session, name: str, node_count: int, timeout: in
|
|||||||
done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
|
done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
|
||||||
return done[0][0] == node_count or None
|
return done[0][0] == node_count or None
|
||||||
deadline = time.time() + timeout
|
deadline = time.time() + timeout
|
||||||
await wait_for(view_is_built, deadline)
|
await wait_for(view_is_built, deadline, label=f"view_v1_{name}")
|
||||||
|
|
||||||
async def wait_for_view(cql: Session, name: str, node_count: int, timeout: int = 120):
|
async def wait_for_view(cql: Session, name: str, node_count: int, timeout: int = 120):
|
||||||
async def view_is_built():
|
async def view_is_built():
|
||||||
done = await cql.run_async(f"SELECT COUNT(*) FROM system.view_build_status_v2 WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
|
done = await cql.run_async(f"SELECT COUNT(*) FROM system.view_build_status_v2 WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
|
||||||
return done[0][0] == node_count or None
|
return done[0][0] == node_count or None
|
||||||
deadline = time.time() + timeout
|
deadline = time.time() + timeout
|
||||||
await wait_for(view_is_built, deadline)
|
await wait_for(view_is_built, deadline, label=f"view_{name}")
|
||||||
|
|
||||||
|
|
||||||
async def wait_for_first_completed(coros: list[Coroutine], timeout: int|None = None):
|
async def wait_for_first_completed(coros: list[Coroutine], timeout: int|None = None):
|
||||||
|
|||||||
@@ -69,6 +69,7 @@
|
|||||||
#include "message/messaging_service.hh"
|
#include "message/messaging_service.hh"
|
||||||
#include "idl/forward_cql.dist.hh"
|
#include "idl/forward_cql.dist.hh"
|
||||||
#include "utils/bit_cast.hh"
|
#include "utils/bit_cast.hh"
|
||||||
|
#include "utils/error_injection.hh"
|
||||||
#include "utils/labels.hh"
|
#include "utils/labels.hh"
|
||||||
#include "utils/result.hh"
|
#include "utils/result.hh"
|
||||||
#include "utils/reusable_buffer.hh"
|
#include "utils/reusable_buffer.hh"
|
||||||
@@ -1633,13 +1634,26 @@ process_execute_internal(service::client_state& client_state, sharded<cql3::quer
|
|||||||
}
|
}
|
||||||
|
|
||||||
tracing::trace(trace_state, "Processing a statement");
|
tracing::trace(trace_state, "Processing a statement");
|
||||||
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization)
|
auto cache_key_for_metadata = cache_key;
|
||||||
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id)] (auto msg) mutable {
|
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, prepared, std::move(cache_key), needs_authorization)
|
||||||
|
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id), &qp, cache_key = std::move(cache_key_for_metadata), prepared = std::move(prepared)] (auto msg) mutable {
|
||||||
if (msg->move_to_shard()) {
|
if (msg->move_to_shard()) {
|
||||||
return cql_server::process_fn_return_type(make_foreign(dynamic_pointer_cast<messages::result_message::bounce>(msg)));
|
return cql_server::process_fn_return_type(make_foreign(dynamic_pointer_cast<messages::result_message::bounce>(msg)));
|
||||||
} else if (msg->is_exception()) {
|
} else if (msg->is_exception()) {
|
||||||
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
|
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
|
||||||
} else {
|
} else {
|
||||||
|
if (prepared->result_metadata_is_empty()
|
||||||
|
&& metadata_id.has_request_metadata_id()
|
||||||
|
&& !utils::get_local_injector().enter("skip_prepared_result_metadata_promotion")) {
|
||||||
|
if (auto rows = dynamic_pointer_cast<messages::result_message::rows>(msg)) {
|
||||||
|
auto rows_metadata_id = rows->rs().get_metadata().calculate_metadata_id();
|
||||||
|
clogger.debug("prepared result metadata promotion: request_metadata_id_present={}, calculated_rows_metadata_id_size={}",
|
||||||
|
metadata_id.has_request_metadata_id(), rows_metadata_id._metadata_id.size());
|
||||||
|
qp.local().update_prepared_result_metadata_id(cache_key, rows_metadata_id);
|
||||||
|
auto request_metadata_id = metadata_id.get_request_metadata_id();
|
||||||
|
metadata_id = cql_metadata_id_wrapper(std::move(request_metadata_id), std::move(rows_metadata_id));
|
||||||
|
}
|
||||||
|
}
|
||||||
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
|
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
|
||||||
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)));
|
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)));
|
||||||
}
|
}
|
||||||
@@ -2507,9 +2521,16 @@ void cql_server::response::write(const cql3::metadata& m, const cql_metadata_id_
|
|||||||
cql3::cql_metadata_id_type calculated_metadata_id{bytes{}};
|
cql3::cql_metadata_id_type calculated_metadata_id{bytes{}};
|
||||||
if (metadata_id.has_request_metadata_id() && metadata_id.has_response_metadata_id()) {
|
if (metadata_id.has_request_metadata_id() && metadata_id.has_response_metadata_id()) {
|
||||||
if (metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id()) {
|
if (metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id()) {
|
||||||
flags.remove<cql3::metadata::flag::NO_METADATA>();
|
const bool skip_rows_metadata_changed_response = utils::get_local_injector().enter("skip_rows_metadata_changed_response");
|
||||||
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
|
clogger.debug("rows metadata changed response: request_metadata_id_present={}, response_metadata_id_present={}, metadata_changed={}, no_metadata_before={}, injection_fired={}",
|
||||||
no_metadata = false;
|
metadata_id.has_request_metadata_id(), metadata_id.has_response_metadata_id(),
|
||||||
|
metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id(),
|
||||||
|
no_metadata, skip_rows_metadata_changed_response);
|
||||||
|
if (!skip_rows_metadata_changed_response) {
|
||||||
|
flags.remove<cql3::metadata::flag::NO_METADATA>();
|
||||||
|
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
|
||||||
|
no_metadata = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user