Compare commits

..

2 Commits

Author SHA1 Message Date
Alex
3ce2a2a479 test/cluster: add cluster test for prepared metadata_id promotion
Add a regression test that verifies the server correctly promotes the
prepared metadata_id for statements whose PREPARE response carries empty
result metadata (NO_METADATA), such as LIST ROLES OF.
The standard Python driver does not negotiate SCYLLA_USE_METADATA_ID and
cannot exercise this path, so the test uses a minimal raw CQL v4 socket
implementation that negotiates the extension directly in the STARTUP
options and includes result_metadata_id in EXECUTE frames, mirroring the
metadata_id exchange that is mandatory in CQL v5.

Two test cases are added:
- test_list_roles_of_prepared_metadata_promotion: happy path — verifies
  that an EXECUTE carrying a stale empty metadata_id receives a
  METADATA_CHANGED response with the real metadata_id.
- test_list_roles_of_prepared_metadata_promotion_suppressed_by_injection:
  negative path — activates the skip_rows_metadata_changed_response
  error injection point and verifies that the promotion is suppressed,
  confirming the happy-path test is not a false positive.

To support the injection-based negative test, two error injection points
are added to transport/server.cc:
- skip_prepared_result_metadata_promotion: bypasses the promotion logic
  in process_execute_internal so the cached prepared entry is not updated.
- skip_rows_metadata_changed_response: suppresses the METADATA_CHANGED
  flag and NO_METADATA clearance in response::write so the client sees
  the stale NO_METADATA response as if the fix were absent.

Tests: test/cluster/auth_cluster/test_prepared_metadata_promotion.py (dev/dbg)
2026-03-30 10:48:17 +03:00
Alex
2cdd178379 transport/server: Promote prepared metadata_id after first rows response
Some prepared statements do not know their result metadata at PREPARE
  time and therefore return the metadata_id of empty metadata. When such a
  statement later produces a ROWS response with real metadata, comparing the
  client-supplied metadata_id against the prepared response metadata_id is
  incorrect: the server keeps NO_METADATA even though the actual rows metadata
  differs.

  Scylla already has the actual rows metadata when EXECUTE returns a ROWS
  result. Use that first execution to promote the cached prepared statement to
  the normal metadata_id path.

  This change keeps the existing behavior for statements whose PREPARED
  response already carries real result metadata. For prepared statements whose
  PREPARED response had empty result metadata, the first EXECUTE with
  metadata_id support now:

  - calculates the metadata_id from the actual rows metadata
  - updates the cached prepared entry with that metadata_id
  - marks the prepared entry as having non-empty result metadata
  - uses the promoted metadata_id for the current response

  After that promotion, subsequent EXECUTEs use the existing fast path and do
  not need to recalculate the metadata_id again.

  The prepared statement remains read-only through public checked weak
  handles. The mutation is performed only through the prepared statements
  cache/query_processor layer, which owns the mutable cached entry.

  Testing:

  - add a regression test verifying that a ROWS response built from a stale
    empty-metadata id returns METADATA_CHANGED and the actual rows
    metadata_id
2026-03-29 15:52:31 +03:00
38 changed files with 627 additions and 475 deletions

View File

@@ -699,17 +699,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
// for such a size.
co_return api_error::payload_too_large(fmt::format("Request content length limit of {} bytes exceeded", request_content_length_limit));
}
// Check the concurrency limit early, before acquiring memory and
// reading the request body, to avoid piling up memory from excess
// requests that will be rejected anyway. This mirrors the CQL
// transport which also checks concurrency before memory acquisition
// (transport/server.cc).
if (_pending_requests.get_count() >= _max_concurrent_requests) {
_executor._stats.requests_shed++;
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
}
_pending_requests.enter();
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
// JSON parsing can allocate up to roughly 2x the size of the raw
// document, + a couple of bytes for maintenance.
// If the Content-Length of the request is not available, we assume
@@ -771,6 +760,12 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
_executor._stats.unsupported_operations++;
co_return api_error::unknown_operation(fmt::format("Unsupported operation {}", op));
}
if (_pending_requests.get_count() >= _max_concurrent_requests) {
_executor._stats.requests_shed++;
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
}
_pending_requests.enter();
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
executor::client_state client_state(service::client_state::external_tag(),
_auth_service, &_sl_controller, _timeout_config.current_values(), req->get_client_address());
if (!username.empty()) {

View File

@@ -143,6 +143,15 @@ public:
return value_type();
}
bool update_result_metadata_id(const key_type& key, cql3::cql_metadata_id_type metadata_id) {
cache_value_ptr vp = _cache.find(key.key());
if (!vp) {
return false;
}
(*vp)->update_result_metadata_id(std::move(metadata_id));
return true;
}
template <typename Pred>
requires std::is_invocable_r_v<bool, Pred, ::shared_ptr<cql_statement>>
void remove_if(Pred&& pred) {

View File

@@ -260,6 +260,10 @@ public:
return _prepared_cache.find(key);
}
bool update_prepared_result_metadata_id(const prepared_cache_key_type& key, cql_metadata_id_type metadata_id) {
return _prepared_cache.update_result_metadata_id(key, std::move(metadata_id));
}
inline
future<::shared_ptr<cql_transport::messages::result_message>>
execute_prepared(

View File

@@ -52,6 +52,7 @@ public:
std::vector<sstring> warnings;
private:
cql_metadata_id_type _metadata_id;
bool _result_metadata_is_empty;
public:
prepared_statement(audit::audit_info_ptr&& audit_info, seastar::shared_ptr<cql_statement> statement_, std::vector<seastar::lw_shared_ptr<column_specification>> bound_names_,
@@ -71,6 +72,15 @@ public:
void calculate_metadata_id();
cql_metadata_id_type get_metadata_id() const;
bool result_metadata_is_empty() const {
return _result_metadata_is_empty;
}
void update_result_metadata_id(cql_metadata_id_type metadata_id) {
_metadata_id = std::move(metadata_id);
_result_metadata_is_empty = false;
}
};
}

View File

