Compare commits

..

1 Commits

Author SHA1 Message Date
Yaniv Kaul
2c5727753a .github/workflows/trigger-scylla-ci.yaml:3: Potential fix for code scanning alert no. 184: Workflow does not contain permissions
Reduce permissions to 'read'.

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
2026-03-23 16:46:06 +02:00
44 changed files with 571 additions and 1632 deletions

View File

@@ -1,4 +1,5 @@
name: Trigger Scylla CI Route
permissions:
contents: read

View File

@@ -1,8 +1,5 @@
name: Trigger next gating
permissions:
contents: read
on:
push:
branches:

View File

@@ -583,7 +583,8 @@ sstable_format: ms
audit: "table"
#
# List of statement categories that should be audited.
audit_categories: "DCL,DDL,AUTH,ADMIN"
# Possible categories are: QUERY, DML, DCL, DDL, AUTH, ADMIN
audit_categories: "DCL,AUTH,ADMIN"
#
# List of tables that should be audited.
# audit_tables: "<keyspace_name>.<table_name>,<keyspace_name>.<table_name>"

View File

@@ -143,15 +143,6 @@ public:
return value_type();
}
bool update_result_metadata_id(const key_type& key, cql3::cql_metadata_id_type metadata_id) {
cache_value_ptr vp = _cache.find(key.key());
if (!vp) {
return false;
}
(*vp)->update_result_metadata_id(std::move(metadata_id));
return true;
}
template <typename Pred>
requires std::is_invocable_r_v<bool, Pred, ::shared_ptr<cql_statement>>
void remove_if(Pred&& pred) {

View File

@@ -260,10 +260,6 @@ public:
return _prepared_cache.find(key);
}
bool update_prepared_result_metadata_id(const prepared_cache_key_type& key, cql_metadata_id_type metadata_id) {
return _prepared_cache.update_result_metadata_id(key, std::move(metadata_id));
}
inline
future<::shared_ptr<cql_transport::messages::result_message>>
execute_prepared(

View File

@@ -52,7 +52,6 @@ public:
std::vector<sstring> warnings;
private:
cql_metadata_id_type _metadata_id;
bool _result_metadata_is_empty;
public:
prepared_statement(audit::audit_info_ptr&& audit_info, seastar::shared_ptr<cql_statement> statement_, std::vector<seastar::lw_shared_ptr<column_specification>> bound_names_,
@@ -72,15 +71,6 @@ public:
void calculate_metadata_id();
cql_metadata_id_type get_metadata_id() const;
bool result_metadata_is_empty() const {
return _result_metadata_is_empty;
}
void update_result_metadata_id(cql_metadata_id_type metadata_id) {
_metadata_id = std::move(metadata_id);
_result_metadata_is_empty = false;
}
};
}

View File

@@ -49,7 +49,6 @@ prepared_statement::prepared_statement(
, partition_key_bind_indices(std::move(partition_key_bind_indices))
, warnings(std::move(warnings))
, _metadata_id(bytes{})
, _result_metadata_is_empty(statement->get_result_metadata()->flags().contains<metadata::flag::NO_METADATA>())
{
statement->set_audit_info(std::move(audit_info));
}

View File

@@ -1582,7 +1582,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"\tnone : No auditing enabled.\n"
"\tsyslog : Audit messages sent to Syslog.\n"
"\ttable : Audit messages written to column family named audit.audit_log.\n")
, audit_categories(this, "audit_categories", liveness::LiveUpdate, value_status::Used, "DCL,DDL,AUTH,ADMIN", "Comma separated list of operation categories that should be audited.")
, audit_categories(this, "audit_categories", liveness::LiveUpdate, value_status::Used, "DCL,AUTH,ADMIN", "Comma separated list of operation categories that should be audited.")
, audit_tables(this, "audit_tables", liveness::LiveUpdate, value_status::Used, "", "Comma separated list of table names (<keyspace>.<table>) that will be audited.")
, audit_keyspaces(this, "audit_keyspaces", liveness::LiveUpdate, value_status::Used, "", "Comma separated list of keyspaces that will be audited. All tables in those keyspaces will be audited")
, audit_unix_socket_path(this, "audit_unix_socket_path", value_status::Used, "/dev/log", "The path to the unix socket used for writing to syslog. Only applicable when audit is set to syslog.")

View File

@@ -727,12 +727,7 @@ public:
// now we need one page more to be able to save one for next lap
auto fill_size = align_up(buf1.size(), block_size) + block_size - buf1.size();
// If the underlying stream is already at EOF (e.g. buf1 came from
// cached _next while the previous read_exactly drained the source),
// skip the read_exactly call — it would return empty anyway.
auto buf2 = _input.eof()
? temporary_buffer<char>()
: co_await _input.read_exactly(fill_size);
auto buf2 = co_await _input.read_exactly(fill_size);
temporary_buffer<char> output(buf1.size() + buf2.size());

View File