@@ -49,6 +49,7 @@ prepared_statement::prepared_statement(
, partition_key_bind_indices(std::move(partition_key_bind_indices))
, warnings(std::move(warnings))
, _metadata_id(bytes{})
, _result_metadata_is_empty(statement->get_result_metadata()->flags().contains<metadata::flag::NO_METADATA>())
{
statement->set_audit_info(std::move(audit_info));
}

View File

@@ -9,7 +9,6 @@
import os
import sys
import shlex
import argparse
import psutil
from pathlib import Path
@@ -104,41 +103,16 @@ if __name__ == '__main__':
run('dd if=/dev/zero of={} bs=1M count={}'.format(swapfile, swapsize_mb), shell=True, check=True)
swapfile.chmod(0o600)
run('mkswap -f {}'.format(swapfile), shell=True, check=True)
mount_point = find_mount_point(swap_directory)
mount_unit = out(f'systemd-escape -p --suffix=mount {shlex.quote(str(mount_point))}')
# Add DefaultDependencies=no to the swap unit to avoid getting the default
# Before=swap.target dependency. We apply this to all clouds, but the
# requirement came from Azure:
#
# On Azure, the swap directory is on the Azure ephemeral disk (mounted on /mnt).
# However, cloud-init makes this mount (i.e., the mnt.mount unit) depend on
# the network (After=network-online.target). By extension, this means that
# the swap unit depends on the network. If we didn't use DefaultDependencies=no,
# then the swap unit would be part of the swap.target which other services
# assume to be a local boot target, so we would end up with dependency cycles
# such as:
#
# swap.target -> mnt-swapfile.swap -> mnt.mount -> network-online.target -> network.target -> systemd-resolved.service -> tmp.mount -> swap.target
#
# By removing the automatic Before=swap.target, the swap unit is no longer
# part of swap.target, avoiding such cycles. The swap will still be
# activated via WantedBy=multi-user.target.
unit_data = '''
[Unit]
Description=swapfile
DefaultDependencies=no
After={}
Conflicts=umount.target
Before=umount.target
[Swap]
What={}
[Install]
WantedBy=multi-user.target
'''[1:-1].format(mount_unit, swapfile)
'''[1:-1].format(swapfile)
with swapunit.open('w') as f:
f.write(unit_data)
systemd_unit.reload()

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:54662978b9ce4a6e25790b1b0a5099e6063173ffa95a399a6287cf474376ed09
size 6595952
oid sha256:e59fe56eac435fd03c2f0d7dfc11c6998d7c0750e1851535575497dd13d96015
size 6505524

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:0cf44ea1fb2ae20de45d26fe8095054e60cb8700cddcb2fd79ef79705484b18a
size 6603780
oid sha256:d424ce6cc7f65338c34dd35881d23f5ad3425651d66e47dc2c3a20dc798848d4
size 6598648

View File

@@ -1098,8 +1098,7 @@ std::optional<std::pair<read_id, index_t>> fsm::start_read_barrier(server_id req
// Make sure that only a leader or a node that is part of the config can request read barrier
// Nodes outside of the config may never get the data, so they will not be able to read it.
follower_progress* opt_progress = leader_state().tracker.find(requester);
if (requester != _my_id && opt_progress == nullptr) {
if (requester != _my_id && leader_state().tracker.find(requester) == nullptr) {
throw std::runtime_error(fmt::format("Read barrier requested by a node outside of the configuration {}", requester));
}
@@ -1110,23 +1109,19 @@ std::optional<std::pair<read_id, index_t>> fsm::start_read_barrier(server_id req
return {};
}
// Optimization for read barriers requested on non-voters. A non-voter doesn't receive the read_quorum message, so
// it might update its commit index only after another leader tick, which would slow down wait_for_apply() at the
// end of the read barrier. Prevent that by replicating to the non-voting requester here.
if (requester != _my_id && opt_progress->commit_idx < _commit_idx && opt_progress->match_idx == _log.last_idx()
&& !opt_progress->can_vote) {
logger.trace("start_read_barrier[{}]: replicate to {} because follower commit_idx={} < commit_idx={}, "
"follower match_idx={} == last_idx={}, and follower can_vote={}",
_my_id, requester, opt_progress->commit_idx, _commit_idx, opt_progress->match_idx,
_log.last_idx(), opt_progress->can_vote);
replicate_to(*opt_progress, true);
}
read_id id = next_read_id();
logger.trace("start_read_barrier[{}] starting read barrier with id {}", _my_id, id);
return std::make_pair(id, _commit_idx);
}
void fsm::maybe_update_commit_idx_for_read(index_t read_idx) {
// read_idx from the leader might not be replicated to the local node yet.
const bool in_local_log = read_idx <= _log.last_idx();
if (in_local_log && log_term_for(read_idx) == get_current_term()) {
advance_commit_idx(read_idx);
}
}
void fsm::stop() {
if (is_leader()) {
// Become follower to stop accepting requests

View File

@@ -480,6 +480,15 @@ public:
std::optional<std::pair<read_id, index_t>> start_read_barrier(server_id requester);
// Update the commit index to the read index (a read barrier result from the leader) if the local entry with the
// read index belongs to the current term.
//
// Satisfying the condition above guarantees that the local log matches the current leader's log up to the read
// index (the Log Matching Property), so the current leader won't drop the local entry with the read index.
// Moreover, this entry has been committed by the leader, so future leaders also won't drop it (the Leader
// Completeness Property). Hence, updating the commit index is safe.
void maybe_update_commit_idx_for_read(index_t read_idx);
size_t in_memory_log_size() const {
return _log.in_memory_size();
}

View File

@@ -1571,6 +1571,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) {
co_return stop_iteration::no;
}
read_idx = std::get<index_t>(res);
_fsm->maybe_update_commit_idx_for_read(read_idx);
co_return stop_iteration::yes;
});

View File

@@ -538,7 +538,6 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
group0_id = g0_info.group0_id;
raft::server_address my_addr{my_id, {}};
bool starting_server_as_follower = false;
if (server == nullptr) {
// This is the first time discovery is run. Create and start a Raft server for group 0 on this node.
raft::configuration initial_configuration;
@@ -566,7 +565,6 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
// trigger an empty snapshot transfer.
nontrivial_snapshot = true;
} else {
starting_server_as_follower = true;
co_await handshaker->pre_server_start(g0_info);
}
@@ -593,9 +591,7 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
}
SCYLLA_ASSERT(server);
co_await utils::get_local_injector().inject("join_group0_pause_before_config_check",
utils::wait_for_message(std::chrono::minutes{5}));
if (!starting_server_as_follower && server->get_configuration().contains(my_id)) {
if (server->get_configuration().contains(my_id)) {
// True if we started a new group or completed a configuration change initiated earlier.
group0_log.info("server {} already in group 0 (id {}) as {}", my_id, group0_id,
server->get_configuration().can_vote(my_id)? "voter" : "non-voter");

View File

@@ -239,9 +239,11 @@ public:
// The i-th element corresponds to the i-th entry in _entries.
// Can be smaller than _entries. If _entries[i] doesn't have a matching element in _promoted_indexes then
// that entry doesn't have a promoted index.
// It's not chunked, because promoted index is present only when there are large partitions in the page,
// which also means the page will have typically only 1 entry due to summary:data_file size ratio.
// Kept separately to avoid paying for storage cost in pages where no entry has a promoted index,
// which is typical in workloads with small partitions.
lsa::chunked_managed_vector<promoted_index> _promoted_indexes;
managed_vector<promoted_index> _promoted_indexes;
public:
partition_index_page() = default;
partition_index_page(partition_index_page&&) noexcept = default;

73
test.py
View File

@@ -11,7 +11,6 @@ from __future__ import annotations
import argparse
import asyncio
import math
import shlex
import textwrap
from random import randint
@@ -74,51 +73,6 @@ PYTEST_RUNNER_DIRECTORIES = [
launch_time = time.monotonic()
class ThreadsCalculator:
"""
The ThreadsCalculator class calculates the number of jobs that can be run concurrently based on system
memory and CPU constraints. It allows resource reservation and configurable parameters for
flexible job scheduling in various modes, such as `debug`.
"""
def __init__(self,
modes: list[str],
min_system_memory_reserve: float = 5e9,
max_system_memory_reserve: float = 8e9,
system_memory_reserve_fraction = 16,
max_test_memory: float = 5e9,
test_memory_fraction: float = 8.0,
debug_test_memory_multiplier: float = 1.5,
debug_cpus_per_test_job=1.5,
non_debug_cpus_per_test_job: float =1.0,
non_debug_max_test_memory: float = 4e9
):
sys_mem = int(os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES"))
test_mem = min(sys_mem / test_memory_fraction, max_test_memory)
if "debug" in modes:
test_mem *= debug_test_memory_multiplier
system_memory_reserve = int(min(
max(sys_mem / system_memory_reserve_fraction, min_system_memory_reserve),
max_system_memory_reserve,
))
available_mem = max(0, sys_mem - system_memory_reserve)
is_debug = "debug" in modes
test_mem = min(
sys_mem / test_memory_fraction,
max_test_memory if is_debug else non_debug_max_test_memory,
)
if is_debug:
test_mem *= debug_test_memory_multiplier
self.cpus_per_test_job = (
debug_cpus_per_test_job if is_debug else non_debug_cpus_per_test_job
)
self.default_num_jobs_mem = max(1, int(available_mem // test_mem))
def get_number_of_threads(self, nr_cpus: int) -> int:
default_num_jobs_cpu = max(1, math.ceil(nr_cpus / self.cpus_per_test_job))
return min(self.default_num_jobs_mem, default_num_jobs_cpu)
class TabularConsoleOutput:
"""Print test progress to the console"""
@@ -319,13 +273,6 @@ def parse_cmd_line() -> argparse.Namespace:
if args.skip_patterns and args.k:
parser.error(palette.fail('arguments --skip and -k are mutually exclusive, please use only one of them'))
if not args.modes:
try:
args.modes = get_configured_modes()
except Exception:
print(palette.fail("Failed to read output of `ninja mode_list`: please run ./configure.py first"))
raise
if not args.jobs:
if not args.cpus:
nr_cpus = multiprocessing.cpu_count()
@@ -333,7 +280,19 @@ def parse_cmd_line() -> argparse.Namespace:
nr_cpus = int(subprocess.check_output(
['taskset', '-c', args.cpus, 'python3', '-c',
'import os; print(len(os.sched_getaffinity(0)))']))
args.jobs = ThreadsCalculator(args.modes).get_number_of_threads(nr_cpus)
cpus_per_test_job = 1
sysmem = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
testmem = 6e9 if os.sysconf('SC_PAGE_SIZE') > 4096 else 2e9
default_num_jobs_mem = ((sysmem - 4e9) // testmem)
args.jobs = min(default_num_jobs_mem, nr_cpus // cpus_per_test_job)
if not args.modes:
try:
args.modes = get_configured_modes()
except Exception:
print(palette.fail("Failed to read output of `ninja mode_list`: please run ./configure.py first"))
raise
if not args.coverage_modes and args.coverage:
args.coverage_modes = list(args.modes)
@@ -391,12 +350,16 @@ def run_pytest(options: argparse.Namespace) -> tuple[int, list[SimpleNamespace]]
if options.list_tests:
args.extend(['--collect-only', '--quiet', '--no-header'])
else:
threads = int(options.jobs)
# debug mode is very CPU and memory hungry, so we need to lower the number of threads to be able to finish tests
if 'debug' in options.modes:
threads = int(threads * 0.5)
args.extend([
"--log-level=DEBUG", # Capture logs
f'--junit-xml={junit_output_file}',
"-rf",
'--test-py-init',
f'-n{options.jobs}',
f'-n{threads}',
f'--tmpdir={temp_dir}',
f'--maxfail={options.max_failures}',
f'--alluredir={report_dir / f"allure_{HOST_ID}"}',

View File

@@ -18,7 +18,7 @@
#include <seastar/core/coroutine.hh>
#include <seastar/core/manual_clock.hh>
#include <seastar/core/timer.hh>
#include <seastar/util/later.hh>
#include <seastar/util/defer.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/util/alloc_failure_injector.hh>
@@ -290,17 +290,12 @@ SEASTAR_THREAD_TEST_CASE(test_address_map_replication) {
m.set_expiring(id1);
BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1);
m.barrier().get();
promise<> shard0_timer_expired;
timer<manual_clock> shard0_timer([&shard0_timer_expired] {
shard0_timer_expired.set_value();
});
shard0_timer.arm(manual_clock::now() + expiration_time);
m_svc.invoke_on(1, [] (address_map_t<manual_clock>& m) {
BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1);
manual_clock::advance(expiration_time);
BOOST_CHECK(!m.find(id1));
return smp::submit_to(0, []{}); // Ensure shard 0 notices timer is expired.
}).get();
shard0_timer_expired.get_future().get();
BOOST_CHECK(!m.find(id1));
// Expiring entries are replicated

View File

@@ -1045,6 +1045,7 @@ validate_result_size(size_t i, schema_ptr schema, const utils::chunked_vector<mu
struct fuzzy_test_config {
uint32_t seed;
std::chrono::seconds timeout;
unsigned concurrency;
unsigned scans;
};
@@ -1076,9 +1077,6 @@ run_fuzzy_test_scan(size_t i, fuzzy_test_config cfg, sharded<replica::database>&
testlog.debug("[scan#{}]: seed={}, is_stateful={}, prange={}, ckranges={}", i, seed, is_stateful, partition_range,
partition_slice.default_row_ranges());
// Use a small max_size to force many pages per scan, stressing the
// paging and result-merging logic. With the large row limit here,
// the byte limit is typically the tighter bound.
const auto [results, npages] = read_partitions_with_paged_scan(db, schema, 1000, 1024, is_stateful, partition_range, partition_slice);
const auto expected_partitions = slice_partitions(*schema, mutations, partition_index_range, partition_slice);
@@ -1162,27 +1160,21 @@ SEASTAR_THREAD_TEST_CASE(fuzzy_test) {
std::uniform_int_distribution<size_t>(0, 100), // clustering-rows
std::uniform_int_distribution<size_t>(0, 100), // range-tombstones
#else
// Keep these values moderate: with complex randomly-generated
// schemas (deeply nested frozen collections/UDTs), large row
// counts cause data generation and paged scanning to be very
// slow, leading to CI timeouts. The test's value comes from
// schema variety and paging correctness, not from sheer data
// volume.
std::uniform_int_distribution<size_t>(32, 64), // partitions
std::uniform_int_distribution<size_t>(0, 200), // clustering-rows
std::uniform_int_distribution<size_t>(0, 200), // range-tombstones
std::uniform_int_distribution<size_t>(0, 1000), // clustering-rows
std::uniform_int_distribution<size_t>(0, 1000), // range-tombstones
#endif
tests::default_timestamp_generator());
#if defined DEBUG
auto cfg = fuzzy_test_config{seed, 1, 1};
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{8}, 1, 1};
#elif defined DEVEL
auto cfg = fuzzy_test_config{seed, 2, 4};
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 2, 4};
#else
auto cfg = fuzzy_test_config{seed, 4, 8};
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 4, 8};
#endif
testlog.info("Running test workload with configuration: seed={}, concurrency={}, scans={}", cfg.seed,
testlog.info("Running test workload with configuration: seed={}, timeout={}s, concurrency={}, scans={}", cfg.seed, cfg.timeout.count(),
cfg.concurrency, cfg.scans);
smp::invoke_on_all([cfg, db = &env.db(), gs = global_schema_ptr(tbl.schema), &compacted_frozen_mutations = tbl.compacted_frozen_mutations] {

View File

@@ -906,13 +906,9 @@ SEASTAR_THREAD_TEST_CASE(test_timeout_is_applied_on_lookup) {
BOOST_REQUIRE(entry.permit.timeout() == new_timeout);
BOOST_REQUIRE(!entry.permit.get_abort_exception());
// Don't waste time retrying before the timeout is up
sleep(ttl_timeout_test_timeout).get();
eventually_true([&entry] {
return bool(entry.permit.get_abort_exception());
});
sleep(ttl_timeout_test_timeout * 2).get();
BOOST_REQUIRE(entry.permit.get_abort_exception());
BOOST_REQUIRE_THROW(std::rethrow_exception(entry.permit.get_abort_exception()), seastar::named_semaphore_timed_out);
}

View File

@@ -2644,10 +2644,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_update_from_memtable) {
return rd;
};
{
memory::scoped_critical_alloc_section dfg;
populate_range(cache, population_range);
}
populate_range(cache, population_range);
auto rd1_v1 = assert_that(make_reader(population_range));
mutation_reader_opt snap;
auto close_snap = defer([&snap] {

View File

@@ -29,6 +29,7 @@
#include "test/lib/exception_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/test_utils.hh"
#include "transport/response.hh"
BOOST_AUTO_TEST_SUITE(schema_change_test)
@@ -701,6 +702,16 @@ cql3::cql_metadata_id_type compute_metadata_id(std::vector<std::pair<sstring, sh
return cql3::metadata{columns_specification}.calculate_metadata_id();
}
std::vector<lw_shared_ptr<cql3::column_specification>> make_columns_specification(
const std::vector<std::pair<sstring, shared_ptr<const abstract_type>>>& columns, sstring ks = "ks", sstring cf = "cf") {
std::vector<lw_shared_ptr<cql3::column_specification>> columns_specification;
columns_specification.reserve(columns.size());
for (const auto& column : columns) {
columns_specification.push_back(make_lw_shared(cql3::column_specification(ks, cf, make_shared<cql3::column_identifier>(column.first, false), column.second)));
}
return columns_specification;
}
BOOST_AUTO_TEST_CASE(metadata_id_with_different_keyspace_and_table) {
const auto c = std::make_pair("id", uuid_type);
auto h1 = compute_metadata_id({c}, "ks1", "cf1");
@@ -751,6 +762,39 @@ BOOST_AUTO_TEST_CASE(metadata_id_with_different_column_order) {
verify_metadata_id_is_stable(h2, "b52512f2b76d3e0695dcaf7b0a71efac");
}
BOOST_AUTO_TEST_CASE(metadata_id_changed_rows_response_overrides_no_metadata) {
auto empty_metadata_id = cql3::metadata{std::vector<lw_shared_ptr<cql3::column_specification>>{}}.calculate_metadata_id();
auto stale_response_metadata_id = empty_metadata_id;
auto columns_specification = make_columns_specification({{"role", utf8_type}});
cql3::metadata rows_metadata(columns_specification);
auto rows_metadata_id = rows_metadata.calculate_metadata_id();
cql_transport::response resp{0, cql_transport::cql_binary_opcode::RESULT, tracing::trace_state_ptr{}};
resp.write(rows_metadata, cql_transport::cql_metadata_id_wrapper(
std::move(empty_metadata_id),
std::move(stale_response_metadata_id)), true);
auto body_stream = std::move(resp).extract_body();
auto body = body_stream.linearize();
const auto* ptr = reinterpret_cast<const char*>(body.begin());
const auto flags_mask = read_be<int32_t>(ptr);
ptr += sizeof(int32_t);
const auto flags = cql3::metadata::flag_enum_set::from_mask(flags_mask);
BOOST_REQUIRE(flags.contains<cql3::metadata::flag::METADATA_CHANGED>());
BOOST_REQUIRE(!flags.contains<cql3::metadata::flag::NO_METADATA>());
const auto column_count = read_be<int32_t>(ptr);
ptr += sizeof(int32_t);
BOOST_REQUIRE_EQUAL(column_count, 1);
const auto metadata_id_length = read_be<uint16_t>(ptr);
ptr += sizeof(uint16_t);
BOOST_REQUIRE_EQUAL(metadata_id_length, rows_metadata_id._metadata_id.size());
BOOST_REQUIRE(std::equal(rows_metadata_id._metadata_id.begin(), rows_metadata_id._metadata_id.end(),
reinterpret_cast<const bytes::value_type*>(ptr)));
}
BOOST_AUTO_TEST_CASE(metadata_id_with_udt) {
auto compute_metadata_id_for_type = [&](

View File

@@ -0,0 +1,328 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import dataclasses
import hashlib
import socket
import struct
import pytest
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error
from test.pylib.util import unique_name
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
# ---------------------------------------------------------------------------
# Minimal raw CQL v4 socket helpers with SCYLLA_USE_METADATA_ID extension.
#
# The standard Python driver never negotiates SCYLLA_USE_METADATA_ID and
# therefore never includes result_metadata_id in EXECUTE requests for
# protocol v4. In CQL v5 result_metadata_id exchange is mandatory and
# built into the wire format; until Scylla implements v5, this extension
# provides the same semantics on v4. The helpers below implement just
# enough of the CQL wire protocol to exercise the server-side prepared
# metadata promotion path introduced for v5 compatibility.
# ---------------------------------------------------------------------------
# CQL opcodes
_OP_STARTUP = 0x01
_OP_AUTH_RESPONSE = 0x0F
_OP_PREPARE = 0x09
_OP_EXECUTE = 0x0A
_OP_READY = 0x02
_OP_AUTHENTICATE = 0x03
_OP_RESULT = 0x08
_OP_AUTH_SUCCESS = 0x10
# RESULT kind codes
_RESULT_KIND_ROWS = 0x00000002
_RESULT_KIND_PREPARED = 0x00000004
# Rows metadata flags (bit positions in the uint32 flags field)
_META_NO_METADATA = 1 << 2
_META_METADATA_CHANGED = 1 << 3
# EXECUTE options flags (1-byte field in CQL v4)
_FLAG_SKIP_METADATA = 0x02
_FRAME_HEADER_SIZE = 9 # version(1)+flags(1)+stream(2)+opcode(1)+length(4)
_CQL_VERSION = "3.0.0"
_DEFAULT_CONSISTENCY = 0x0006 # LOCAL_QUORUM
def _pack_short(v: int) -> bytes:
return struct.pack(">H", v)
def _pack_int(v: int) -> bytes:
return struct.pack(">I", v)
def _short_bytes(b: bytes) -> bytes:
"""CQL [short bytes]: uint16 length prefix + payload."""
return _pack_short(len(b)) + b
def _long_string(s: str) -> bytes:
"""CQL [long string]: uint32 length prefix + UTF-8 bytes."""
b = s.encode()
return _pack_int(len(b)) + b
def _string_map(d: dict[str, str]) -> bytes:
"""CQL [string map]: uint16 count + (uint16-prefixed-string, uint16-prefixed-string)*."""
out = _pack_short(len(d))
for k, v in d.items():
out += _short_bytes(k.encode())
out += _short_bytes(v.encode())
return out
def _frame(opcode: int, body: bytes, stream: int) -> bytes:
"""Build a CQL v4 request frame."""
return struct.pack(">BBHBI", 0x04, 0x00, stream, opcode, len(body)) + body
def _recv_frame(sock: socket.socket) -> tuple[int, int, bytes]:
"""Read one CQL v4 response frame; return (stream, opcode, body)."""
header = b""
while len(header) < _FRAME_HEADER_SIZE:
chunk = sock.recv(_FRAME_HEADER_SIZE - len(header))
assert chunk, "Connection closed while reading frame header"
header += chunk
_version, _flags = struct.unpack(">BB", header[0:2])
stream = struct.unpack(">H", header[2:4])[0]
opcode = header[4]
length = struct.unpack(">I", header[5:9])[0]
body = b""
while len(body) < length:
chunk = sock.recv(length - len(body))
assert chunk, "Connection closed while reading frame body"
body += chunk
return stream, opcode, body
@dataclasses.dataclass
class ExecuteResult:
"""Parsed outcome of a ROWS EXECUTE response."""
metadata_changed: bool
no_metadata: bool
column_count: int
result_metadata_id: bytes | None
def _cql_connect(host: str, port: int, username: str, password: str) -> socket.socket:
"""
Open a raw TCP socket to *host*:*port* and perform the CQL v4 handshake,
negotiating the SCYLLA_USE_METADATA_ID extension so that result_metadata_id
is exchanged on the wire — identical to the mandatory CQL v5 behaviour.
"""
sock = socket.create_connection((host, port))
stream = 1
# STARTUP with SCYLLA_USE_METADATA_ID enables the v5-style metadata_id
# exchange for this v4 connection.
startup_opts = {"CQL_VERSION": _CQL_VERSION, "SCYLLA_USE_METADATA_ID": ""}
sock.sendall(_frame(_OP_STARTUP, _string_map(startup_opts), stream))
_, opcode, payload = _recv_frame(sock)
if opcode == _OP_READY:
return sock
assert opcode == _OP_AUTHENTICATE, (
f"Expected AUTHENTICATE(0x{_OP_AUTHENTICATE:02x}), got 0x{opcode:02x}"
)
# PlainText SASL token: NUL + username + NUL + password
creds = b"\x00" + username.encode() + b"\x00" + password.encode()
stream += 1
sock.sendall(_frame(_OP_AUTH_RESPONSE, _short_bytes(creds), stream))
_, auth_op, _ = _recv_frame(sock)
assert auth_op == _OP_AUTH_SUCCESS, f"Authentication failed: opcode=0x{auth_op:02x}"
return sock
def _cql_prepare(sock: socket.socket, stream: int, query: str) -> bytes:
"""PREPARE *query* and return the server-assigned query_id."""
sock.sendall(_frame(_OP_PREPARE, _long_string(query), stream))
_, opcode, payload = _recv_frame(sock)
assert opcode == _OP_RESULT, f"Expected RESULT, got 0x{opcode:02x}"
pos = 0
kind = struct.unpack(">I", payload[pos : pos + 4])[0]
pos += 4
assert kind == _RESULT_KIND_PREPARED, f"Expected PREPARED kind, got {kind}"
id_len = struct.unpack(">H", payload[pos : pos + 2])[0]
pos += 2
return bytes(payload[pos : pos + id_len])
def _cql_execute_with_metadata_id(
sock: socket.socket,
stream: int,
query_id: bytes,
result_metadata_id: bytes,
consistency: int = _DEFAULT_CONSISTENCY,
) -> ExecuteResult:
"""
Send EXECUTE carrying *result_metadata_id* on the wire.
With SCYLLA_USE_METADATA_ID active the server reads result_metadata_id
immediately after query_id (before the options block), mirroring CQL v5
wire format. SKIP_METADATA is set so a normal response returns no column
specs; only the METADATA_CHANGED promotion path returns actual metadata.
"""
# options block: [consistency: uint16][flags: byte]
options = struct.pack(">HB", consistency, _FLAG_SKIP_METADATA)
body = _short_bytes(query_id) + _short_bytes(result_metadata_id) + options
sock.sendall(_frame(_OP_EXECUTE, body, stream))
_, opcode, payload = _recv_frame(sock)
assert opcode == _OP_RESULT, f"Expected RESULT, got 0x{opcode:02x}"
pos = 0
kind = struct.unpack(">I", payload[pos : pos + 4])[0]
pos += 4
assert kind == _RESULT_KIND_ROWS, f"Expected ROWS kind, got {kind}"
meta_flags = struct.unpack(">I", payload[pos : pos + 4])[0]
pos += 4
column_count = struct.unpack(">I", payload[pos : pos + 4])[0]
pos += 4
metadata_changed = bool(meta_flags & _META_METADATA_CHANGED)
no_metadata = bool(meta_flags & _META_NO_METADATA)
response_metadata_id: bytes | None = None
if metadata_changed:
id_len = struct.unpack(">H", payload[pos : pos + 2])[0]
pos += 2
response_metadata_id = bytes(payload[pos : pos + id_len])
return ExecuteResult(
metadata_changed=metadata_changed,
no_metadata=no_metadata,
column_count=column_count,
result_metadata_id=response_metadata_id,
)
def _prepare_and_execute(
    host: str, query: str, stale_metadata_id: bytes
) -> ExecuteResult:
    """
    Open a raw socket connection (negotiating SCYLLA_USE_METADATA_ID), prepare
    *query*, execute it with *stale_metadata_id*, and return the parsed result.

    Intended to be called via ``asyncio.to_thread`` to avoid blocking the event loop.
    """
    sock = _cql_connect(host, 9042, "cassandra", "cassandra")
    try:
        # Stream ids 0 and 1 stay unused; PREPARE goes out on stream 2 and
        # EXECUTE on stream 3, matching the original request numbering.
        prepare_stream, execute_stream = 2, 3
        query_id = _cql_prepare(sock, prepare_stream, query)
        return _cql_execute_with_metadata_id(
            sock, execute_stream, query_id, stale_metadata_id
        )
    finally:
        sock.close()
@pytest.mark.asyncio
async def test_list_roles_of_prepared_metadata_promotion(
    manager: ManagerClient,
) -> None:
    """Check that the server promotes the prepared metadata_id for statements
    whose PREPARE response carries empty result metadata (NO_METADATA).

    ``LIST ROLES OF <role>`` builds its result-set metadata dynamically at
    execute time, so at PREPARE time the server only knows the metadata_id of
    empty metadata and returns that. When the client later sends EXECUTE with
    SKIP_METADATA plus the stale empty metadata_id, the server must notice
    that the actual rows carry real metadata and answer with a
    ``METADATA_CHANGED`` result holding the real metadata_id, letting the
    client refresh its cache. CQL v5 mandates this behaviour; on CQL v4 the
    same wire-level exchange is enabled through the SCYLLA_USE_METADATA_ID
    Scylla protocol extension.
    """
    server = await manager.server_add(config=auth_config)
    cql, _ = await manager.get_ready_cql([server])

    role = "r" + unique_name()
    await cql.run_async(f"CREATE ROLE {role}")

    # Any non-empty bytes differing from the real metadata_id stand in for
    # the "stale" cache entry a client would hold after a PREPARE that
    # returned empty metadata.
    stale_metadata_id = hashlib.sha256(b"").digest()[:16]

    res = await asyncio.to_thread(
        _prepare_and_execute, server.ip_addr, f"LIST ROLES OF {role}", stale_metadata_id
    )

    assert res.metadata_changed, (
        f"expected EXECUTE for LIST ROLES OF {role} to return METADATA_CHANGED "
        f"after PREPARE returned an empty result_metadata_id"
    )
    assert not res.no_metadata, (
        f"expected EXECUTE for LIST ROLES OF {role} to not have NO_METADATA flag "
        f"when METADATA_CHANGED is set"
    )
    assert res.result_metadata_id is not None, (
        f"expected EXECUTE for LIST ROLES OF {role} to return a result_metadata_id "
        f"alongside METADATA_CHANGED"
    )
@pytest.mark.asyncio
@pytest.mark.skip_mode(
    mode="release", reason="error injection is disabled in release mode"
)
async def test_list_roles_of_prepared_metadata_promotion_suppressed_by_injection(
    manager: ManagerClient,
) -> None:
    """Check that the ``skip_rows_metadata_changed_response`` error injection
    suppresses the metadata promotion, so the response keeps NO_METADATA and
    never gains METADATA_CHANGED.

    This is the negative/regression twin of
    ``test_list_roles_of_prepared_metadata_promotion``: by showing that the
    promotion can be switched off it proves the happy-path test is not a
    false positive, and it verifies the injection point itself works.
    """
    server = await manager.server_add(config=auth_config)
    cql, _ = await manager.get_ready_cql([server])

    role = "r" + unique_name()
    await cql.run_async(f"CREATE ROLE {role}")
    stale_metadata_id = hashlib.sha256(b"").digest()[:16]

    # Both injections active for the duration of the EXECUTE round trip.
    async with inject_error(
        manager.api, server.ip_addr, "skip_prepared_result_metadata_promotion"
    ), inject_error(
        manager.api, server.ip_addr, "skip_rows_metadata_changed_response"
    ):
        res = await asyncio.to_thread(
            _prepare_and_execute,
            server.ip_addr,
            f"LIST ROLES OF {role}",
            stale_metadata_id,
        )

    assert not res.metadata_changed, (
        f"expected injected EXECUTE for LIST ROLES OF {role} to suppress "
        f"METADATA_CHANGED, but the flag was set"
    )
    assert res.no_metadata, (
        f"expected injected EXECUTE for LIST ROLES OF {role} to keep the "
        f"stale NO_METADATA path, but no_metadata flag was not set"
    )

View File

@@ -257,44 +257,39 @@ async def manager(request: pytest.FixtureRequest,
yield manager_client
# `request.node.stash` contains a report stored in `pytest_runtest_makereport` from where we can retrieve
# test failure.
cluster_status = None
found_errors = {}
failed = False
report = request.node.stash[PHASE_REPORT_KEY]
failed = report.when == "call" and report.failed
# Check if the test has the check_nodes_for_errors marker
found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
failed_test_dir_path = None
try:
report = request.node.stash[PHASE_REPORT_KEY]
failed = report.when == "call" and report.failed
if failed or found_errors:
# Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
# all related logs in one dir.
# Then add property to the XML report with the path to the directory, so it can be visible in Jenkins
failed_test_dir_path = testpy_test.suite.log_dir / "failed_test" / test_case_name.translate(
str.maketrans('[]', '()'))
failed_test_dir_path.mkdir(parents=True, exist_ok=True)
# Check if the test has the check_nodes_for_errors marker
found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
if failed:
await manager_client.gather_related_logs(
failed_test_dir_path,
{'pytest.log': test_log, 'test_py.log': test_py_log_test}
)
with open(failed_test_dir_path / "stacktrace.txt", "w") as f:
f.write(report.longreprtext)
if request.config.getoption('artifacts_dir_url') is not None:
# get the relative path to the tmpdir for the failed directory
dir_path_relative = f"{failed_test_dir_path.as_posix()[failed_test_dir_path.as_posix().find('testlog'):]}"
full_url = urllib.parse.urljoin(request.config.getoption('artifacts_dir_url') + '/',
urllib.parse.quote(dir_path_relative))
record_property("TEST_LOGS", full_url)
if failed or found_errors:
# Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
# all related logs in one dir.
# Then add property to the XML report with the path to the directory, so it can be visible in Jenkins
failed_test_dir_path = testpy_test.suite.log_dir / "failed_test" / test_case_name.translate(
str.maketrans('[]', '()'))
failed_test_dir_path.mkdir(parents=True, exist_ok=True)
cluster_status = await manager_client.after_test(test_case_name, not failed)
await manager_client.stop() # Stop client session and close driver after each test
if failed:
await manager_client.gather_related_logs(
failed_test_dir_path,
{'pytest.log': test_log, 'test_py.log': test_py_log_test}
)
with open(failed_test_dir_path / "stacktrace.txt", "w") as f:
f.write(report.longreprtext)
if request.config.getoption('artifacts_dir_url') is not None:
# get the relative path to the tmpdir for the failed directory
dir_path_relative = f"{failed_test_dir_path.as_posix()[failed_test_dir_path.as_posix().find('testlog'):]}"
full_url = urllib.parse.urljoin(request.config.getoption('artifacts_dir_url') + '/',
urllib.parse.quote(dir_path_relative))
record_property("TEST_LOGS", full_url)
cluster_status = await manager_client.after_test(test_case_name, not failed)
finally:
await manager_client.stop() # Stop client session and close driver after each test
if cluster_status is not None and cluster_status["server_broken"] and not failed:
if cluster_status["server_broken"] and not failed:
failed = True
pytest.fail(
f"test case {test_case_name} left unfinished tasks on Scylla server. Server marked as broken,"

View File

@@ -10,7 +10,6 @@ import random
import string
import tempfile
import time
import threading
from concurrent.futures.thread import ThreadPoolExecutor
from pprint import pformat
@@ -274,30 +273,6 @@ class TesterAlternator(BaseAlternator):
logger.info("Testing and validating an update query using key condition expression")
logger.info(f"ConditionExpression update of short circuit is: {conditional_update_short_circuit}")
dc2_table.update_item(**conditional_update_short_circuit)
# Wait for cross-DC replication to reach both live DC1 nodes
# before stopping dc2_node. The LWT commit uses LOCAL_QUORUM,
# which only guarantees DC2 persistence; replication to DC1 is
# async background work. Without this wait, stopping dc2_node
# can drop in-flight RPCs to DC1 while CAS mutations don't
# store hints. We must confirm both live DC1 replicas have the
# data so that the later ConsistentRead=True (LOCAL_QUORUM)
# read on restarted node1 is guaranteed to succeed.
# See https://scylladb.atlassian.net/browse/SCYLLADB-1267
dc1_live_nodes = [
node for node in self.cluster.nodelist()
if node.data_center == node1.data_center and node.server_id != node1.server_id
]
dc1_live_tables = [self.get_table(table_name=TABLE_NAME, node=node) for node in dc1_live_nodes]
wait_for(
lambda: all(
t.get_item(
Key={self._table_primary_key: new_pk_val}, ConsistentRead=False
).get("Item", {}).get("c") == 3
for t in dc1_live_tables
),
timeout=60,
text="Waiting for cross-DC replication of conditional update to both live DC1 nodes",
)
dc2_node.stop()
node1.start()
@@ -506,33 +481,28 @@ class TesterAlternator(BaseAlternator):
2) Issue Alternator 'heavy' requests concurrently (create-table)
3) wait for RequestLimitExceeded error response.
"""
# Keep the limit low to avoid exhausting LSA memory on the 1GB test node
# when multiple CreateTable requests (Raft + schema + flush) run concurrently.
concurrent_requests_limit = 3
concurrent_requests_limit = 5
extra_config = {"max_concurrent_requests_per_shard": concurrent_requests_limit, "num_tokens": 1}
self.prepare_dynamodb_cluster(num_of_nodes=1, extra_config=extra_config)
node1 = self.cluster.nodelist()[0]
stop_workers = threading.Event()
create_tables_threads = []
for tables_num in range(concurrent_requests_limit * 5):
create_tables_threads.append(self.run_create_table_thread())
def run_create_table_until_limited() -> None:
while not stop_workers.is_set():
try:
self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
except Exception as error: # noqa: BLE001
if "RequestLimitExceeded" in str(error):
stop_workers.set()
return
raise
@retrying(num_attempts=150, sleep_time=0.2, allowed_exceptions=ConcurrencyLimitNotExceededError, message="Running create-table request")
def wait_for_create_table_request_failure():
try:
self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
except Exception as error:
if "RequestLimitExceeded" in error.args[0]:
return
raise
raise ConcurrencyLimitNotExceededError
with ThreadPoolExecutor(max_workers=concurrent_requests_limit * 5) as executor:
create_table_futures = [executor.submit(run_create_table_until_limited) for _ in range(concurrent_requests_limit * 5)]
wait_for_create_table_request_failure()
if not stop_workers.wait(timeout=30):
raise ConcurrencyLimitNotExceededError
stop_workers.set()
for future in create_table_futures:
future.result(timeout=60)
for thread in create_tables_threads:
thread.join()
@staticmethod
def _set_slow_query_logging_api(run_on_node: ScyllaNode, is_enable: bool = True, threshold: int | None = None):

View File

@@ -1182,9 +1182,9 @@ class TestAuth(Tester):
def get_session(self, node_idx=0, user=None, password=None, exclusive=True):
node = self.cluster.nodelist()[node_idx]
if exclusive:
conn = self.patient_exclusive_cql_connection(node, user=user, password=password)
conn = self.patient_exclusive_cql_connection(node, user=user, password=password, timeout=0.1)
else:
conn = self.patient_cql_connection(node, user=user, password=password)
conn = self.patient_cql_connection(node, user=user, password=password, timeout=0.1)
return conn
def assert_permissions_listed(self, expected, session, query, include_superuser=False):

View File

@@ -199,7 +199,7 @@ class GSServer(GSFront):
def unpublish(self):
for k in self.vars:
v = self.oldvars.get(k)
v = self.oldvars[k]
if v:
os.environ[k] = v
elif os.environ.get(k):

View File

@@ -215,11 +215,6 @@ async def test_node_ops_tasks_tree(manager: ManagerClient):
servers, vt_ids = await check_remove_node_tasks_tree(manager, tm, module_name, servers, vt_ids)
servers, vt_ids = await check_decommission_tasks_tree(manager, tm, module_name, servers, vt_ids)
# Reconnect the driver after topology changes (replace, removenode,
# decommission) so that the new_test_keyspace cleanup can reach a
# live node for DROP KEYSPACE.
await manager.driver_connect()
@pytest.mark.asyncio
async def test_node_ops_tasks_ttl(manager: ManagerClient):
"""Test node ops virtual tasks' ttl."""

View File

@@ -17,7 +17,6 @@ import socket
import socketserver
import tempfile
import threading
import time
import uuid
from collections import namedtuple
from contextlib import contextmanager
@@ -34,9 +33,9 @@ from test.cluster.dtest.dtest_class import create_ks, wait_for
from test.cluster.dtest.tools.assertions import assert_invalid
from test.cluster.dtest.tools.data import rows_to_list, run_in_parallel
from test.cluster.test_config import wait_for_config
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import read_barrier
from test.pylib.util import wait_for as wait_for_async
logger = logging.getLogger(__name__)
@@ -114,10 +113,11 @@ class AuditTester:
for k in AUTH_CONFIG:
await self.manager.server_remove_config_option(srv.server_id, k)
# Remove absent keys so the server reverts to compiled-in defaults.
for k in absent_keys:
await self.manager.server_remove_config_option(srv.server_id, k)
if needs_restart:
# Remove absent keys so the server reverts to compiled-in defaults.
for k in absent_keys:
await self.manager.server_remove_config_option(srv.server_id, k)
await self.manager.server_stop_gracefully(srv.server_id)
full_cfg = self._build_server_config(needed, enable_compact_storage, user)
await self.manager.server_update_config(srv.server_id, config_options=full_cfg)
@@ -127,17 +127,10 @@ class AuditTester:
# Server stays up — only push live-updatable keys.
live_cfg = {k: v for k, v in needed.items() if k in LIVE_AUDIT_KEYS}
live_cfg["enable_create_table_with_compact_storage"] = enable_compact_storage
log_file = await self.manager.server_open_log(srv.server_id)
# Each remove/update sends a SIGHUP. Wait for each one's
# "completed re-reading configuration file" before the next
# so we never match a stale message.
for k in absent_keys:
from_mark = await log_file.mark()
await self.manager.server_remove_config_option(srv.server_id, k)
await log_file.wait_for(r"completed re-reading configuration file", from_mark=from_mark, timeout=60)
from_mark = await log_file.mark()
await self.manager.server_update_config(srv.server_id, config_options=live_cfg)
await log_file.wait_for(r"completed re-reading configuration file", from_mark=from_mark, timeout=60)
for key in LIVE_AUDIT_KEYS:
if key in live_cfg:
await wait_for_config(self.manager, srv, key, live_cfg[key])
async def _start_fresh_servers(self, needed: dict[str, str],
enable_compact_storage: bool,
@@ -352,7 +345,7 @@ class UnixSockerListener:
elif data != "Initializing syslog audit backend.":
self.server.parent_instance.lines.append(data)
class UnixDatagramServer(socketserver.UnixDatagramServer):
class UnixDatagramServer(socketserver.ThreadingUnixDatagramServer):
def __init__(self, socket_path, handler, parent_instance, lock):
self.parent_instance = parent_instance
self.mutex = lock
@@ -1349,13 +1342,7 @@ class CQLAuditTester(AuditTester):
conn = await self.manager.get_cql_exclusive(srv)
stmt = SimpleStatement("INSERT INTO ks.test1 (k, v1) VALUES (1000, 1000)", consistency_level=ConsistencyLevel.THREE)
conn.execute(stmt)
# The audit log entry may not be visible immediately after the
# insert, so retry with exponential backoff until it appears.
audit_node_ips = await wait_for_async(
lambda: self.get_audit_partitions_for_operation(session, stmt.query_string),
deadline=time.time() + 10,
period=0.05,
label=f"audit entry for node {index}")
audit_node_ips = await self.get_audit_partitions_for_operation(session, stmt.query_string)
node_to_audit_nodes[index] = set(audit_node_ips)
all_addresses = set(srv.ip_addr for srv in servers)

View File

@@ -1,70 +0,0 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging
import asyncio
import time
import pytest
from test.cluster.util import get_current_group0_config
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import read_barrier
from test.pylib.util import wait_for
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_bootstrap_with_quick_group0_join(manager: ManagerClient):
"""Regression test for https://scylladb.atlassian.net/browse/SCYLLADB-959.
The bug was that when the bootstrapping node joined group0 before reaching
post_server_start, it skipped post_server_start and thus hung forever.
The test simulates the scenario by starting the second node with the
join_group0_pause_before_config_check injection. Without the fix, the
startup times out.
"""
logger.info("Adding first server")
s1 = await manager.server_add()
logger.info("Adding second server with join_group0_pause_before_config_check enabled")
s2 = await manager.server_add(start=False, config={
'error_injections_at_startup': ['join_group0_pause_before_config_check']
})
logger.info(f"Starting {s2}")
start_task = asyncio.create_task(manager.server_start(s2.server_id))
s2_log = await manager.server_open_log(s2.server_id)
await s2_log.wait_for("join_group0_pause_before_config_check: waiting for message", timeout=60)
s1_host_id = await manager.get_host_id(s1.server_id)
s2_host_id = await manager.get_host_id(s2.server_id)
async def s2_in_group0_config_on_s1():
config = await get_current_group0_config(manager, s1)
ids = {m[0] for m in config}
assert s1_host_id in ids # sanity check
return True if s2_host_id in ids else None
# Note: we would like to wait for s2 to see itself in the group0 config, but we can't execute
# get_current_group0_config for s2, as s2 doesn't handle CQL requests at this point. As a workaround, we wait for s1
# to see s2 and then perform a read barrier on s2.
logger.info(f"Waiting for {s1} to see {s2} in the group0 config")
await wait_for(s2_in_group0_config_on_s1, deadline=time.time() + 60, period=0.1)
logger.info(f"Performing read barrier on {s2} to make sure it sees itself in the group0 config")
await read_barrier(manager.api, s2.ip_addr)
logger.info(f"Unblocking {s2}")
await manager.api.message_injection(s2.ip_addr, 'join_group0_pause_before_config_check')
logger.info(f"Waiting for {s2} to complete bootstrap")
await asyncio.wait_for(start_task, timeout=60)

View File

@@ -100,7 +100,7 @@ async def test_limited_concurrency_of_writes(manager: ManagerClient):
})
node2 = await manager.server_add()
cql = await manager.get_cql_exclusive(node1)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}") as ks:
table = f"{ks}.t"
await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")

View File

@@ -11,6 +11,7 @@ from test.cluster.util import check_token_ring_and_group0_consistency, new_test_
import pytest
import asyncio
import logging
import time
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@@ -52,7 +53,7 @@ async def test_cleanup_stop(manager: ManagerClient):
await s0_log.wait_for('sstable_cleanup_wait: waiting', from_mark=s0_mark)
stop_cleanup = asyncio.create_task(manager.api.stop_compaction(servers[0].ip_addr, "CLEANUP"))
await asyncio.sleep(1)
time.sleep(1)
await manager.api.message_injection(servers[0].ip_addr, "sstable_cleanup_wait")
await stop_cleanup

View File

@@ -2279,7 +2279,7 @@ async def test_split_stopped_on_shutdown(manager: ManagerClient):
shutdown_task = asyncio.create_task(manager.server_stop_gracefully(server.server_id))
await log.wait_for('Stopping.*ongoing compactions', from_mark=log_mark)
await log.wait_for('Stopping.*ongoing compactions')
await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")
await log.wait_for('storage_service_drain_wait: waiting', from_mark=log_mark)

View File

@@ -6,7 +6,6 @@
import os
import random
import socket
import subprocess
import sys
import time
@@ -60,14 +59,7 @@ async def server_address(request, testpy_test: None|Test):
ip = await testpy_test.suite.hosts.lease_host()
else:
ip = f"127.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}"
# Ask the OS to pick a free port by binding to port 0. This avoids
# collisions with ports still in TIME_WAIT from a previous test module
# that used the same IP. SO_REUSEADDR is set on the probe socket so it
# can reclaim a TIME_WAIT port itself
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((ip, 0))
port = s.getsockname()[1]
port = random.randint(10000, 65535)
yield ServerAddress(ip, port)
if testpy_test is not None:
await testpy_test.suite.hosts.release_host(ip)

View File

@@ -257,7 +257,7 @@ async def run_server(ip, port):
runner = aiohttp.web.AppRunner(app)
await runner.setup()
site = aiohttp.web.TCPSite(runner, ip, port, reuse_address=True, reuse_port=True)
site = aiohttp.web.TCPSite(runner, ip, port)
await site.start()
try:

View File

@@ -4,18 +4,14 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging
import shutil
import itertools
import asyncio
import pathlib
import re
import os
import subprocess
from typing import Callable
logger = logging.getLogger("DockerizedServer")
class DockerizedServer:
"""class for running an external dockerized service image, typically mock server"""
# pylint: disable=too-many-instance-attributes
@@ -41,7 +37,6 @@ class DockerizedServer:
self.port = None
self.proc = None
self.service_port = port
self.echo_thread = None
async def start(self):
"""Starts docker image on a random port"""
@@ -50,107 +45,77 @@ class DockerizedServer:
if exe is not None)).resolve()
sid = f"{os.getpid()}-{DockerizedServer.newid()}"
name = f'{self.logfilenamebase}-{sid}'
logfilename = (pathlib.Path(self.tmpdir) / name).with_suffix(".log")
self.logfile = logfilename.open("wb")
docker_args = self.docker_args(self.host, self.service_port)
image_args = self.image_args(self.host, self.service_port)
args = [exe, "run", "--name", name, "--rm" ]
if self.service_port is None:
args = args + ["-P"]
else:
args = args + ["-p", str(self.service_port)]
args = args + docker_args + [self.image] + image_args
# This seems weird, using the blocking IO subprocess.
# However, we want to use a pipe reader so we can push the
# output into the test log (because we are bad at propagating
# log files etc from CI)
# But the pipe reader needs to read until EOF, otherwise the
# docker process will eventually hang. So we can't await a
# coroutine.
# We _can_, sort of, use pool.create_task(...) to send a coro
# to the background, and use a signal for waiting, like here,
# thus ensuring the coro runs forever, sort of... However,
# this currently breaks, probably due to some part of the
# machinery/tests that don't async fully, causing us to not
# process the log, and thus hand/fail, bla bla.
# The solution is to make the process synced, and use a
# background thread (execution pool) for the processing.
# This way we know the pipe reader will not suddenly get
blocked at inconvenient times.
proc = subprocess.Popen(args, stderr=subprocess.PIPE)
loop = asyncio.get_running_loop()
ready_fut = loop.create_future()
def process_io():
f = ready_fut
try:
while True:
data = proc.stderr.readline()
if not data:
if f:
loop.call_soon_threadsafe(f.set_exception, RuntimeError("Log EOF"))
logger.debug("EOF received")
break
line = data.decode()
self.logfile.write(data)
logger.debug(line)
if f and self.is_success_line(line, self.service_port):
logger.info('Got start message: %s', line)
loop.call_soon_threadsafe(f.set_result, True)
f = None
if f and self.is_failure_line(line, self.service_port):
logger.info('Got fail message: %s', line)
loop.call_soon_threadsafe(f.set_result, False)
f = None
except Exception as e:
logger.error("Exception in log processing: %s", e)
if f:
loop.call_soon_threadsafe(f.set_exception, e)
self.echo_thread = loop.run_in_executor(None, process_io)
ok = await ready_fut
if not ok:
self.logfile.close()
proc.kill()
proc.wait()
raise RuntimeError("Could not parse expected launch message from container")
check_proc = await asyncio.create_subprocess_exec(exe
, *["container", "port", name]
, stdout=asyncio.subprocess.PIPE
)
while True:
data = await check_proc.stdout.readline()
if not data:
break
s = data.decode()
m = re.search(r"\d+\/\w+ -> [\w+\.\[\]\:]+:(\d+)", s)
if m:
self.port = int(m.group(1))
logfilename = (pathlib.Path(self.tmpdir) / name).with_suffix(".log")
self.logfile = logfilename.open("wb")
await check_proc.wait()
if not self.port:
proc.kill()
proc.wait()
raise RuntimeError("Could not query port from container")
self.proc = proc
docker_args = self.docker_args(self.host, self.service_port)
image_args = self.image_args(self.host, self.service_port)
args = ["run", "--name", name, "--rm" ]
if self.service_port is None:
args = args + ["-P"]
else:
args = args + ["-p", str(self.service_port)]
args = args + docker_args + [self.image] + image_args
proc = await asyncio.create_subprocess_exec(exe, *args, stderr=self.logfile)
failed = False
# In any sane world we would just pipe stderr to a pipe and launch a background
# task to just readline from there to both check the start message as well as
add it to the log (preferably via logger).
# This works fine when doing this in a standalone python script.
# However, for some reason, when run in a pytest fixture, the pipe will fill up,
without our reader waking up and doing anything, and for any test longer than very
# short, we will fill the stderr buffer and hang.
# I cannot figure out how to get around this, so we workaround it
# instead by directing stderr to a log file, and simply repeatedly
# try to read the info from this file until we are happy.
async with asyncio.timeout(120):
done = False
while not done and not failed:
with logfilename.open("r") as f:
for line in f:
if self.is_success_line(line, self.service_port):
print(f'Got start message: {line}')
done = True
break
if self.is_failure_line(line, self.service_port):
print(f'Got fail message: {line}')
failed = True
break
if failed:
self.logfile.close()
await proc.wait()
continue
check_proc = await asyncio.create_subprocess_exec(exe
, *["container", "port", name]
, stdout=asyncio.subprocess.PIPE
)
while True:
data = await check_proc.stdout.readline()
if not data:
break
s = data.decode()
m = re.search(r"\d+\/\w+ -> [\w+\.\[\]\:]+:(\d+)", s)
if m:
self.port = int(m.group(1))
await check_proc.wait()
if not self.port:
proc.kill()
raise RuntimeError("Could not query port from container")
self.proc = proc
break
async def stop(self):
"""Stops docker image"""
if self.proc:
logger.debug("Stopping docker process")
self.proc.terminate()
self.proc.wait()
self.proc = None
if self.echo_thread:
logger.debug("Waiting for IO thread")
await self.echo_thread
self.echo_thread = None
await self.proc.wait()
if self.logfile:
logger.debug("Closing log file")
self.logfile.close()
self.logfile = None

View File

@@ -747,8 +747,6 @@ class ScyllaServer:
self.notify_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_CLOEXEC)
self.notify_socket.bind(str(self.notify_socket_path))
self._received_serving = False
loop = asyncio.get_running_loop()
def poll_status(s: socket.socket, f: asyncio.Future, logger: Union[logging.Logger, logging.LoggerAdapter]):
# Try to read all available messages from the socket
while True:
@@ -758,7 +756,7 @@ class ScyllaServer:
message = data.decode('utf-8', errors='replace')
if 'STATUS=serving' in message:
logger.debug("Received sd_notify 'serving' message")
loop.call_soon_threadsafe(f.set_result, True)
f.set_result(True)
return
if 'STATUS=entering maintenance mode' in message:
logger.debug("Receive sd_notify 'entering maintenance mode'")
@@ -768,9 +766,9 @@ class ScyllaServer:
except Exception as e:
logger.debug("Error reading from notify socket: %s", e)
break
loop.call_soon_threadsafe(f.set_result, False)
f.set_result(False)
self.serving_signal = loop.create_future()
self.serving_signal = asyncio.get_running_loop().create_future()
t = threading.Thread(target=poll_status, args=[self.notify_socket, self.serving_signal, self.logger], daemon=True)
t.start()
@@ -894,6 +892,7 @@ class ScyllaServer:
return
await report_error("the node startup failed, but the log file doesn't contain the expected error")
await report_error("failed to start the node")
self.logger.info("Wait me %s expect %s is %s", self.server_id, expected_server_up_state, server_up_state)
if await self.try_get_host_id(api):
if server_up_state == ServerUpState.PROCESS_STARTED:
server_up_state = ServerUpState.HOST_ID_QUERIED

View File

@@ -64,25 +64,11 @@ async def wait_for(
tag = label or getattr(pred, '__name__', 'unlabeled')
start = time.time()
retries = 0
last_exception: Exception | None = None
while True:
elapsed = time.time() - start
if time.time() >= deadline:
timeout_msg = f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
if last_exception is not None:
timeout_msg += (
f"; last exception: {type(last_exception).__name__}: {last_exception}"
)
raise AssertionError(timeout_msg) from last_exception
raise AssertionError(timeout_msg)
try:
res = await pred()
last_exception = None
except Exception as exc:
res = None
last_exception = exc
assert time.time() < deadline, \
f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
res = await pred()
if res is not None:
if retries > 0:
logger.debug(f"wait_for({tag}) completed "

View File

@@ -200,13 +200,13 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
_metrics.add_group("tracing_keyspace_helper", {
sm::make_counter("tracing_errors", [this] { return _stats.tracing_errors; },
sm::description("Counts a number of errors during writing to a system_traces keyspace. "
"One error may cause one or more tracing records to be lost.")).set_skip_when_empty(),
"One error may cause one or more tracing records to be lost.")),
sm::make_counter("bad_column_family_errors", [this] { return _stats.bad_column_family_errors; },
sm::description("Counts a number of times write failed due to one of the tables in the system_traces keyspace has an incompatible schema. "
"One error may result one or more tracing records to be lost. "
"Non-zero value indicates that the administrator has to take immediate steps to fix the corresponding schema. "
"The appropriate error message will be printed in the syslog.")).set_skip_when_empty(),
"The appropriate error message will be printed in the syslog.")),
});
}

View File

@@ -39,17 +39,17 @@ tracing::tracing(sstring tracing_backend_helper_class_name)
_metrics.add_group("tracing", {
sm::make_counter("dropped_sessions", stats.dropped_sessions,
sm::description("Counts a number of dropped sessions due to too many pending sessions/records. "
"High value indicates that backend is saturated with the rate with which new tracing records are created.")).set_skip_when_empty(),
"High value indicates that backend is saturated with the rate with which new tracing records are created.")),
sm::make_counter("dropped_records", stats.dropped_records,
sm::description("Counts a number of dropped records due to too many pending records. "
"High value indicates that backend is saturated with the rate with which new tracing records are created.")).set_skip_when_empty(),
"High value indicates that backend is saturated with the rate with which new tracing records are created.")),
sm::make_counter("trace_records_count", stats.trace_records_count,
sm::description("This metric is a rate of tracing records generation.")),
sm::make_counter("trace_errors", stats.trace_errors,
sm::description("Counts a number of trace records dropped due to an error (e.g. OOM).")).set_skip_when_empty(),
sm::description("Counts a number of trace records dropped due to an error (e.g. OOM).")),
sm::make_gauge("active_sessions", _active_sessions,
sm::description("Holds a number of a currently active tracing sessions.")),

View File

@@ -69,6 +69,7 @@
#include "message/messaging_service.hh"
#include "idl/forward_cql.dist.hh"
#include "utils/bit_cast.hh"
#include "utils/error_injection.hh"
#include "utils/labels.hh"
#include "utils/result.hh"
#include "utils/reusable_buffer.hh"
@@ -1633,13 +1634,26 @@ process_execute_internal(service::client_state& client_state, sharded<cql3::quer
}
tracing::trace(trace_state, "Processing a statement");
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization)
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id)] (auto msg) mutable {
auto cache_key_for_metadata = cache_key;
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, prepared, std::move(cache_key), needs_authorization)
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id), &qp, cache_key = std::move(cache_key_for_metadata), prepared = std::move(prepared)] (auto msg) mutable {
if (msg->move_to_shard()) {
return cql_server::process_fn_return_type(make_foreign(dynamic_pointer_cast<messages::result_message::bounce>(msg)));
} else if (msg->is_exception()) {
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
} else {
if (prepared->result_metadata_is_empty()
&& metadata_id.has_request_metadata_id()
&& !utils::get_local_injector().enter("skip_prepared_result_metadata_promotion")) {
if (auto rows = dynamic_pointer_cast<messages::result_message::rows>(msg)) {
auto rows_metadata_id = rows->rs().get_metadata().calculate_metadata_id();
clogger.debug("prepared result metadata promotion: request_metadata_id_present={}, calculated_rows_metadata_id_size={}",
metadata_id.has_request_metadata_id(), rows_metadata_id._metadata_id.size());
qp.local().update_prepared_result_metadata_id(cache_key, rows_metadata_id);
auto request_metadata_id = metadata_id.get_request_metadata_id();
metadata_id = cql_metadata_id_wrapper(std::move(request_metadata_id), std::move(rows_metadata_id));
}
}
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)));
}
@@ -2507,9 +2521,16 @@ void cql_server::response::write(const cql3::metadata& m, const cql_metadata_id_
cql3::cql_metadata_id_type calculated_metadata_id{bytes{}};
if (metadata_id.has_request_metadata_id() && metadata_id.has_response_metadata_id()) {
if (metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id()) {
flags.remove<cql3::metadata::flag::NO_METADATA>();
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
no_metadata = false;
const bool skip_rows_metadata_changed_response = utils::get_local_injector().enter("skip_rows_metadata_changed_response");
clogger.debug("rows metadata changed response: request_metadata_id_present={}, response_metadata_id_present={}, metadata_changed={}, no_metadata_before={}, injection_fired={}",
metadata_id.has_request_metadata_id(), metadata_id.has_response_metadata_id(),
metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id(),
no_metadata, skip_rows_metadata_changed_response);
if (!skip_rows_metadata_changed_response) {
flags.remove<cql3::metadata::flag::NO_METADATA>();
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
no_metadata = false;
}
}
}