@@ -42,14 +42,7 @@ void everywhere_replication_strategy::validate_options(const gms::feature_servic
sstring everywhere_replication_strategy::sanity_check_read_replicas(const effective_replication_map& erm, const host_id_vector_replica_set& read_replicas) const {
const auto replication_factor = erm.get_replication_factor();
if (const auto& topo_info = erm.get_token_metadata().get_topology_change_info(); topo_info && topo_info->read_new) {
if (read_replicas.size() > replication_factor + 1) {
return seastar::format(
"everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, "
"cannot be higher than replication factor {} + 1 during the 'read from new replicas' stage of a topology change",
read_replicas.size(), replication_factor);
}
} else if (read_replicas.size() > replication_factor) {
if (read_replicas.size() > replication_factor) {
return seastar::format("everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, cannot be higher than replication factor {}", read_replicas.size(), replication_factor);
}
return {};

View File

@@ -15,7 +15,6 @@ from typing import Any, Optional
import asyncio
import contextlib
import glob
import hashlib
import json
import logging
import os
@@ -365,14 +364,12 @@ async def start_node(executable: PathLike, cluster_workdir: PathLike, addr: str,
llvm_profile_file = f"{addr}-%m.profraw"
scylla_workdir = f"{addr}"
logfile = f"{addr}.log"
socket = maintenance_socket_path(cluster_workdir, addr)
command = [
"env",
f"LLVM_PROFILE_FILE={llvm_profile_file}",
f"SCYLLA_HOME={os.path.realpath(os.getcwd())}", # We assume that the script has Scylla's `conf/` as its filesystem neighbour.
os.path.realpath(executable),
f"--workdir={scylla_workdir}",
f"--maintenance-socket={socket}",
"--ring-delay-ms=0",
"--developer-mode=yes",
"--memory=1G",
@@ -394,7 +391,6 @@ async def start_node(executable: PathLike, cluster_workdir: PathLike, addr: str,
f"--authenticator=PasswordAuthenticator",
f"--authorizer=CassandraAuthorizer",
] + list(extra_opts)
training_logger.info(f"Using maintenance socket {socket}")
return await run(['bash', '-c', fr"""exec {shlex.join(command)} >{q(logfile)} 2>&1"""], cwd=cluster_workdir)
async def start_cluster(executable: PathLike, addrs: list[str], cpusets: Optional[list[str]], workdir: PathLike, cluster_name: str, extra_opts: list[str]) -> list[Process]:
@@ -437,25 +433,16 @@ async def start_cluster(executable: PathLike, addrs: list[str], cpusets: Optiona
procs.append(proc)
await wait_for_node(proc, addrs[i], timeout)
except:
await stop_cluster(procs, addrs, cluster_workdir=workdir)
await stop_cluster(procs, addrs)
raise
return procs
async def stop_cluster(procs: list[Process], addrs: list[str], cluster_workdir: PathLike) -> None:
async def stop_cluster(procs: list[Process], addrs: list[str]) -> None:
"""Stops a Scylla cluster started with start_cluster().
Doesn't return until all nodes exit, even if stop_cluster() is cancelled.
"""
await clean_gather(*[cancel_process(p, timeout=60) for p in procs])
_cleanup_short_sockets(cluster_workdir, addrs)
def _cleanup_short_sockets(cluster_workdir: PathLike, addrs: list[str]) -> None:
"""Remove short maintenance socket files created in /tmp."""
for addr in addrs:
try:
os.unlink(maintenance_socket_path(cluster_workdir, addr))
except OSError:
pass
async def wait_for_port(addr: str, port: int) -> None:
await bash(fr'until printf "" >>/dev/tcp/{addr}/{port}; do sleep 0.1; done 2>/dev/null')
@@ -466,17 +453,12 @@ async def merge_profraw(directory: PathLike) -> None:
await bash(fr"llvm-profdata merge {q(directory)}/*.profraw -output {q(directory)}/prof.profdata")
def maintenance_socket_path(cluster_workdir: PathLike, addr: str) -> str:
"""Return the maintenance socket path for a node.
"""Returns the absolute path of the maintenance socket for a given node.
Returns a short deterministic path in /tmp (derived from an MD5 hash of
the natural ``<cluster_workdir>/<addr>/cql.m`` path) to stay within the
Unix domain socket length limit.
The same path is passed to Scylla via ``--maintenance-socket`` in
``start_node()``.
With ``maintenance_socket: workdir`` in scylla.yaml the socket lives at
``<node-workdir>/cql.m``, i.e. ``<cluster_workdir>/<addr>/cql.m``.
"""
natural = os.path.realpath(f"{cluster_workdir}/{addr}/cql.m")
path_hash = hashlib.md5(natural.encode()).hexdigest()[:12]
return os.path.join(tempfile.gettempdir(), f'pgo-{path_hash}.m')
return os.path.realpath(f"{cluster_workdir}/{addr}/cql.m")
async def setup_cassandra_user(workdir: PathLike, addr: str) -> None:
"""Create the ``cassandra`` superuser via the maintenance socket.
@@ -543,7 +525,7 @@ async def with_cluster(executable: PathLike, workdir: PathLike, cpusets: Optiona
yield addrs, procs
finally:
training_logger.info(f"Stopping the cluster in {workdir}")
await stop_cluster(procs, addrs, cluster_workdir=workdir)
await stop_cluster(procs, addrs)
training_logger.info(f"Stopped the cluster in {workdir}")
################################################################################

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:d424ce6cc7f65338c34dd35881d23f5ad3425651d66e47dc2c3a20dc798848d4
size 6598648
oid sha256:34a0955d2c5a88e18ddab0f1df085e10a17e14129c3e21de91e4f27ef949b6c4
size 6502668

View File

@@ -1109,18 +1109,6 @@ future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batc
// case.
co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second);
_stats.store_term_and_vote++;
// When the term advances, any in-flight snapshot transfers
// belong to an outdated term: the progress tracker has been
// reset in become_leader() or we are now a follower.
// Abort them before we dispatch this batch's messages, which
// may start fresh transfers for the new term.
//
// A vote may also change independently of the term (e.g. a
// follower voting for a candidate at the same term), but in
// that case there are no in-flight transfers and the abort
// is a no-op.
abort_snapshot_transfers();
}
if (batch.snp) {
@@ -1230,6 +1218,8 @@ future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batc
// quickly) stop happening (we're outside the config after all).
co_await _apply_entries.push_eventually(removed_from_config{});
}
// request aborts of snapshot transfers
abort_snapshot_transfers();
// abort all read barriers
for (auto& r : _reads) {
r.promise.set_value(not_a_leader{_fsm->current_leader()});

View File

@@ -1622,14 +1622,14 @@ future<> segment_manager_impl::do_recovery(replica::database& db) {
size_t next_file_id = 0;
for (auto file_id : found_file_ids) {
if (file_id != next_file_id) {
throw std::runtime_error(fmt::format("Missing log segment file(s) detected during recovery: file {} missing", _file_mgr.get_file_path(next_file_id).string()));
throw std::runtime_error(fmt::format("Missing log segment file(s) detected during recovery: file {} missing", _file_mgr.get_file_path(next_file_id)));
}
next_file_id++;
}
// populate index from all segments. keep the latest record for each key.
for (auto file_id : found_file_ids) {
logstor_logger.info("Recovering segments from file {}: {}%", _file_mgr.get_file_path(file_id).string(), (file_id + 1) * 100 / found_file_ids.size());
logstor_logger.info("Recovering segments from file {}: {}%", _file_mgr.get_file_path(file_id), (file_id + 1) * 100 / found_file_ids.size());
co_await max_concurrent_for_each(segments_in_file(file_id), 32,
[this, &db] (log_segment_id seg_id) {
return recover_segment(db, seg_id);

View File

@@ -4860,14 +4860,13 @@ table::query(schema_ptr query_schema,
}
std::optional<full_position> last_pos;
if (querier_opt) {
if (querier_opt->current_position()) {
last_pos.emplace(*querier_opt->current_position());
}
if (!saved_querier || (!querier_opt->are_limits_reached() && !qs.builder.is_short_read())) {
co_await querier_opt->close();
querier_opt = {};
}
if (querier_opt && querier_opt->current_position()) {
last_pos.emplace(*querier_opt->current_position());
}
if (!saved_querier || (querier_opt && !querier_opt->are_limits_reached() && !qs.builder.is_short_read())) {
co_await querier_opt->close();
querier_opt = {};
}
if (saved_querier) {
*saved_querier = std::move(querier_opt);

View File

@@ -87,11 +87,6 @@ target_include_directories(wasmtime_bindings
target_link_libraries(wasmtime_bindings
INTERFACE Rust::rust_combined)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
# The PCH from scylla-precompiled-header is compiled with Seastar's compile
# flags, including sanitizer flags in Debug/Sanitize modes. Any target reusing
# this PCH must have matching compile options, otherwise the compiler rejects
# the PCH due to flag mismatch (e.g., -fsanitize=address).
target_link_libraries(wasmtime_bindings PRIVATE Seastar::seastar)
target_precompile_headers(wasmtime_bindings REUSE_FROM scylla-precompiled-header)
endif()
@@ -113,6 +108,5 @@ target_include_directories(inc
target_link_libraries(inc
INTERFACE Rust::rust_combined)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_link_libraries(inc PRIVATE Seastar::seastar)
target_precompile_headers(inc REUSE_FROM scylla-precompiled-header)
endif()

View File

@@ -910,7 +910,7 @@ future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {
frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
} else {
co_await for_each_split_mutation(std::move(mut), max_size, [&] (mutation m) -> future<> {
frozen_muts_to_apply.push_back(co_await freeze_gently(m));
frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
});
}
}

View File

@@ -181,7 +181,7 @@ def parse_cmd_line() -> argparse.Namespace:
help="Run only tests for given build mode(s)")
parser.add_argument('--repeat', action="store", default="1", type=int,
help="number of times to repeat test execution")
parser.add_argument('--timeout', action="store", default="3600", type=int,
parser.add_argument('--timeout', action="store", default="24000", type=int,
help="timeout value for single test execution")
parser.add_argument('--session-timeout', action="store", default="24000", type=int,
help="timeout value for test.py/pytest session execution")

View File

@@ -23,11 +23,8 @@
#include "test/lib/tmpdir.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/exception_utils.hh"
#include "test/lib/limiting_data_source.hh"
#include "utils/io-wrappers.hh"
#include <seastar/util/memory-data-source.hh>
using namespace encryption;
static tmpdir dir;
@@ -598,113 +595,6 @@ SEASTAR_TEST_CASE(test_encrypted_data_source_simple) {
co_await test_random_data_source(sizes);
}
// Reproduces the production deadlock where encrypted SSTable component downloads
// got stuck during restore. The encrypted_data_source::get() caches a block in
// _next, then on the next call bypasses input_stream::read()'s _eof check and
// calls input_stream::read_exactly() — which does NOT check _eof when _buf is
// empty. This causes a second get() on the underlying source after EOS.
//
// In production the underlying source was chunked_download_source whose get()
// hung forever. Here we simulate it with a strict source that fails the test.
//
// The fix belongs in seastar's input_stream::read_exactly(): check _eof before
// calling _fd.get(), consistent with read(), read_up_to(), and consume().
static future<> test_encrypted_source_copy(size_t plaintext_size) {
testlog.info("test_encrypted_source_copy: plaintext_size={}", plaintext_size);
key_info info{"AES/CBC", 256};
auto k = ::make_shared<symmetric_key>(info);
// Step 1: Encrypt the plaintext into memory buffers
auto plaintext = generate_random<char>(plaintext_size);
std::vector<temporary_buffer<char>> encrypted_bufs;
{
data_sink sink(make_encrypted_sink(create_memory_sink(encrypted_bufs), k));
co_await sink.put(plaintext.clone());
co_await sink.close();
}
// Flatten encrypted buffers into a single contiguous buffer
size_t encrypted_total = 0;
for (const auto& b : encrypted_bufs) {
encrypted_total += b.size();
}
temporary_buffer<char> encrypted(encrypted_total);
size_t pos = 0;
for (const auto& b : encrypted_bufs) {
std::copy(b.begin(), b.end(), encrypted.get_write() + pos);
pos += b.size();
}
// Step 2: Create a data source from the encrypted data that fails on
// post-EOS get() — simulating a source like chunked_download_source
// that would hang forever in this situation.
class strict_memory_source final : public limiting_data_source_impl {
bool _eof = false;
public:
strict_memory_source(temporary_buffer<char> data, size_t chunk_size)
: limiting_data_source_impl(
data_source(std::make_unique<util::temporary_buffer_data_source>(std::move(data))),
chunk_size) {}
future<temporary_buffer<char>> get() override {
BOOST_REQUIRE_MESSAGE(!_eof,
"get() called on source after it already returned EOS — "
"this is the production deadlock: read_exactly() does not "
"check _eof before calling _fd.get()");
auto buf = co_await limiting_data_source_impl::get();
_eof = buf.empty();
co_return buf;
}
};
// Step 3: Wrap in encrypted_data_source and drain via consume() —
// the exact code path used by seastar::copy() which is what
// sstables_loader_helpers::download_sstable() calls.
// Try multiple chunk sizes to hit different alignment scenarios.
for (size_t chunk_size : {1ul, 7ul, 4096ul, 8192ul, encrypted_total, encrypted_total + 1}) {
if (chunk_size == 0) continue;
auto src = data_source(make_encrypted_source(
data_source(std::make_unique<strict_memory_source>(encrypted.clone(), chunk_size)), k));
auto in = input_stream<char>(std::move(src));
// consume() is what seastar::copy() uses internally. It calls
// encrypted_data_source::get() via _fd.get() until EOF.
size_t total_decrypted = 0;
co_await in.consume([&total_decrypted](temporary_buffer<char> buf) {
total_decrypted += buf.size();
return make_ready_future<consumption_result<char>>(continue_consuming{});
});
co_await in.close();
BOOST_REQUIRE_EQUAL(total_decrypted, plaintext_size);
}
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_8k) {
co_await test_encrypted_source_copy(8192);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_4k) {
co_await test_encrypted_source_copy(4096);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_small) {
co_await test_encrypted_source_copy(100);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_12k) {
co_await test_encrypted_source_copy(12288);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_unaligned) {
co_await test_encrypted_source_copy(8193);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_1byte) {
co_await test_encrypted_source_copy(1);
}
SEASTAR_TEST_CASE(test_encrypted_data_source_fuzzy) {
std::mt19937_64 rand_gen(std::random_device{}());

View File

@@ -29,7 +29,6 @@
#include "test/lib/exception_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/test_utils.hh"
#include "transport/response.hh"
BOOST_AUTO_TEST_SUITE(schema_change_test)
@@ -702,16 +701,6 @@ cql3::cql_metadata_id_type compute_metadata_id(std::vector<std::pair<sstring, sh
return cql3::metadata{columns_specification}.calculate_metadata_id();
}
std::vector<lw_shared_ptr<cql3::column_specification>> make_columns_specification(
const std::vector<std::pair<sstring, shared_ptr<const abstract_type>>>& columns, sstring ks = "ks", sstring cf = "cf") {
std::vector<lw_shared_ptr<cql3::column_specification>> columns_specification;
columns_specification.reserve(columns.size());
for (const auto& column : columns) {
columns_specification.push_back(make_lw_shared(cql3::column_specification(ks, cf, make_shared<cql3::column_identifier>(column.first, false), column.second)));
}
return columns_specification;
}
BOOST_AUTO_TEST_CASE(metadata_id_with_different_keyspace_and_table) {
const auto c = std::make_pair("id", uuid_type);
auto h1 = compute_metadata_id({c}, "ks1", "cf1");
@@ -762,39 +751,6 @@ BOOST_AUTO_TEST_CASE(metadata_id_with_different_column_order) {
verify_metadata_id_is_stable(h2, "b52512f2b76d3e0695dcaf7b0a71efac");
}
BOOST_AUTO_TEST_CASE(metadata_id_changed_rows_response_overrides_no_metadata) {
auto empty_metadata_id = cql3::metadata{std::vector<lw_shared_ptr<cql3::column_specification>>{}}.calculate_metadata_id();
auto stale_response_metadata_id = empty_metadata_id;
auto columns_specification = make_columns_specification({{"role", utf8_type}});
cql3::metadata rows_metadata(columns_specification);
auto rows_metadata_id = rows_metadata.calculate_metadata_id();
cql_transport::response resp{0, cql_transport::cql_binary_opcode::RESULT, tracing::trace_state_ptr{}};
resp.write(rows_metadata, cql_transport::cql_metadata_id_wrapper(
std::move(empty_metadata_id),
std::move(stale_response_metadata_id)), true);
auto body_stream = std::move(resp).extract_body();
auto body = body_stream.linearize();
const auto* ptr = reinterpret_cast<const char*>(body.begin());
const auto flags_mask = read_be<int32_t>(ptr);
ptr += sizeof(int32_t);
const auto flags = cql3::metadata::flag_enum_set::from_mask(flags_mask);
BOOST_REQUIRE(flags.contains<cql3::metadata::flag::METADATA_CHANGED>());
BOOST_REQUIRE(!flags.contains<cql3::metadata::flag::NO_METADATA>());
const auto column_count = read_be<int32_t>(ptr);
ptr += sizeof(int32_t);
BOOST_REQUIRE_EQUAL(column_count, 1);
const auto metadata_id_length = read_be<uint16_t>(ptr);
ptr += sizeof(uint16_t);
BOOST_REQUIRE_EQUAL(metadata_id_length, rows_metadata_id._metadata_id.size());
BOOST_REQUIRE(std::equal(rows_metadata_id._metadata_id.begin(), rows_metadata_id._metadata_id.end(),
reinterpret_cast<const bytes::value_type*>(ptr)));
}
BOOST_AUTO_TEST_CASE(metadata_id_with_udt) {
auto compute_metadata_id_for_type = [&](

View File

@@ -1,328 +0,0 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import dataclasses
import hashlib
import socket
import struct
import pytest
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error
from test.pylib.util import unique_name
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
# ---------------------------------------------------------------------------
# Minimal raw CQL v4 socket helpers with SCYLLA_USE_METADATA_ID extension.
#
# The standard Python driver never negotiates SCYLLA_USE_METADATA_ID and
# therefore never includes result_metadata_id in EXECUTE requests for
# protocol v4. In CQL v5 result_metadata_id exchange is mandatory and
# built into the wire format; until Scylla implements v5, this extension
# provides the same semantics on v4. The helpers below implement just
# enough of the CQL wire protocol to exercise the server-side prepared
# metadata promotion path introduced for v5 compatibility.
# ---------------------------------------------------------------------------
# CQL opcodes
_OP_STARTUP = 0x01
_OP_AUTH_RESPONSE = 0x0F
_OP_PREPARE = 0x09
_OP_EXECUTE = 0x0A
_OP_READY = 0x02
_OP_AUTHENTICATE = 0x03
_OP_RESULT = 0x08
_OP_AUTH_SUCCESS = 0x10
# RESULT kind codes
_RESULT_KIND_ROWS = 0x00000002
_RESULT_KIND_PREPARED = 0x00000004
# Rows metadata flags (bit positions in the uint32 flags field)
_META_NO_METADATA = 1 << 2
_META_METADATA_CHANGED = 1 << 3
# EXECUTE options flags (1-byte field in CQL v4)
_FLAG_SKIP_METADATA = 0x02
_FRAME_HEADER_SIZE = 9 # version(1)+flags(1)+stream(2)+opcode(1)+length(4)
_CQL_VERSION = "3.0.0"
_DEFAULT_CONSISTENCY = 0x0006 # LOCAL_QUORUM
def _pack_short(v: int) -> bytes:
return struct.pack(">H", v)
def _pack_int(v: int) -> bytes:
return struct.pack(">I", v)
def _short_bytes(b: bytes) -> bytes:
"""CQL [short bytes]: uint16 length prefix + payload."""
return _pack_short(len(b)) + b
def _long_string(s: str) -> bytes:
"""CQL [long string]: uint32 length prefix + UTF-8 bytes."""
b = s.encode()
return _pack_int(len(b)) + b
def _string_map(d: dict[str, str]) -> bytes:
"""CQL [string map]: uint16 count + (uint16-prefixed-string, uint16-prefixed-string)*."""
out = _pack_short(len(d))
for k, v in d.items():
out += _short_bytes(k.encode())
out += _short_bytes(v.encode())
return out
def _frame(opcode: int, body: bytes, stream: int) -> bytes:
"""Build a CQL v4 request frame."""
return struct.pack(">BBHBI", 0x04, 0x00, stream, opcode, len(body)) + body
def _recv_frame(sock: socket.socket) -> tuple[int, int, bytes]:
"""Read one CQL v4 response frame; return (stream, opcode, body)."""
header = b""
while len(header) < _FRAME_HEADER_SIZE:
chunk = sock.recv(_FRAME_HEADER_SIZE - len(header))
assert chunk, "Connection closed while reading frame header"
header += chunk
_version, _flags = struct.unpack(">BB", header[0:2])
stream = struct.unpack(">H", header[2:4])[0]
opcode = header[4]
length = struct.unpack(">I", header[5:9])[0]
body = b""
while len(body) < length:
chunk = sock.recv(length - len(body))
assert chunk, "Connection closed while reading frame body"
body += chunk
return stream, opcode, body
@dataclasses.dataclass
class ExecuteResult:
"""Parsed outcome of a ROWS EXECUTE response."""
metadata_changed: bool
no_metadata: bool
column_count: int
result_metadata_id: bytes | None
def _cql_connect(host: str, port: int, username: str, password: str) -> socket.socket:
"""
Open a raw TCP socket to *host*:*port* and perform the CQL v4 handshake,
negotiating the SCYLLA_USE_METADATA_ID extension so that result_metadata_id
is exchanged on the wire — identical to the mandatory CQL v5 behaviour.
"""
sock = socket.create_connection((host, port))
stream = 1
# STARTUP with SCYLLA_USE_METADATA_ID enables the v5-style metadata_id
# exchange for this v4 connection.
startup_opts = {"CQL_VERSION": _CQL_VERSION, "SCYLLA_USE_METADATA_ID": ""}
sock.sendall(_frame(_OP_STARTUP, _string_map(startup_opts), stream))
_, opcode, payload = _recv_frame(sock)
if opcode == _OP_READY:
return sock
assert opcode == _OP_AUTHENTICATE, (
f"Expected AUTHENTICATE(0x{_OP_AUTHENTICATE:02x}), got 0x{opcode:02x}"
)
# PlainText SASL token: NUL + username + NUL + password
creds = b"\x00" + username.encode() + b"\x00" + password.encode()
stream += 1
sock.sendall(_frame(_OP_AUTH_RESPONSE, _short_bytes(creds), stream))
_, auth_op, _ = _recv_frame(sock)
assert auth_op == _OP_AUTH_SUCCESS, f"Authentication failed: opcode=0x{auth_op:02x}"
return sock
def _cql_prepare(sock: socket.socket, stream: int, query: str) -> bytes:
"""PREPARE *query* and return the server-assigned query_id."""
sock.sendall(_frame(_OP_PREPARE, _long_string(query), stream))
_, opcode, payload = _recv_frame(sock)
assert opcode == _OP_RESULT, f"Expected RESULT, got 0x{opcode:02x}"
pos = 0
kind = struct.unpack(">I", payload[pos : pos + 4])[0]
pos += 4
assert kind == _RESULT_KIND_PREPARED, f"Expected PREPARED kind, got {kind}"
id_len = struct.unpack(">H", payload[pos : pos + 2])[0]
pos += 2
return bytes(payload[pos : pos + id_len])
def _cql_execute_with_metadata_id(
sock: socket.socket,
stream: int,
query_id: bytes,
result_metadata_id: bytes,
consistency: int = _DEFAULT_CONSISTENCY,
) -> ExecuteResult:
"""
Send EXECUTE carrying *result_metadata_id* on the wire.
With SCYLLA_USE_METADATA_ID active the server reads result_metadata_id
immediately after query_id (before the options block), mirroring CQL v5
wire format. SKIP_METADATA is set so a normal response returns no column
specs; only the METADATA_CHANGED promotion path returns actual metadata.
"""
# options block: [consistency: uint16][flags: byte]
options = struct.pack(">HB", consistency, _FLAG_SKIP_METADATA)
body = _short_bytes(query_id) + _short_bytes(result_metadata_id) + options
sock.sendall(_frame(_OP_EXECUTE, body, stream))
_, opcode, payload = _recv_frame(sock)
assert opcode == _OP_RESULT, f"Expected RESULT, got 0x{opcode:02x}"
pos = 0
kind = struct.unpack(">I", payload[pos : pos + 4])[0]
pos += 4
assert kind == _RESULT_KIND_ROWS, f"Expected ROWS kind, got {kind}"
meta_flags = struct.unpack(">I", payload[pos : pos + 4])[0]
pos += 4
column_count = struct.unpack(">I", payload[pos : pos + 4])[0]
pos += 4
metadata_changed = bool(meta_flags & _META_METADATA_CHANGED)
no_metadata = bool(meta_flags & _META_NO_METADATA)
response_metadata_id: bytes | None = None
if metadata_changed:
id_len = struct.unpack(">H", payload[pos : pos + 2])[0]
pos += 2
response_metadata_id = bytes(payload[pos : pos + id_len])
return ExecuteResult(
metadata_changed=metadata_changed,
no_metadata=no_metadata,
column_count=column_count,
result_metadata_id=response_metadata_id,
)
def _prepare_and_execute(
host: str, query: str, stale_metadata_id: bytes
) -> ExecuteResult:
"""
Open a raw socket connection (negotiating SCYLLA_USE_METADATA_ID), prepare
*query*, execute it with *stale_metadata_id*, and return the parsed result.
Intended to be called via ``asyncio.to_thread`` to avoid blocking the event loop.
"""
sock = _cql_connect(host, 9042, "cassandra", "cassandra")
try:
stream = 1
stream += 1
query_id = _cql_prepare(sock, stream, query)
stream += 1
return _cql_execute_with_metadata_id(sock, stream, query_id, stale_metadata_id)
finally:
sock.close()
@pytest.mark.asyncio
async def test_list_roles_of_prepared_metadata_promotion(
manager: ManagerClient,
) -> None:
"""Verify that the server promotes the prepared metadata_id for statements
whose PREPARE response carries empty result metadata (NO_METADATA).
``LIST ROLES OF <role>`` is such a statement: at PREPARE time the server
does not know the result set schema because the statement implementation
builds the metadata dynamically at execute time. The server therefore
returns the metadata_id of empty metadata in the PREPARE response.
When the client later sends EXECUTE with SKIP_METADATA and the stale
empty metadata_id, the server should detect the mismatch (the actual rows
have real metadata) and respond with a ``METADATA_CHANGED`` result that
carries the real metadata_id so the client can update its cache. This is
the behaviour mandated by CQL v5; on CQL v4 it is exercised via the
SCYLLA_USE_METADATA_ID Scylla protocol extension which enables the same
wire-level exchange.
"""
server = await manager.server_add(config=auth_config)
cql, _ = await manager.get_ready_cql([server])
role = "r" + unique_name()
await cql.run_async(f"CREATE ROLE {role}")
# Any non-empty bytes that differ from the real metadata_id serves as the
# "stale" cache entry the client would send after a PREPARE that returned
# empty metadata.
stale_metadata_id = hashlib.sha256(b"").digest()[:16]
result = await asyncio.to_thread(
_prepare_and_execute, server.ip_addr, f"LIST ROLES OF {role}", stale_metadata_id
)
assert result.metadata_changed, (
f"expected EXECUTE for LIST ROLES OF {role} to return METADATA_CHANGED "
f"after PREPARE returned an empty result_metadata_id"
)
assert not result.no_metadata, (
f"expected EXECUTE for LIST ROLES OF {role} to not have NO_METADATA flag "
f"when METADATA_CHANGED is set"
)
assert result.result_metadata_id is not None, (
f"expected EXECUTE for LIST ROLES OF {role} to return a result_metadata_id "
f"alongside METADATA_CHANGED"
)
@pytest.mark.asyncio
@pytest.mark.skip_mode(
mode="release", reason="error injection is disabled in release mode"
)
async def test_list_roles_of_prepared_metadata_promotion_suppressed_by_injection(
manager: ManagerClient,
) -> None:
"""Verify that the ``skip_rows_metadata_changed_response`` error injection
suppresses the metadata promotion, leaving the response with NO_METADATA
and without METADATA_CHANGED.
This is the negative/regression counterpart of
``test_list_roles_of_prepared_metadata_promotion``: it confirms that the
happy-path test is not a false positive by showing that the promotion can
be disabled, and that the injection point itself works correctly.
"""
server = await manager.server_add(config=auth_config)
cql, _ = await manager.get_ready_cql([server])
role = "r" + unique_name()
await cql.run_async(f"CREATE ROLE {role}")
stale_metadata_id = hashlib.sha256(b"").digest()[:16]
async with inject_error(
manager.api, server.ip_addr, "skip_prepared_result_metadata_promotion"
):
async with inject_error(
manager.api, server.ip_addr, "skip_rows_metadata_changed_response"
):
result = await asyncio.to_thread(
_prepare_and_execute,
server.ip_addr,
f"LIST ROLES OF {role}",
stale_metadata_id,
)
assert not result.metadata_changed, (
f"expected injected EXECUTE for LIST ROLES OF {role} to suppress "
f"METADATA_CHANGED, but the flag was set"
)
assert result.no_metadata, (
f"expected injected EXECUTE for LIST ROLES OF {role} to keep the "
f"stale NO_METADATA path, but no_metadata flag was not set"
)

File diff suppressed because it is too large Load Diff

View File

@@ -44,7 +44,6 @@ run_in_dev:
- dtest/bypass_cache_test
- dtest/auth_roles_test
- dtest/audit_test
- audit/test_audit
- dtest/commitlog_test
- dtest/cfid_test
- dtest/rebuild_test

View File

@@ -177,7 +177,7 @@ async def _smoke_test(manager: ManagerClient, key_provider: KeyProviderFactory,
# restart the cluster
if restart:
await restart(manager, servers, cfs)
cql, _ = await manager.get_ready_cql(servers)
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
else:
await manager.rolling_restart(servers)
for table_name in cfs:

View File

@@ -438,7 +438,6 @@ async def test_lwt_fencing_upgrade(manager: ManagerClient, scylla_2025_1: Scylla
await wait_for(all_hosts_are_alive, deadline=time.time() + 60, period=0.1)
logger.info(f"Upgrading {s.server_id}")
await manager.server_change_version(s.server_id, scylla_binary)
await manager.server_sees_others(s.server_id, 2, interval=60.0)
logger.info("Done upgrading servers")

View File

@@ -8,10 +8,7 @@ import asyncio
import time
import pytest
import logging
from functools import partial
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for
from test.pylib.internal_types import ServerInfo
logger = logging.getLogger(__name__)
@@ -19,26 +16,6 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_crashed_node_substitution(manager: ManagerClient):
"""Test that a node which crashed after starting gossip but before joining group0
(an 'orphan' node) is eventually removed from gossip by the gossiper_orphan_remover_fiber.
The scenario:
1. Start 3 nodes with the 'fast_orphan_removal_fiber' injection enabled. This freezes
the gossiper_orphan_remover_fiber on each node before it enters its polling loop,
so it cannot remove any orphan until explicitly unblocked.
2. Start a 4th node with the 'crash_before_group0_join' injection enabled. This node
starts gossip normally but blocks inside pre_server_start(), just before sending
the join RPC to the topology coordinator. It never joins group0.
3. Wait until the 4th node's gossip state has fully propagated to all 3 running peers,
then trigger its crash via the injection. At this point all peers see it as an orphan:
present in gossip but absent from the group0 topology.
4. Assert the orphan is visible in gossip (live or down) on the surviving nodes.
5. Unblock the gossiper_orphan_remover_fiber on all 3 nodes (via message_injection) and
enable the 'speedup_orphan_removal' injection so the fiber removes the orphan immediately
without waiting for the normal 60-second age threshold.
6. Wait for the 'Finished to force remove node' log line confirming removal, then assert
the orphan is no longer present in gossip.
"""
servers = await manager.servers_add(3, config={
'error_injections_at_startup': ['fast_orphan_removal_fiber']
})
@@ -53,24 +30,10 @@ async def test_crashed_node_substitution(manager: ManagerClient):
log = await manager.server_open_log(failed_server.server_id)
await log.wait_for("finished do_send_ack2_msg")
failed_id = await manager.get_host_id(failed_server.server_id)
# Wait until the failed server's gossip state has propagated to all running peers.
# "finished do_send_ack2_msg" only guarantees that one peer completed a gossip round
# with the failed server; other nodes learn about it only in subsequent gossip rounds.
# Querying gossip before propagation completes would cause the assertion below to fail
# because the orphan node would not yet appear as live or down on every peer.
async def gossip_has_node(server: ServerInfo):
live = await manager.api.client.get_json("/gossiper/endpoint/live", host=server.ip_addr)
down = await manager.api.client.get_json("/gossiper/endpoint/down", host=server.ip_addr)
return True if failed_server.ip_addr in live + down else None
for s in servers:
await wait_for(partial(gossip_has_node, s), deadline=time.time() + 30)
await manager.api.message_injection(failed_server.ip_addr, 'crash_before_group0_join')
await task
live_eps = await manager.api.client.get_json("/gossiper/endpoint/live", host=servers[0].ip_addr)
down_eps = await manager.api.client.get_json("/gossiper/endpoint/down", host=servers[0].ip_addr)

View File

@@ -17,9 +17,9 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import ScyllaMetricsClient, TCPRESTClient, inject_error
from test.pylib.tablets import get_tablet_replicas
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.util import gather_safely, wait_for
from test.pylib.util import wait_for
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, keyspace_has_tablets, new_test_keyspace, new_test_table
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace
logger = logging.getLogger(__name__)
@@ -51,42 +51,28 @@ async def await_sync_point(client: TCPRESTClient, server_ip: IPAddress, sync_poi
@pytest.mark.asyncio
async def test_write_cl_any_to_dead_node_generates_hints(manager: ManagerClient):
node_count = 2
cmdline = ["--logger-log-level", "hints_manager=trace"]
servers = await manager.servers_add(node_count, cmdline=cmdline)
async def wait_for_hints_written(min_hint_count: int, timeout: int):
async def aux():
hints_written = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
if hints_written >= min_hint_count:
return True
return None
assert await wait_for(aux, time.time() + timeout)
servers = await manager.servers_add(node_count)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
uses_tablets = await keyspace_has_tablets(manager, ks)
# If the keyspace uses tablets, let's explicitly require the table to use multiple tablets.
# Otherwise, it could happen that all mutations would target servers[0] only, which would
# ultimately lead to a test failure here. We rely on the assumption that mutations will be
# distributed more or less uniformly!
extra_opts = "WITH tablets = {'min_tablet_count': 16}" if uses_tablets else ""
async with new_test_table(manager, ks, "pk int PRIMARY KEY, v int", extra_opts) as table:
await manager.server_stop_gracefully(servers[1].server_id)
table = f"{ks}.t"
await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
await manager.server_stop_gracefully(servers[1].server_id)
stmt = cql.prepare(f"INSERT INTO {table} (pk, v) VALUES (?, ?)")
stmt.consistency_level = ConsistencyLevel.ANY
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
# Some of the inserts will be targeted to the dead node.
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
await gather_safely(*[cql.run_async(stmt, (i, i + 1)) for i in range(100)])
# Some of the inserts will be targeted to the dead node.
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
for i in range(100):
await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES ({i}, {i+1})", consistency_level=ConsistencyLevel.ANY))
# Verify hints are written
await wait_for_hints_written(hints_before + 1, timeout=60)
# Verify hints are written
hints_after = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
assert hints_after > hints_before
# For dropping the keyspace
await manager.server_start(servers[1].server_id)
# For dropping the keyspace
await manager.server_start(servers[1].server_id)
@pytest.mark.asyncio
async def test_limited_concurrency_of_writes(manager: ManagerClient):

View File

@@ -151,7 +151,7 @@ async def trigger_tablet_merge(manager, servers, logs):
await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
async def preapre_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, nr_keys=nr_keys, cmdline=cmdline)
repaired_keys = set(range(0, nr_keys))
unrepaired_keys = set()
@@ -164,7 +164,7 @@ async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdlin
@pytest.mark.asyncio
async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient):
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
await insert_keys(cql, ks, 0, 100)
@@ -274,7 +274,7 @@ async def test_tablet_incremental_repair_error(manager: ManagerClient):
async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline=['--logger-log-level', 'compaction=debug'])
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
token = -1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
@@ -335,7 +335,7 @@ async def test_tablet_incremental_repair_and_major(manager: ManagerClient):
@pytest.mark.asyncio
async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# Disable autocompaction
for server in servers:
@@ -381,7 +381,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_split, do_merge):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -442,7 +442,7 @@ async def test_tablet_incremental_repair_with_merge(manager: ManagerClient):
async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(manager: ManagerClient):
nr_keys = 100
cmdline = ["--hinted-handoff-enabled", "0"]
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
await manager.server_stop_gracefully(servers[1].server_id)
@@ -466,7 +466,7 @@ async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(ma
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -507,7 +507,7 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_merge(manager):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -541,7 +541,7 @@ async def do_test_tablet_incremental_repair_merge_error(manager, error):
nr_keys = 100
# Make sure no data commit log replay after force server stop
cmdline = ['--enable-commitlog', '0']
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -587,7 +587,7 @@ async def test_tablet_incremental_repair_merge_error_in_merge_completion_fiber(m
@pytest.mark.asyncio
async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
token = -1
sstables_repaired_at = 0
@@ -632,7 +632,7 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
@pytest.mark.asyncio
async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
servers, _, _, ks, _, _, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
time1 = 0
time2 = 0
@@ -820,7 +820,7 @@ async def test_repair_sigsegv_with_diff_shard_count(manager: ManagerClient, use_
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manager: ManagerClient):
cmdline = ['--logger-log-level', 'repair=debug']
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await prepare_cluster_for_incremental_repair(manager, cmdline=cmdline)
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await preapre_cluster_for_incremental_repair(manager, cmdline=cmdline)
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)

View File

@@ -20,7 +20,6 @@ from cassandra.query import SimpleStatement
from test.pylib.async_cql import _wrap_future
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables, TextType, Column
from test.pylib.rest_client import read_barrier
from test.pylib.util import unique_name
from test.cluster.conftest import cluster_con
@@ -404,7 +403,6 @@ async def test_arbiter_dc_rf_rack_valid_keyspaces(manager: ManagerClient):
for task in [*valid_keyspaces, *invalid_keyspaces]:
_ = tg.create_task(task)
@pytest.mark.asyncio
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager: ManagerClient):
"""
This test verifies that starting a Scylla node fails when there's an RF-rack-invalid keyspace.
@@ -466,50 +464,22 @@ async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager:
for rfs, tablets in valid_keyspaces:
_ = tg.create_task(create_keyspace(rfs, tablets))
# Precondition: s1 has rf_rack_valid_keyspaces set to false.
# Postcondition: s1 still has rf_rack_valid_keyspaces set to false.
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
async def try_fail(rfs: List[int], dc: str, rf: int, rack_count: int):
running_servers = await manager.running_servers()
should_start = s1.server_id not in [server.server_id for server in running_servers]
if should_start:
await manager.server_start(s1.server_id)
ks = await create_keyspace(rfs, True)
# We need to wait for the new schema to propagate.
# Otherwise, it's not clear when the mutation
# corresponding to the created keyspace will
# arrive at server 1.
# It could happen only after the node performs
# the check upon start-up, effectively leading
# to a successful start-up, which we don't want.
# For more context, see issue: SCYLLADB-1137.
await read_barrier(manager.api, s1.ip_addr)
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
err = f"The keyspace '{ks}' is required to be RF-rack-valid. " \
f"That condition is violated for DC '{dc}': RF={rf} vs. rack count={rack_count}."
await manager.server_start(s1.server_id, expected_error=err)
_ = await manager.server_start(s1.server_id, expected_error=err)
await cql.run_async(f"DROP KEYSPACE {ks}")
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "false")
# Test RF-rack-invalid keyspaces.
await try_fail([2, 0], "dc1", 2, 3)
await try_fail([3, 2], "dc2", 2, 1)
await try_fail([4, 1], "dc1", 4, 3)
# We need to perform a read barrier on the node to make
# sure that it processes the last DROP KEYSPACE.
# Otherwise, the node could think the RF-rack-invalid
# keyspace still exists.
await manager.server_start(s1.server_id)
await read_barrier(manager.api, s1.ip_addr)
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
await manager.server_start(s1.server_id)
_ = await manager.server_start(s1.server_id)
@pytest.mark.asyncio
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces_but_not_enforced(manager: ManagerClient):

View File

@@ -23,25 +23,10 @@ from test.cluster.object_store.conftest import format_tuples
from test.cluster.object_store.test_backup import topo, take_snapshot, do_test_streaming_scopes
from test.cluster.util import new_test_keyspace
from test.pylib.rest_client import read_barrier
from test.pylib.util import unique_name, wait_for
from test.pylib.util import unique_name
logger = logging.getLogger(__name__)
async def wait_for_upload_dir_empty(upload_dir, timeout=30):
'''
Wait until the upload directory is empty with a timeout.
SSTable unlinking is asynchronous and in rare situations, it can happen
that not all sstables are deleted from the upload dir immediately after refresh is done.
'''
deadline = time.time() + timeout
async def check_empty():
files = os.listdir(upload_dir)
if not files:
return True
return None
await wait_for(check_empty, deadline, period=0.5)
class SSTablesOnLocalStorage:
def __init__(self):
self.tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
@@ -168,8 +153,7 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
for s in servers:
cf_dir = dirs[s.server_id]["cf_dir"]
upload_dir = os.path.join(cf_dir, 'upload')
assert os.path.exists(upload_dir)
await wait_for_upload_dir_empty(upload_dir)
files = os.listdir(os.path.join(cf_dir, 'upload'))
assert files == [], f'Upload dir not empty on server {s.server_id}: {files}'
shutil.rmtree(tmpbackup)

View File

@@ -196,7 +196,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
tombstone_mark = datetime.now(timezone.utc)
# test #2: the tombstones are not cleaned up when one node is down
with pytest.raises(AssertionError, match="timed out"):
with pytest.raises(AssertionError, match="Deadline exceeded"):
# waiting for shorter time (5s normally enough for a successful case, we expect the timeout here)
await verify_tombstone_gc(tombstone_mark, timeout=5)
@@ -249,7 +249,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
# test #4a: the tombstones are not cleaned up after both live nodes join the new group0
with pytest.raises(AssertionError, match="timed out"):
with pytest.raises(AssertionError, match="Deadline exceeded"):
await verify_tombstone_gc(tombstone_mark, timeout=5)
await manager.remove_node(servers[0].server_id, down_server.server_id)

View File

@@ -165,7 +165,7 @@ async def wait_for_cdc_generations_publishing(cql: Session, hosts: list[Host], d
unpublished_generations = topo_res[0].unpublished_cdc_generations
return unpublished_generations is None or len(unpublished_generations) == 0 or None
await wait_for(all_generations_published, deadline=deadline)
await wait_for(all_generations_published, deadline=deadline, period=1.0)
async def check_system_topology_and_cdc_generations_v3_consistency(manager: ManagerClient, live_hosts: list[Host], cqls: Optional[list[Session]] = None, ignored_hosts: list[Host] = []):
@@ -470,17 +470,6 @@ async def new_materialized_view(manager: ManagerClient, table, select, pk, where
await manager.get_cql().run_async(f"DROP MATERIALIZED VIEW {mv}")
async def keyspace_has_tablets(manager: ManagerClient, keyspace: str) -> bool:
"""
Checks whether the given keyspace uses tablets.
Adapted from its counterpart in the cqlpy test: cqlpy/util.py::keyspace_has_tablets.
"""
cql = manager.get_cql()
rows_iter = await cql.run_async(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name='{keyspace}'")
rows = list(rows_iter)
return len(rows) > 0 and getattr(rows[0], "initial_tablets", None) is not None
async def get_raft_log_size(cql, host) -> int:
query = "select count(\"index\") from system.raft"
return (await cql.run_async(query, host=host))[0][0]

View File

@@ -7,42 +7,54 @@
*/
#include "limiting_data_source.hh"
#include <seastar/core/iostream.hh>
#include <seastar/core/temporary_buffer.hh>
#include <cstdint>
using namespace seastar;
future<temporary_buffer<char>> limiting_data_source_impl::do_get() {
uint64_t size = std::min(_limit, _buf.size());
auto res = _buf.share(0, size);
_buf.trim_front(size);
return make_ready_future<temporary_buffer<char>>(std::move(res));
}
class limiting_data_source_impl final : public data_source_impl {
data_source _src;
size_t _limit;
temporary_buffer<char> _buf;
future<temporary_buffer<char>> do_get() {
uint64_t size = std::min(_limit, _buf.size());
auto res = _buf.share(0, size);
_buf.trim_front(size);
return make_ready_future<temporary_buffer<char>>(std::move(res));
}
public:
limiting_data_source_impl(data_source&& src, size_t limit)
: _src(std::move(src))
, _limit(limit)
{}
limiting_data_source_impl::limiting_data_source_impl(data_source&& src, size_t limit) : _src(std::move(src)), _limit(limit) {
}
limiting_data_source_impl(limiting_data_source_impl&&) noexcept = default;
limiting_data_source_impl& operator=(limiting_data_source_impl&&) noexcept = default;
future<temporary_buffer<char>> limiting_data_source_impl::get() {
if (_buf.empty()) {
virtual future<temporary_buffer<char>> get() override {
if (_buf.empty()) {
_buf.release();
return _src.get().then([this] (auto&& buf) {
_buf = std::move(buf);
return do_get();
});
}
return do_get();
}
virtual future<temporary_buffer<char>> skip(uint64_t n) override {
if (n < _buf.size()) {
_buf.trim_front(n);
return do_get();
}
n -= _buf.size();
_buf.release();
return _src.get().then([this](auto&& buf) {
return _src.skip(n).then([this] (auto&& buf) {
_buf = std::move(buf);
return do_get();
});
}
return do_get();
}
future<temporary_buffer<char>> limiting_data_source_impl::skip(uint64_t n) {
if (n < _buf.size()) {
_buf.trim_front(n);
return do_get();
}
n -= _buf.size();
_buf.release();
return _src.skip(n).then([this](auto&& buf) {
_buf = std::move(buf);
return do_get();
});
}
};
data_source make_limiting_data_source(data_source&& src, size_t limit) {
return data_source{std::make_unique<limiting_data_source_impl>(std::move(src), limit)};

View File

@@ -8,25 +8,13 @@
#pragma once
#include <seastar/core/iostream.hh>
#include <seastar/core/temporary_buffer.hh>
#include <stddef.h>
namespace seastar {
class limiting_data_source_impl : public seastar::data_source_impl {
seastar::data_source _src;
size_t _limit;
seastar::temporary_buffer<char> _buf;
seastar::future<seastar::temporary_buffer<char>> do_get();
class data_source;
public:
limiting_data_source_impl(seastar::data_source&& src, size_t limit);
limiting_data_source_impl(limiting_data_source_impl&&) noexcept = default;
limiting_data_source_impl& operator=(limiting_data_source_impl&&) noexcept = default;
seastar::future<seastar::temporary_buffer<char>> get() override;
seastar::future<seastar::temporary_buffer<char>> skip(uint64_t n) override;
};
}
/// \brief Creates an data_source from another data_source but returns its data in chunks not bigger than a given limit
///

View File

@@ -271,21 +271,10 @@ future<std::tuple<tests::proc::process_fixture, int>> tests::proc::start_docker_
// arbitrary timeout of 120s for the server to make some output. Very generous.
// but since we (maybe) run docker, and might need to pull image, this can take
// some time if we're unlucky.
auto [f1, f2] = co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
for (auto* f : {&f1, &f2}) {
if (f->failed()) {
try {
f->get();
} catch (in_use&) {
retry = true;
p = std::current_exception();
} catch (...) {
if (!p) {
p = std::current_exception();
}
}
}
}
co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
} catch (in_use&) {
retry = true;
p = std::current_exception();
} catch (...) {
p = std::current_exception();
}

View File

@@ -60,7 +60,6 @@ class ManagerClient:
self.con_gen = con_gen
self.ccluster: Optional[CassandraCluster] = None
self.cql: Optional[CassandraSession] = None
self.exclusive_clusters: List[CassandraCluster] = []
# A client for communicating with ScyllaClusterManager (server)
self.sock_path = sock_path
self.client_for_asyncio_loop = {asyncio.get_running_loop(): UnixRESTClient(sock_path)}
@@ -114,9 +113,6 @@ class ManagerClient:
def driver_close(self) -> None:
"""Disconnect from cluster"""
for cluster in self.exclusive_clusters:
cluster.shutdown()
self.exclusive_clusters.clear()
if self.ccluster is not None:
logger.debug("shutting down driver")
safe_driver_shutdown(self.ccluster)
@@ -138,12 +134,9 @@ class ManagerClient:
hosts = await wait_for_cql_and_get_hosts(cql, servers, time() + 60)
return cql, hosts
async def get_cql_exclusive(self, server: ServerInfo, auth_provider: Optional[AuthProvider] = None):
cluster = self.con_gen([server.ip_addr], self.port, self.use_ssl,
auth_provider if auth_provider else self.auth_provider,
WhiteListRoundRobinPolicy([server.ip_addr]))
self.exclusive_clusters.append(cluster)
cql = cluster.connect()
async def get_cql_exclusive(self, server: ServerInfo):
cql = self.con_gen([server.ip_addr], self.port, self.use_ssl, self.auth_provider,
WhiteListRoundRobinPolicy([server.ip_addr])).connect()
await wait_for_cql_and_get_hosts(cql, [server], time() + 60)
return cql

View File

@@ -1394,11 +1394,7 @@ class ScyllaCluster:
f"the test must drop all keyspaces it creates.")
for server in itertools.chain(self.running.values(), self.stopped.values()):
server.write_log_marker(f"------ Ending test {name} ------\n")
# Only close log files when the cluster is dirty (will be destroyed).
# If the cluster is clean and will be reused, keep the log file open
# so that write_log_marker() and take_log_savepoint() work in the
# next test's before_test().
if self.is_dirty and not server.log_file.closed:
if not server.log_file.closed:
server.log_file.close()
async def server_stop(self, server_id: ServerNum, gracefully: bool) -> None:

View File

@@ -56,25 +56,15 @@ def unique_name(unique_name_prefix = 'test_'):
async def wait_for(
pred: Callable[[], Awaitable[Optional[T]]],
deadline: float,
period: float = 0.1,
period: float = 1,
before_retry: Optional[Callable[[], Any]] = None,
backoff_factor: float = 1.5,
max_period: float = 1.0,
label: Optional[str] = None) -> T:
tag = label or getattr(pred, '__name__', 'unlabeled')
start = time.time()
retries = 0
backoff_factor: float = 1,
max_period: float = None) -> T:
while True:
elapsed = time.time() - start
assert time.time() < deadline, \
f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
assert(time.time() < deadline), "Deadline exceeded, failing test."
res = await pred()
if res is not None:
if retries > 0:
logger.debug(f"wait_for({tag}) completed "
f"in {elapsed:.2f}s ({retries} retries)")
return res
retries += 1
await asyncio.sleep(period)
period *= backoff_factor
if max_period is not None:
@@ -283,14 +273,14 @@ async def wait_for_view_v1(cql: Session, name: str, node_count: int, timeout: in
done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
return done[0][0] == node_count or None
deadline = time.time() + timeout
await wait_for(view_is_built, deadline, label=f"view_v1_{name}")
await wait_for(view_is_built, deadline)
async def wait_for_view(cql: Session, name: str, node_count: int, timeout: int = 120):
async def view_is_built():
done = await cql.run_async(f"SELECT COUNT(*) FROM system.view_build_status_v2 WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
return done[0][0] == node_count or None
deadline = time.time() + timeout
await wait_for(view_is_built, deadline, label=f"view_{name}")
await wait_for(view_is_built, deadline)
async def wait_for_first_completed(coros: list[Coroutine], timeout: int|None = None):

View File

@@ -4,7 +4,7 @@
"""
GDB helper functions for `scylla_gdb` tests.
They should be loaded to GDB by "-x {dir}/gdb_utils.py}",
when loaded, they can be run in gdb e.g. `$get_sstables()`
when loaded, they can be run in gdb e.g. `python get_sstables()`
Depends on helper functions injected to GDB by `scylla-gdb.py` script.
(sharded, for_each_table, seastar_lw_shared_ptr, find_sstables, find_vptrs, resolve,
@@ -15,65 +15,39 @@ import gdb
import uuid
class get_schema(gdb.Function):
"""Finds and returns a schema pointer."""
def __init__(self):
super(get_schema, self).__init__('get_schema')
def invoke(self):
db = sharded(gdb.parse_and_eval('::debug::the_database')).local()
table = next(for_each_table(db))
return seastar_lw_shared_ptr(table['_schema']).get()
def get_schema():
"""Execute GDB commands to get schema information."""
db = sharded(gdb.parse_and_eval('::debug::the_database')).local()
table = next(for_each_table(db))
ptr = seastar_lw_shared_ptr(table['_schema']).get()
print('schema=', ptr)
class get_sstable(gdb.Function):
"""Finds and returns an sstable pointer."""
def __init__(self):
super(get_sstable, self).__init__('get_sstable')
def invoke(self):
return next(find_sstables())
def get_sstables():
"""Execute GDB commands to get sstables information."""
sst = next(find_sstables())
print(f"sst=(sstables::sstable *)", sst)
class get_task(gdb.Function):
def get_task():
"""
Finds and returns a Scylla fiber task.
Some commands need a task to work on. The following fixture finds one.
Because we stopped Scylla while it was idle, we don't expect to find
any ready task with get_local_tasks(), but we can find one with a
find_vptrs() loop. I noticed that a nice one (with multiple tasks chained
to it for "scylla fiber") is one from http_server::do_accept_one.
"""
def __init__(self):
super(get_task, self).__init__('get_task')
def invoke(self):
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr, startswith='vtable for seastar::continuation')
if name and 'do_accept_one' in name:
return obj_addr.cast(gdb.lookup_type('uintptr_t'))
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr, startswith='vtable for seastar::continuation')
if name and 'do_accept_one' in name:
print(f"task={obj_addr.cast(gdb.lookup_type('uintptr_t'))}")
break
class get_coroutine(gdb.Function):
"""
Finds and returns a coroutine frame.
Prints COROUTINE_NOT_FOUND if the coroutine is not present.
"""
def __init__(self):
super(get_coroutine, self).__init__('get_coroutine')
def invoke(self):
target = 'service::topology_coordinator::run() [clone .resume]'
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr)
if name and name.strip() == target:
return obj_addr.cast(gdb.lookup_type('uintptr_t'))
print("COROUTINE_NOT_FOUND")
# Register the functions in GDB
get_schema()
get_sstable()
get_task()
get_coroutine()
def get_coroutine():
"""Similar to get_task(), but looks for a coroutine frame."""
target = 'service::topology_coordinator::run() [clone .resume]'
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr)
if name and name.strip() == target:
print(f"coroutine_config={obj_addr.cast(gdb.lookup_type('uintptr_t'))}")

View File

@@ -7,6 +7,7 @@ Each only checks that the command does not fail - but not what it does or return
"""
import pytest
import re
from test.scylla_gdb.conftest import execute_gdb_command
@@ -22,6 +23,20 @@ pytestmark = [
),
]
@pytest.fixture(scope="module")
def schema(gdb_cmd):
"""
Returns pointer to schema of the first table it finds
Even without any user tables, we will always have system tables.
"""
result = execute_gdb_command(gdb_cmd, full_command="python get_schema()").stdout
match = re.search(r"schema=\s*(0x[0-9a-fA-F]+)", result)
schema_pointer = match.group(1) if match else None
return schema_pointer
@pytest.mark.parametrize(
"command",
[
@@ -30,17 +45,21 @@ pytestmark = [
"schema (const schema *)", # `schema` requires type-casted pointer
],
)
def test_schema(gdb_cmd, command):
result = execute_gdb_command(gdb_cmd, f"{command} $get_schema()")
def test_schema(gdb_cmd, command, schema):
assert schema, "Failed to find schema of any table"
result = execute_gdb_command(gdb_cmd, f"{command} {schema}")
assert result.returncode == 0, (
f"GDB command {command} failed. stdout: {result.stdout} stderr: {result.stderr}"
)
def test_generate_object_graph(gdb_cmd, request):
def test_generate_object_graph(gdb_cmd, schema, request):
assert schema, "Failed to find schema of any table"
tmpdir = request.config.getoption("--tmpdir")
result = execute_gdb_command(
gdb_cmd, f"generate-object-graph -o {tmpdir}/og.dot -d 2 -t 10 $get_schema()"
gdb_cmd, f"generate-object-graph -o {tmpdir}/og.dot -d 2 -t 10 {schema}"
)
assert result.returncode == 0, (
f"GDB command `generate-object-graph` failed. stdout: {result.stdout} stderr: {result.stderr}"

View File

@@ -7,6 +7,7 @@ Each only checks that the command does not fail - but not what it does or return
"""
import pytest
import re
from test.scylla_gdb.conftest import execute_gdb_command
@@ -23,6 +24,16 @@ pytestmark = [
]
@pytest.fixture(scope="module")
def sstable(gdb_cmd):
"""Finds sstable"""
result = execute_gdb_command(gdb_cmd, full_command="python get_sstables()").stdout
match = re.search(r"(\(sstables::sstable \*\) 0x)([0-9a-f]+)", result)
sstable_pointer = match.group(0).strip() if match else None
return sstable_pointer
@pytest.mark.parametrize(
"command",
[
@@ -30,8 +41,10 @@ pytestmark = [
"sstable-index-cache",
],
)
def test_sstable(gdb_cmd, command):
result = execute_gdb_command(gdb_cmd, f"{command} $get_sstable()")
def test_sstable(gdb_cmd, command, sstable):
assert sstable, "No sstable was found"
result = execute_gdb_command(gdb_cmd, f"{command} {sstable}")
assert result.returncode == 0, (
f"GDB command {command} failed. stdout: {result.stdout} stderr: {result.stderr}"
)

View File

@@ -6,6 +6,8 @@ Tests for commands, that need a some task to work on.
Each only checks that the command does not fail - but not what it does or returns.
"""
import re
import pytest
from test.scylla_gdb.conftest import execute_gdb_command
@@ -23,25 +25,59 @@ pytestmark = [
]
def test_coroutine_frame(gdb_cmd):
@pytest.fixture(scope="module")
def task(gdb_cmd):
"""
Finds a Scylla fiber task using a `find_vptrs()` loop.
Since Scylla is freshbooted, `get_local_tasks()` returns nothing.
Nevertheless, a `find_vptrs()` scan can still discover the first task
skeleton created by `http_server::do_accept_one` (often the earliest
“Scylla fiber” to appear).
"""
result = execute_gdb_command(gdb_cmd, full_command="python get_task()").stdout
match = re.search(r"task=(\d+)", result)
task = match.group(1) if match else None
return task
@pytest.fixture(scope="module")
def coroutine_task(gdb_cmd, scylla_server):
"""
Finds a coroutine task, similar to the `task` fixture.
This fixture executes the `coroutine_config` script in GDB to locate a
specific coroutine task.
"""
result = execute_gdb_command(gdb_cmd, full_command="python get_coroutine()").stdout
match = re.search(r"coroutine_config=\s*(.*)", result)
if not match:
# See https://github.com/scylladb/scylladb/issues/22501
pytest.skip("Failed to find coroutine task. Skipping test.")
return match.group(1).strip()
def test_coroutine_frame(gdb_cmd, coroutine_task):
"""
Offsets the pointer by two words to shift from the outer coroutine frame
to the inner `seastar::task`, as required by `$coro_frame`, which expects
a `seastar::task*`.
"""
assert coroutine_task, "No coroutine task was found"
result = execute_gdb_command(
gdb_cmd, full_command="p *$coro_frame($get_coroutine() + 16)"
gdb_cmd, full_command=f"p *$coro_frame({coroutine_task} + 16)"
)
if "COROUTINE_NOT_FOUND" in result.stdout:
# See https://github.com/scylladb/scylladb/issues/22501
pytest.skip("Failed to find coroutine task. Skipping test.")
assert result.returncode == 0, (
f"GDB command `coro_frame` failed. stdout: {result.stdout} stderr: {result.stderr}"
)
def test_fiber(gdb_cmd):
result = execute_gdb_command(gdb_cmd, "fiber $get_task()")
def test_fiber(gdb_cmd, task):
assert task, f"No task was found using `find_vptrs()`"
result = execute_gdb_command(gdb_cmd, f"fiber {task}")
assert result.returncode == 0, (
f"GDB command `fiber` failed. stdout: {result.stdout} stderr: {result.stderr}"
)

View File

@@ -69,7 +69,6 @@
#include "message/messaging_service.hh"
#include "idl/forward_cql.dist.hh"
#include "utils/bit_cast.hh"
#include "utils/error_injection.hh"
#include "utils/labels.hh"
#include "utils/result.hh"
#include "utils/reusable_buffer.hh"
@@ -1634,26 +1633,13 @@ process_execute_internal(service::client_state& client_state, sharded<cql3::quer
}
tracing::trace(trace_state, "Processing a statement");
auto cache_key_for_metadata = cache_key;
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, prepared, std::move(cache_key), needs_authorization)
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id), &qp, cache_key = std::move(cache_key_for_metadata), prepared = std::move(prepared)] (auto msg) mutable {
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization)
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id)] (auto msg) mutable {
if (msg->move_to_shard()) {
return cql_server::process_fn_return_type(make_foreign(dynamic_pointer_cast<messages::result_message::bounce>(msg)));
} else if (msg->is_exception()) {
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
} else {
if (prepared->result_metadata_is_empty()
&& metadata_id.has_request_metadata_id()
&& !utils::get_local_injector().enter("skip_prepared_result_metadata_promotion")) {
if (auto rows = dynamic_pointer_cast<messages::result_message::rows>(msg)) {
auto rows_metadata_id = rows->rs().get_metadata().calculate_metadata_id();
clogger.debug("prepared result metadata promotion: request_metadata_id_present={}, calculated_rows_metadata_id_size={}",
metadata_id.has_request_metadata_id(), rows_metadata_id._metadata_id.size());
qp.local().update_prepared_result_metadata_id(cache_key, rows_metadata_id);
auto request_metadata_id = metadata_id.get_request_metadata_id();
metadata_id = cql_metadata_id_wrapper(std::move(request_metadata_id), std::move(rows_metadata_id));
}
}
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)));
}
@@ -2521,16 +2507,9 @@ void cql_server::response::write(const cql3::metadata& m, const cql_metadata_id_
cql3::cql_metadata_id_type calculated_metadata_id{bytes{}};
if (metadata_id.has_request_metadata_id() && metadata_id.has_response_metadata_id()) {
if (metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id()) {
const bool skip_rows_metadata_changed_response = utils::get_local_injector().enter("skip_rows_metadata_changed_response");
clogger.debug("rows metadata changed response: request_metadata_id_present={}, response_metadata_id_present={}, metadata_changed={}, no_metadata_before={}, injection_fired={}",
metadata_id.has_request_metadata_id(), metadata_id.has_response_metadata_id(),
metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id(),
no_metadata, skip_rows_metadata_changed_response);
if (!skip_rows_metadata_changed_response) {
flags.remove<cql3::metadata::flag::NO_METADATA>();
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
no_metadata = false;
}
flags.remove<cql3::metadata::flag::NO_METADATA>();
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
no_metadata = false;
}
}

View File

@@ -10,7 +10,6 @@
#include <seastar/core/seastar.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/closeable.hh>
#include "init.hh"
#include "supervisor.hh"
#include "directories.hh"