Compare commits


3 Commits

Author | SHA1 | Message | Date

copilot-swe-agent[bot] | ae0208e35c | Use permissions: {} (no token permissions) instead of contents: read | 2026-03-23 16:59:37 +00:00
    Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
    Agent-Logs-Url: https://github.com/scylladb/scylladb/sessions/11b4f0e2-dd65-47f3-9d02-0c01e28fcd99

copilot-swe-agent[bot] | b418e7a489 | Initial plan | 2026-03-23 16:58:39 +00:00

Yaniv Kaul | 2c5727753a | .github/workflows/trigger-scylla-ci.yaml:3: Potential fix for code scanning alert no. 184: Workflow does not contain permissions | 2026-03-23 16:46:06 +02:00
    Reduce permissions to 'read'.
    Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
61 changed files with 759 additions and 1669 deletions

View File

@@ -1,6 +1,6 @@
name: Trigger Scylla CI Route
permissions:
contents: read
permissions: {}
on:
issue_comment:

View File

@@ -1,8 +1,5 @@
name: Trigger next gating
permissions:
contents: read
on:
push:
branches:

View File

@@ -699,17 +699,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
// for such a size.
co_return api_error::payload_too_large(fmt::format("Request content length limit of {} bytes exceeded", request_content_length_limit));
}
// Check the concurrency limit early, before acquiring memory and
// reading the request body, to avoid piling up memory from excess
// requests that will be rejected anyway. This mirrors the CQL
// transport which also checks concurrency before memory acquisition
// (transport/server.cc).
if (_pending_requests.get_count() >= _max_concurrent_requests) {
_executor._stats.requests_shed++;
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
}
_pending_requests.enter();
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
// JSON parsing can allocate up to roughly 2x the size of the raw
// document, + a couple of bytes for maintenance.
// If the Content-Length of the request is not available, we assume
@@ -771,6 +760,12 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
_executor._stats.unsupported_operations++;
co_return api_error::unknown_operation(fmt::format("Unsupported operation {}", op));
}
if (_pending_requests.get_count() >= _max_concurrent_requests) {
_executor._stats.requests_shed++;
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
}
_pending_requests.enter();
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
executor::client_state client_state(service::client_state::external_tag(),
_auth_service, &_sl_controller, _timeout_config.current_values(), req->get_client_address());
if (!username.empty()) {
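
One side of this hunk checks the concurrency limit before the request body is buffered, with the comment spelling out why: shed requests must not hold memory. A minimal Python sketch of that admission pattern (the limit and names are illustrative, not Scylla's API):

MAX_CONCURRENT_REQUESTS = 128  # hypothetical per-shard limit
_pending = 0  # in-flight request count

async def handle_request(read_body):
    global _pending
    # Reject before buffering the body, so shed requests never hold memory.
    if _pending >= MAX_CONCURRENT_REQUESTS:
        raise RuntimeError(f"too many in-flight requests: {_pending}")
    _pending += 1
    try:
        body = await read_body()  # memory is acquired only after admission
        return len(body)
    finally:
        _pending -= 1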

View File

@@ -583,7 +583,8 @@ sstable_format: ms
audit: "table"
#
# List of statement categories that should be audited.
audit_categories: "DCL,DDL,AUTH,ADMIN"
# Possible categories are: QUERY, DML, DCL, DDL, AUTH, ADMIN
audit_categories: "DCL,AUTH,ADMIN"
#
# List of tables that should be audited.
# audit_tables: "<keyspace_name>.<table_name>,<keyspace_name>.<table_name>"

View File

@@ -1582,7 +1582,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"\tnone : No auditing enabled.\n"
"\tsyslog : Audit messages sent to Syslog.\n"
"\ttable : Audit messages written to column family named audit.audit_log.\n")
, audit_categories(this, "audit_categories", liveness::LiveUpdate, value_status::Used, "DCL,DDL,AUTH,ADMIN", "Comma separated list of operation categories that should be audited.")
, audit_categories(this, "audit_categories", liveness::LiveUpdate, value_status::Used, "DCL,AUTH,ADMIN", "Comma separated list of operation categories that should be audited.")
, audit_tables(this, "audit_tables", liveness::LiveUpdate, value_status::Used, "", "Comma separated list of table names (<keyspace>.<table>) that will be audited.")
, audit_keyspaces(this, "audit_keyspaces", liveness::LiveUpdate, value_status::Used, "", "Comma separated list of keyspaces that will be audited. All tables in those keyspaces will be audited")
, audit_unix_socket_path(this, "audit_unix_socket_path", value_status::Used, "/dev/log", "The path to the unix socket used for writing to syslog. Only applicable when audit is set to syslog.")
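
The comment added alongside audit_categories lists the valid names (QUERY, DML, DCL, DDL, AUTH, ADMIN). For illustration, a hedged Python sketch of parsing and validating such a comma-separated value (not Scylla's actual parser):

VALID_AUDIT_CATEGORIES = {"QUERY", "DML", "DCL", "DDL", "AUTH", "ADMIN"}

def parse_audit_categories(value: str) -> set:
    # "DCL,AUTH,ADMIN" -> {"DCL", "AUTH", "ADMIN"}; unknown names are rejected.
    categories = {c.strip().upper() for c in value.split(",") if c.strip()}
    unknown = categories - VALID_AUDIT_CATEGORIES
    if unknown:
        raise ValueError(f"unknown audit categories: {sorted(unknown)}")
    return categories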

View File

@@ -9,7 +9,6 @@
import os
import sys
import shlex
import argparse
import psutil
from pathlib import Path
@@ -104,41 +103,16 @@ if __name__ == '__main__':
run('dd if=/dev/zero of={} bs=1M count={}'.format(swapfile, swapsize_mb), shell=True, check=True)
swapfile.chmod(0o600)
run('mkswap -f {}'.format(swapfile), shell=True, check=True)
mount_point = find_mount_point(swap_directory)
mount_unit = out(f'systemd-escape -p --suffix=mount {shlex.quote(str(mount_point))}')
# Add DefaultDependencies=no to the swap unit to avoid getting the default
# Before=swap.target dependency. We apply this to all clouds, but the
# requirement came from Azure:
#
# On Azure, the swap directory is on the Azure ephemeral disk (mounted on /mnt).
# However, cloud-init makes this mount (i.e., the mnt.mount unit) depend on
# the network (After=network-online.target). By extension, this means that
# the swap unit depends on the network. If we didn't use DefaultDependencies=no,
# then the swap unit would be part of the swap.target which other services
# assume to be a local boot target, so we would end up with dependency cycles
# such as:
#
# swap.target -> mnt-swapfile.swap -> mnt.mount -> network-online.target -> network.target -> systemd-resolved.service -> tmp.mount -> swap.target
#
# By removing the automatic Before=swap.target, the swap unit is no longer
# part of swap.target, avoiding such cycles. The swap will still be
# activated via WantedBy=multi-user.target.
unit_data = '''
[Unit]
Description=swapfile
DefaultDependencies=no
After={}
Conflicts=umount.target
Before=umount.target
[Swap]
What={}
[Install]
WantedBy=multi-user.target
'''[1:-1].format(mount_unit, swapfile)
'''[1:-1].format(swapfile)
with swapunit.open('w') as f:
f.write(unit_data)
systemd_unit.reload()
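
One side of this hunk derives the .mount unit name for the swap directory's mount point via systemd-escape, so the swap unit can order itself After= that mount. A minimal Python sketch of the mapping (assumes systemd-escape is on PATH):

import subprocess

def mount_unit_for(mount_point: str) -> str:
    # systemd-escape -p turns "/mnt" into "mnt.mount" and "/" into "-.mount".
    out = subprocess.run(
        ["systemd-escape", "-p", "--suffix=mount", mount_point],
        check=True, capture_output=True, text=True,
    )
    return out.stdout.strip()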

View File

@@ -727,12 +727,7 @@ public:
// now we need one page more to be able to save one for next lap
auto fill_size = align_up(buf1.size(), block_size) + block_size - buf1.size();
// If the underlying stream is already at EOF (e.g. buf1 came from
// cached _next while the previous read_exactly drained the source),
// skip the read_exactly call — it would return empty anyway.
auto buf2 = _input.eof()
? temporary_buffer<char>()
: co_await _input.read_exactly(fill_size);
auto buf2 = co_await _input.read_exactly(fill_size);
temporary_buffer<char> output(buf1.size() + buf2.size());
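
The guarded variant above skips read_exactly() once the underlying stream reports EOF, since the call would only return an empty buffer (and, per the deleted test further below, may never return at all on some sources). A Python restatement of the guard, with illustrative stream method names:

async def read_fill(stream, fill_size: int) -> bytes:
    # Don't touch the source again after EOS; read_exactly() would
    # return empty anyway on a well-behaved source.
    if stream.eof():
        return b""
    return await stream.read_exactly(fill_size)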

View File

@@ -42,14 +42,7 @@ void everywhere_replication_strategy::validate_options(const gms::feature_servic
sstring everywhere_replication_strategy::sanity_check_read_replicas(const effective_replication_map& erm, const host_id_vector_replica_set& read_replicas) const {
const auto replication_factor = erm.get_replication_factor();
if (const auto& topo_info = erm.get_token_metadata().get_topology_change_info(); topo_info && topo_info->read_new) {
if (read_replicas.size() > replication_factor + 1) {
return seastar::format(
"everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, "
"cannot be higher than replication factor {} + 1 during the 'read from new replicas' stage of a topology change",
read_replicas.size(), replication_factor);
}
} else if (read_replicas.size() > replication_factor) {
if (read_replicas.size() > replication_factor) {
return seastar::format("everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, cannot be higher than replication factor {}", read_replicas.size(), replication_factor);
}
return {};

View File

@@ -15,7 +15,6 @@ from typing import Any, Optional
import asyncio
import contextlib
import glob
import hashlib
import json
import logging
import os
@@ -365,14 +364,12 @@ async def start_node(executable: PathLike, cluster_workdir: PathLike, addr: str,
llvm_profile_file = f"{addr}-%m.profraw"
scylla_workdir = f"{addr}"
logfile = f"{addr}.log"
socket = maintenance_socket_path(cluster_workdir, addr)
command = [
"env",
f"LLVM_PROFILE_FILE={llvm_profile_file}",
f"SCYLLA_HOME={os.path.realpath(os.getcwd())}", # We assume that the script has Scylla's `conf/` as its filesystem neighbour.
os.path.realpath(executable),
f"--workdir={scylla_workdir}",
f"--maintenance-socket={socket}",
"--ring-delay-ms=0",
"--developer-mode=yes",
"--memory=1G",
@@ -394,7 +391,6 @@ async def start_node(executable: PathLike, cluster_workdir: PathLike, addr: str,
f"--authenticator=PasswordAuthenticator",
f"--authorizer=CassandraAuthorizer",
] + list(extra_opts)
training_logger.info(f"Using maintenance socket {socket}")
return await run(['bash', '-c', fr"""exec {shlex.join(command)} >{q(logfile)} 2>&1"""], cwd=cluster_workdir)
async def start_cluster(executable: PathLike, addrs: list[str], cpusets: Optional[list[str]], workdir: PathLike, cluster_name: str, extra_opts: list[str]) -> list[Process]:
@@ -437,25 +433,16 @@ async def start_cluster(executable: PathLike, addrs: list[str], cpusets: Optiona
procs.append(proc)
await wait_for_node(proc, addrs[i], timeout)
except:
await stop_cluster(procs, addrs, cluster_workdir=workdir)
await stop_cluster(procs, addrs)
raise
return procs
async def stop_cluster(procs: list[Process], addrs: list[str], cluster_workdir: PathLike) -> None:
async def stop_cluster(procs: list[Process], addrs: list[str]) -> None:
"""Stops a Scylla cluster started with start_cluster().
Doesn't return until all nodes exit, even if stop_cluster() is cancelled.
"""
await clean_gather(*[cancel_process(p, timeout=60) for p in procs])
_cleanup_short_sockets(cluster_workdir, addrs)
def _cleanup_short_sockets(cluster_workdir: PathLike, addrs: list[str]) -> None:
"""Remove short maintenance socket files created in /tmp."""
for addr in addrs:
try:
os.unlink(maintenance_socket_path(cluster_workdir, addr))
except OSError:
pass
async def wait_for_port(addr: str, port: int) -> None:
await bash(fr'until printf "" >>/dev/tcp/{addr}/{port}; do sleep 0.1; done 2>/dev/null')
@@ -466,17 +453,12 @@ async def merge_profraw(directory: PathLike) -> None:
await bash(fr"llvm-profdata merge {q(directory)}/*.profraw -output {q(directory)}/prof.profdata")
def maintenance_socket_path(cluster_workdir: PathLike, addr: str) -> str:
"""Return the maintenance socket path for a node.
"""Returns the absolute path of the maintenance socket for a given node.
Returns a short deterministic path in /tmp (derived from an MD5 hash of
the natural ``<cluster_workdir>/<addr>/cql.m`` path) to stay within the
Unix domain socket length limit.
The same path is passed to Scylla via ``--maintenance-socket`` in
``start_node()``.
With ``maintenance_socket: workdir`` in scylla.yaml the socket lives at
``<node-workdir>/cql.m``, i.e. ``<cluster_workdir>/<addr>/cql.m``.
"""
natural = os.path.realpath(f"{cluster_workdir}/{addr}/cql.m")
path_hash = hashlib.md5(natural.encode()).hexdigest()[:12]
return os.path.join(tempfile.gettempdir(), f'pgo-{path_hash}.m')
return os.path.realpath(f"{cluster_workdir}/{addr}/cql.m")
async def setup_cassandra_user(workdir: PathLike, addr: str) -> None:
"""Create the ``cassandra`` superuser via the maintenance socket.
@@ -543,7 +525,7 @@ async def with_cluster(executable: PathLike, workdir: PathLike, cpusets: Optiona
yield addrs, procs
finally:
training_logger.info(f"Stopping the cluster in {workdir}")
await stop_cluster(procs, addrs, cluster_workdir=workdir)
await stop_cluster(procs, addrs)
training_logger.info(f"Stopped the cluster in {workdir}")
################################################################################
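
The hashed variant of maintenance_socket_path() exists to stay under the Unix domain socket path limit. A condensed, self-contained sketch of that scheme (107 is Linux's sun_path limit; the pgo- prefix follows the code above):

import hashlib
import os
import tempfile

SUN_PATH_MAX = 107  # Linux sockaddr_un limit (108 bytes including the NUL)

def short_socket_path(natural_path: str) -> str:
    # Deterministic and short no matter how deep the workdir path is.
    digest = hashlib.md5(natural_path.encode()).hexdigest()[:12]
    path = os.path.join(tempfile.gettempdir(), f"pgo-{digest}.m")
    assert len(path) <= SUN_PATH_MAX
    return path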

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:54662978b9ce4a6e25790b1b0a5099e6063173ffa95a399a6287cf474376ed09
size 6595952
oid sha256:e59fe56eac435fd03c2f0d7dfc11c6998d7c0750e1851535575497dd13d96015
size 6505524

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:0cf44ea1fb2ae20de45d26fe8095054e60cb8700cddcb2fd79ef79705484b18a
size 6603780
oid sha256:34a0955d2c5a88e18ddab0f1df085e10a17e14129c3e21de91e4f27ef949b6c4
size 6502668

View File

@@ -1098,8 +1098,7 @@ std::optional<std::pair<read_id, index_t>> fsm::start_read_barrier(server_id req
// Make sure that only a leader or a node that is part of the config can request read barrier
// Nodes outside of the config may never get the data, so they will not be able to read it.
follower_progress* opt_progress = leader_state().tracker.find(requester);
if (requester != _my_id && opt_progress == nullptr) {
if (requester != _my_id && leader_state().tracker.find(requester) == nullptr) {
throw std::runtime_error(fmt::format("Read barrier requested by a node outside of the configuration {}", requester));
}
@@ -1110,23 +1109,19 @@ std::optional<std::pair<read_id, index_t>> fsm::start_read_barrier(server_id req
return {};
}
// Optimization for read barriers requested on non-voters. A non-voter doesn't receive the read_quorum message, so
// it might update its commit index only after another leader tick, which would slow down wait_for_apply() at the
// end of the read barrier. Prevent that by replicating to the non-voting requester here.
if (requester != _my_id && opt_progress->commit_idx < _commit_idx && opt_progress->match_idx == _log.last_idx()
&& !opt_progress->can_vote) {
logger.trace("start_read_barrier[{}]: replicate to {} because follower commit_idx={} < commit_idx={}, "
"follower match_idx={} == last_idx={}, and follower can_vote={}",
_my_id, requester, opt_progress->commit_idx, _commit_idx, opt_progress->match_idx,
_log.last_idx(), opt_progress->can_vote);
replicate_to(*opt_progress, true);
}
read_id id = next_read_id();
logger.trace("start_read_barrier[{}] starting read barrier with id {}", _my_id, id);
return std::make_pair(id, _commit_idx);
}
void fsm::maybe_update_commit_idx_for_read(index_t read_idx) {
// read_idx from the leader might not be replicated to the local node yet.
const bool in_local_log = read_idx <= _log.last_idx();
if (in_local_log && log_term_for(read_idx) == get_current_term()) {
advance_commit_idx(read_idx);
}
}
void fsm::stop() {
if (is_leader()) {
// Become follower to stop accepting requests
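
maybe_update_commit_idx_for_read() packs two checks into one condition; restated in Python for clarity (the fsm parameter's shape mirrors the C++ fields, names are illustrative):

def maybe_update_commit_idx_for_read(fsm, read_idx: int) -> None:
    # The leader's read index may not be replicated to this node yet.
    in_local_log = read_idx <= fsm.log.last_idx()
    # A matching term proves the local entry at read_idx is the one the
    # leader committed (Log Matching + Leader Completeness), so advancing
    # the commit index is safe.
    if in_local_log and fsm.log.term_for(read_idx) == fsm.current_term:
        fsm.advance_commit_idx(read_idx)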

View File

@@ -480,6 +480,15 @@ public:
std::optional<std::pair<read_id, index_t>> start_read_barrier(server_id requester);
// Update the commit index to the read index (a read barrier result from the leader) if the local entry with the
// read index belongs to the current term.
//
// Satisfying the condition above guarantees that the local log matches the current leader's log up to the read
// index (the Log Matching Property), so the current leader won't drop the local entry with the read index.
// Moreover, this entry has been committed by the leader, so future leaders also won't drop it (the Leader
// Completeness Property). Hence, updating the commit index is safe.
void maybe_update_commit_idx_for_read(index_t read_idx);
size_t in_memory_log_size() const {
return _log.in_memory_size();
}

View File

@@ -1109,18 +1109,6 @@ future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batc
// case.
co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second);
_stats.store_term_and_vote++;
// When the term advances, any in-flight snapshot transfers
// belong to an outdated term: the progress tracker has been
// reset in become_leader() or we are now a follower.
// Abort them before we dispatch this batch's messages, which
// may start fresh transfers for the new term.
//
// A vote may also change independently of the term (e.g. a
// follower voting for a candidate at the same term), but in
// that case there are no in-flight transfers and the abort
// is a no-op.
abort_snapshot_transfers();
}
if (batch.snp) {
@@ -1230,6 +1218,8 @@ future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batc
// quickly) stop happening (we're outside the config after all).
co_await _apply_entries.push_eventually(removed_from_config{});
}
// request aborts of snapshot transfers
abort_snapshot_transfers();
// abort all read barriers
for (auto& r : _reads) {
r.promise.set_value(not_a_leader{_fsm->current_leader()});
@@ -1571,6 +1561,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) {
co_return stop_iteration::no;
}
read_idx = std::get<index_t>(res);
_fsm->maybe_update_commit_idx_for_read(read_idx);
co_return stop_iteration::yes;
});

View File

@@ -1622,14 +1622,14 @@ future<> segment_manager_impl::do_recovery(replica::database& db) {
size_t next_file_id = 0;
for (auto file_id : found_file_ids) {
if (file_id != next_file_id) {
throw std::runtime_error(fmt::format("Missing log segment file(s) detected during recovery: file {} missing", _file_mgr.get_file_path(next_file_id).string()));
throw std::runtime_error(fmt::format("Missing log segment file(s) detected during recovery: file {} missing", _file_mgr.get_file_path(next_file_id)));
}
next_file_id++;
}
// populate index from all segments. keep the latest record for each key.
for (auto file_id : found_file_ids) {
logstor_logger.info("Recovering segments from file {}: {}%", _file_mgr.get_file_path(file_id).string(), (file_id + 1) * 100 / found_file_ids.size());
logstor_logger.info("Recovering segments from file {}: {}%", _file_mgr.get_file_path(file_id), (file_id + 1) * 100 / found_file_ids.size());
co_await max_concurrent_for_each(segments_in_file(file_id), 32,
[this, &db] (log_segment_id seg_id) {
return recover_segment(db, seg_id);
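
The recovery loop requires segment file ids to form a contiguous 0..N-1 range; any gap aborts recovery. A self-contained Python equivalent of the check:

def check_contiguous(found_file_ids: list) -> None:
    # Any gap means a log segment file was lost and recovery must stop.
    for expected, actual in enumerate(sorted(found_file_ids)):
        if actual != expected:
            raise RuntimeError(f"missing log segment file id {expected}")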

View File

@@ -4860,14 +4860,13 @@ table::query(schema_ptr query_schema,
}
std::optional<full_position> last_pos;
if (querier_opt) {
if (querier_opt->current_position()) {
last_pos.emplace(*querier_opt->current_position());
}
if (!saved_querier || (!querier_opt->are_limits_reached() && !qs.builder.is_short_read())) {
co_await querier_opt->close();
querier_opt = {};
}
if (querier_opt && querier_opt->current_position()) {
last_pos.emplace(*querier_opt->current_position());
}
if (!saved_querier || (querier_opt && !querier_opt->are_limits_reached() && !qs.builder.is_short_read())) {
co_await querier_opt->close();
querier_opt = {};
}
if (saved_querier) {
*saved_querier = std::move(querier_opt);
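
Both variants of this hunk capture the querier's current position before the branch that may close it; only the null-checks differ. A hedged Python sketch of that ordering (the querier object is hypothetical):

def finish_page(querier, want_saved: bool, limits_reached: bool, short_read: bool):
    # Capture the position first: closing the querier below would lose it.
    last_pos = querier.current_position() if querier else None
    if querier and (not want_saved or not (limits_reached or short_read)):
        querier.close()
        querier = None
    return last_pos, querier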

View File

@@ -87,11 +87,6 @@ target_include_directories(wasmtime_bindings
target_link_libraries(wasmtime_bindings
INTERFACE Rust::rust_combined)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
# The PCH from scylla-precompiled-header is compiled with Seastar's compile
# flags, including sanitizer flags in Debug/Sanitize modes. Any target reusing
# this PCH must have matching compile options, otherwise the compiler rejects
# the PCH due to flag mismatch (e.g., -fsanitize=address).
target_link_libraries(wasmtime_bindings PRIVATE Seastar::seastar)
target_precompile_headers(wasmtime_bindings REUSE_FROM scylla-precompiled-header)
endif()
@@ -113,6 +108,5 @@ target_include_directories(inc
target_link_libraries(inc
INTERFACE Rust::rust_combined)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_link_libraries(inc PRIVATE Seastar::seastar)
target_precompile_headers(inc REUSE_FROM scylla-precompiled-header)
endif()

View File

@@ -538,7 +538,6 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
group0_id = g0_info.group0_id;
raft::server_address my_addr{my_id, {}};
bool starting_server_as_follower = false;
if (server == nullptr) {
// This is the first time discovery is run. Create and start a Raft server for group 0 on this node.
raft::configuration initial_configuration;
@@ -566,7 +565,6 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
// trigger an empty snapshot transfer.
nontrivial_snapshot = true;
} else {
starting_server_as_follower = true;
co_await handshaker->pre_server_start(g0_info);
}
@@ -593,9 +591,7 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
}
SCYLLA_ASSERT(server);
co_await utils::get_local_injector().inject("join_group0_pause_before_config_check",
utils::wait_for_message(std::chrono::minutes{5}));
if (!starting_server_as_follower && server->get_configuration().contains(my_id)) {
if (server->get_configuration().contains(my_id)) {
// True if we started a new group or completed a configuration change initiated earlier.
group0_log.info("server {} already in group 0 (id {}) as {}", my_id, group0_id,
server->get_configuration().can_vote(my_id)? "voter" : "non-voter");

View File

@@ -910,7 +910,7 @@ future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {
frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
} else {
co_await for_each_split_mutation(std::move(mut), max_size, [&] (mutation m) -> future<> {
frozen_muts_to_apply.push_back(co_await freeze_gently(m));
frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
});
}
}

View File

@@ -239,9 +239,11 @@ public:
// The i-th element corresponds to the i-th entry in _entries.
// Can be smaller than _entries. If _entries[i] doesn't have a matching element in _promoted_indexes then
// that entry doesn't have a promoted index.
// It's not chunked, because promoted index is present only when there are large partitions in the page,
// which also means the page will have typically only 1 entry due to summary:data_file size ratio.
// Kept separately to avoid paying for storage cost in pages where no entry has a promoted index,
// which is typical in workloads with small partitions.
lsa::chunked_managed_vector<promoted_index> _promoted_indexes;
managed_vector<promoted_index> _promoted_indexes;
public:
partition_index_page() = default;
partition_index_page(partition_index_page&&) noexcept = default;

test.py (75 changed lines)
View File

@@ -11,7 +11,6 @@ from __future__ import annotations
import argparse
import asyncio
import math
import shlex
import textwrap
from random import randint
@@ -74,51 +73,6 @@ PYTEST_RUNNER_DIRECTORIES = [
launch_time = time.monotonic()
class ThreadsCalculator:
"""
The ThreadsCalculator class calculates the number of jobs that can be run concurrently based on system
memory and CPU constraints. It allows resource reservation and configurable parameters for
flexible job scheduling in various modes, such as `debug`.
"""
def __init__(self,
modes: list[str],
min_system_memory_reserve: float = 5e9,
max_system_memory_reserve: float = 8e9,
system_memory_reserve_fraction = 16,
max_test_memory: float = 5e9,
test_memory_fraction: float = 8.0,
debug_test_memory_multiplier: float = 1.5,
debug_cpus_per_test_job=1.5,
non_debug_cpus_per_test_job: float =1.0,
non_debug_max_test_memory: float = 4e9
):
sys_mem = int(os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES"))
test_mem = min(sys_mem / test_memory_fraction, max_test_memory)
if "debug" in modes:
test_mem *= debug_test_memory_multiplier
system_memory_reserve = int(min(
max(sys_mem / system_memory_reserve_fraction, min_system_memory_reserve),
max_system_memory_reserve,
))
available_mem = max(0, sys_mem - system_memory_reserve)
is_debug = "debug" in modes
test_mem = min(
sys_mem / test_memory_fraction,
max_test_memory if is_debug else non_debug_max_test_memory,
)
if is_debug:
test_mem *= debug_test_memory_multiplier
self.cpus_per_test_job = (
debug_cpus_per_test_job if is_debug else non_debug_cpus_per_test_job
)
self.default_num_jobs_mem = max(1, int(available_mem // test_mem))
def get_number_of_threads(self, nr_cpus: int) -> int:
default_num_jobs_cpu = max(1, math.ceil(nr_cpus / self.cpus_per_test_job))
return min(self.default_num_jobs_mem, default_num_jobs_cpu)
class TabularConsoleOutput:
"""Print test progress to the console"""
@@ -227,7 +181,7 @@ def parse_cmd_line() -> argparse.Namespace:
help="Run only tests for given build mode(s)")
parser.add_argument('--repeat', action="store", default="1", type=int,
help="number of times to repeat test execution")
parser.add_argument('--timeout', action="store", default="3600", type=int,
parser.add_argument('--timeout', action="store", default="24000", type=int,
help="timeout value for single test execution")
parser.add_argument('--session-timeout', action="store", default="24000", type=int,
help="timeout value for test.py/pytest session execution")
@@ -319,13 +273,6 @@ def parse_cmd_line() -> argparse.Namespace:
if args.skip_patterns and args.k:
parser.error(palette.fail('arguments --skip and -k are mutually exclusive, please use only one of them'))
if not args.modes:
try:
args.modes = get_configured_modes()
except Exception:
print(palette.fail("Failed to read output of `ninja mode_list`: please run ./configure.py first"))
raise
if not args.jobs:
if not args.cpus:
nr_cpus = multiprocessing.cpu_count()
@@ -333,7 +280,19 @@ def parse_cmd_line() -> argparse.Namespace:
nr_cpus = int(subprocess.check_output(
['taskset', '-c', args.cpus, 'python3', '-c',
'import os; print(len(os.sched_getaffinity(0)))']))
args.jobs = ThreadsCalculator(args.modes).get_number_of_threads(nr_cpus)
cpus_per_test_job = 1
sysmem = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
testmem = 6e9 if os.sysconf('SC_PAGE_SIZE') > 4096 else 2e9
default_num_jobs_mem = ((sysmem - 4e9) // testmem)
args.jobs = min(default_num_jobs_mem, nr_cpus // cpus_per_test_job)
if not args.modes:
try:
args.modes = get_configured_modes()
except Exception:
print(palette.fail("Failed to read output of `ninja mode_list`: please run ./configure.py first"))
raise
if not args.coverage_modes and args.coverage:
args.coverage_modes = list(args.modes)
@@ -391,12 +350,16 @@ def run_pytest(options: argparse.Namespace) -> tuple[int, list[SimpleNamespace]]
if options.list_tests:
args.extend(['--collect-only', '--quiet', '--no-header'])
else:
threads = int(options.jobs)
# debug mode is very CPU and memory hungry, so we need to lower the number of threads to be able to finish tests
if 'debug' in options.modes:
threads = int(threads * 0.5)
args.extend([
"--log-level=DEBUG", # Capture logs
f'--junit-xml={junit_output_file}',
"-rf",
'--test-py-init',
f'-n{options.jobs}',
f'-n{threads}',
f'--tmpdir={temp_dir}',
f'--maxfail={options.max_failures}',
f'--alluredir={report_dir / f"allure_{HOST_ID}"}',
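
Both the deleted ThreadsCalculator and the restored inline heuristic derive a job count from memory and CPU and take the tighter bound. A condensed, hedged sketch of that calculation (the reserve and per-job figures are illustrative):

import math
import os

def default_jobs(cpus_per_job: float = 1.0,
                 reserve: float = 4e9,
                 mem_per_job: float = 2e9) -> int:
    sys_mem = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
    nr_cpus = os.cpu_count() or 1
    by_mem = max(1, int((sys_mem - reserve) // mem_per_job))  # memory bound
    by_cpu = max(1, math.ceil(nr_cpus / cpus_per_job))        # CPU bound
    return min(by_mem, by_cpu)  # the tighter bound wins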

View File

@@ -18,7 +18,7 @@
#include <seastar/core/coroutine.hh>
#include <seastar/core/manual_clock.hh>
#include <seastar/core/timer.hh>
#include <seastar/util/later.hh>
#include <seastar/util/defer.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/util/alloc_failure_injector.hh>
@@ -290,17 +290,12 @@ SEASTAR_THREAD_TEST_CASE(test_address_map_replication) {
m.set_expiring(id1);
BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1);
m.barrier().get();
promise<> shard0_timer_expired;
timer<manual_clock> shard0_timer([&shard0_timer_expired] {
shard0_timer_expired.set_value();
});
shard0_timer.arm(manual_clock::now() + expiration_time);
m_svc.invoke_on(1, [] (address_map_t<manual_clock>& m) {
BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1);
manual_clock::advance(expiration_time);
BOOST_CHECK(!m.find(id1));
return smp::submit_to(0, []{}); // Ensure shard 0 notices timer is expired.
}).get();
shard0_timer_expired.get_future().get();
BOOST_CHECK(!m.find(id1));
// Expiring entries are replicated

View File

@@ -23,11 +23,8 @@
#include "test/lib/tmpdir.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/exception_utils.hh"
#include "test/lib/limiting_data_source.hh"
#include "utils/io-wrappers.hh"
#include <seastar/util/memory-data-source.hh>
using namespace encryption;
static tmpdir dir;
@@ -598,113 +595,6 @@ SEASTAR_TEST_CASE(test_encrypted_data_source_simple) {
co_await test_random_data_source(sizes);
}
// Reproduces the production deadlock where encrypted SSTable component downloads
// got stuck during restore. The encrypted_data_source::get() caches a block in
// _next, then on the next call bypasses input_stream::read()'s _eof check and
// calls input_stream::read_exactly() — which does NOT check _eof when _buf is
// empty. This causes a second get() on the underlying source after EOS.
//
// In production the underlying source was chunked_download_source whose get()
// hung forever. Here we simulate it with a strict source that fails the test.
//
// The fix belongs in seastar's input_stream::read_exactly(): check _eof before
// calling _fd.get(), consistent with read(), read_up_to(), and consume().
static future<> test_encrypted_source_copy(size_t plaintext_size) {
testlog.info("test_encrypted_source_copy: plaintext_size={}", plaintext_size);
key_info info{"AES/CBC", 256};
auto k = ::make_shared<symmetric_key>(info);
// Step 1: Encrypt the plaintext into memory buffers
auto plaintext = generate_random<char>(plaintext_size);
std::vector<temporary_buffer<char>> encrypted_bufs;
{
data_sink sink(make_encrypted_sink(create_memory_sink(encrypted_bufs), k));
co_await sink.put(plaintext.clone());
co_await sink.close();
}
// Flatten encrypted buffers into a single contiguous buffer
size_t encrypted_total = 0;
for (const auto& b : encrypted_bufs) {
encrypted_total += b.size();
}
temporary_buffer<char> encrypted(encrypted_total);
size_t pos = 0;
for (const auto& b : encrypted_bufs) {
std::copy(b.begin(), b.end(), encrypted.get_write() + pos);
pos += b.size();
}
// Step 2: Create a data source from the encrypted data that fails on
// post-EOS get() — simulating a source like chunked_download_source
// that would hang forever in this situation.
class strict_memory_source final : public limiting_data_source_impl {
bool _eof = false;
public:
strict_memory_source(temporary_buffer<char> data, size_t chunk_size)
: limiting_data_source_impl(
data_source(std::make_unique<util::temporary_buffer_data_source>(std::move(data))),
chunk_size) {}
future<temporary_buffer<char>> get() override {
BOOST_REQUIRE_MESSAGE(!_eof,
"get() called on source after it already returned EOS — "
"this is the production deadlock: read_exactly() does not "
"check _eof before calling _fd.get()");
auto buf = co_await limiting_data_source_impl::get();
_eof = buf.empty();
co_return buf;
}
};
// Step 3: Wrap in encrypted_data_source and drain via consume() —
// the exact code path used by seastar::copy() which is what
// sstables_loader_helpers::download_sstable() calls.
// Try multiple chunk sizes to hit different alignment scenarios.
for (size_t chunk_size : {1ul, 7ul, 4096ul, 8192ul, encrypted_total, encrypted_total + 1}) {
if (chunk_size == 0) continue;
auto src = data_source(make_encrypted_source(
data_source(std::make_unique<strict_memory_source>(encrypted.clone(), chunk_size)), k));
auto in = input_stream<char>(std::move(src));
// consume() is what seastar::copy() uses internally. It calls
// encrypted_data_source::get() via _fd.get() until EOF.
size_t total_decrypted = 0;
co_await in.consume([&total_decrypted](temporary_buffer<char> buf) {
total_decrypted += buf.size();
return make_ready_future<consumption_result<char>>(continue_consuming{});
});
co_await in.close();
BOOST_REQUIRE_EQUAL(total_decrypted, plaintext_size);
}
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_8k) {
co_await test_encrypted_source_copy(8192);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_4k) {
co_await test_encrypted_source_copy(4096);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_small) {
co_await test_encrypted_source_copy(100);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_12k) {
co_await test_encrypted_source_copy(12288);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_unaligned) {
co_await test_encrypted_source_copy(8193);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_1byte) {
co_await test_encrypted_source_copy(1);
}
SEASTAR_TEST_CASE(test_encrypted_data_source_fuzzy) {
std::mt19937_64 rand_gen(std::random_device{}());
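
The deleted test's strict_memory_source pattern, a source that fails loudly on any read after EOS, translates to a small Python wrapper (illustrative only, not the Seastar API):

class StrictSource:
    """Fails if read() is called again after the source returned EOS."""
    def __init__(self, data: bytes, chunk_size: int):
        self._data, self._pos, self._chunk = data, 0, chunk_size
        self._eof = False

    def read(self) -> bytes:
        assert not self._eof, "read() after EOS: the deadlock pattern"
        buf = self._data[self._pos:self._pos + self._chunk]
        self._pos += len(buf)
        self._eof = len(buf) == 0
        return buf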

View File

@@ -1045,6 +1045,7 @@ validate_result_size(size_t i, schema_ptr schema, const utils::chunked_vector<mu
struct fuzzy_test_config {
uint32_t seed;
std::chrono::seconds timeout;
unsigned concurrency;
unsigned scans;
};
@@ -1076,9 +1077,6 @@ run_fuzzy_test_scan(size_t i, fuzzy_test_config cfg, sharded<replica::database>&
testlog.debug("[scan#{}]: seed={}, is_stateful={}, prange={}, ckranges={}", i, seed, is_stateful, partition_range,
partition_slice.default_row_ranges());
// Use a small max_size to force many pages per scan, stressing the
// paging and result-merging logic. With the large row limit here,
// the byte limit is typically the tighter bound.
const auto [results, npages] = read_partitions_with_paged_scan(db, schema, 1000, 1024, is_stateful, partition_range, partition_slice);
const auto expected_partitions = slice_partitions(*schema, mutations, partition_index_range, partition_slice);
@@ -1162,27 +1160,21 @@ SEASTAR_THREAD_TEST_CASE(fuzzy_test) {
std::uniform_int_distribution<size_t>(0, 100), // clustering-rows
std::uniform_int_distribution<size_t>(0, 100), // range-tombstones
#else
// Keep these values moderate: with complex randomly-generated
// schemas (deeply nested frozen collections/UDTs), large row
// counts cause data generation and paged scanning to be very
// slow, leading to CI timeouts. The test's value comes from
// schema variety and paging correctness, not from sheer data
// volume.
std::uniform_int_distribution<size_t>(32, 64), // partitions
std::uniform_int_distribution<size_t>(0, 200), // clustering-rows
std::uniform_int_distribution<size_t>(0, 200), // range-tombstones
std::uniform_int_distribution<size_t>(0, 1000), // clustering-rows
std::uniform_int_distribution<size_t>(0, 1000), // range-tombstones
#endif
tests::default_timestamp_generator());
#if defined DEBUG
auto cfg = fuzzy_test_config{seed, 1, 1};
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{8}, 1, 1};
#elif defined DEVEL
auto cfg = fuzzy_test_config{seed, 2, 4};
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 2, 4};
#else
auto cfg = fuzzy_test_config{seed, 4, 8};
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 4, 8};
#endif
testlog.info("Running test workload with configuration: seed={}, concurrency={}, scans={}", cfg.seed,
testlog.info("Running test workload with configuration: seed={}, timeout={}s, concurrency={}, scans={}", cfg.seed, cfg.timeout.count(),
cfg.concurrency, cfg.scans);
smp::invoke_on_all([cfg, db = &env.db(), gs = global_schema_ptr(tbl.schema), &compacted_frozen_mutations = tbl.compacted_frozen_mutations] {

View File

@@ -906,13 +906,9 @@ SEASTAR_THREAD_TEST_CASE(test_timeout_is_applied_on_lookup) {
BOOST_REQUIRE(entry.permit.timeout() == new_timeout);
BOOST_REQUIRE(!entry.permit.get_abort_exception());
// Don't waste time retrying before the timeout is up
sleep(ttl_timeout_test_timeout).get();
eventually_true([&entry] {
return bool(entry.permit.get_abort_exception());
});
sleep(ttl_timeout_test_timeout * 2).get();
BOOST_REQUIRE(entry.permit.get_abort_exception());
BOOST_REQUIRE_THROW(std::rethrow_exception(entry.permit.get_abort_exception()), seastar::named_semaphore_timed_out);
}

View File

@@ -2644,10 +2644,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_update_from_memtable) {
return rd;
};
{
memory::scoped_critical_alloc_section dfg;
populate_range(cache, population_range);
}
populate_range(cache, population_range);
auto rd1_v1 = assert_that(make_reader(population_range));
mutation_reader_opt snap;
auto close_snap = defer([&snap] {

View File

@@ -257,44 +257,39 @@ async def manager(request: pytest.FixtureRequest,
yield manager_client
# `request.node.stash` contains a report stored in `pytest_runtest_makereport` from where we can retrieve
# test failure.
cluster_status = None
found_errors = {}
failed = False
report = request.node.stash[PHASE_REPORT_KEY]
failed = report.when == "call" and report.failed
# Check if the test has the check_nodes_for_errors marker
found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
failed_test_dir_path = None
try:
report = request.node.stash[PHASE_REPORT_KEY]
failed = report.when == "call" and report.failed
if failed or found_errors:
# Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
# all related logs in one dir.
# Then add property to the XML report with the path to the directory, so it can be visible in Jenkins
failed_test_dir_path = testpy_test.suite.log_dir / "failed_test" / test_case_name.translate(
str.maketrans('[]', '()'))
failed_test_dir_path.mkdir(parents=True, exist_ok=True)
# Check if the test has the check_nodes_for_errors marker
found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
if failed:
await manager_client.gather_related_logs(
failed_test_dir_path,
{'pytest.log': test_log, 'test_py.log': test_py_log_test}
)
with open(failed_test_dir_path / "stacktrace.txt", "w") as f:
f.write(report.longreprtext)
if request.config.getoption('artifacts_dir_url') is not None:
# get the relative path to the tmpdir for the failed directory
dir_path_relative = f"{failed_test_dir_path.as_posix()[failed_test_dir_path.as_posix().find('testlog'):]}"
full_url = urllib.parse.urljoin(request.config.getoption('artifacts_dir_url') + '/',
urllib.parse.quote(dir_path_relative))
record_property("TEST_LOGS", full_url)
if failed or found_errors:
# Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
# all related logs in one dir.
# Then add property to the XML report with the path to the directory, so it can be visible in Jenkins
failed_test_dir_path = testpy_test.suite.log_dir / "failed_test" / test_case_name.translate(
str.maketrans('[]', '()'))
failed_test_dir_path.mkdir(parents=True, exist_ok=True)
cluster_status = await manager_client.after_test(test_case_name, not failed)
await manager_client.stop() # Stop client session and close driver after each test
if failed:
await manager_client.gather_related_logs(
failed_test_dir_path,
{'pytest.log': test_log, 'test_py.log': test_py_log_test}
)
with open(failed_test_dir_path / "stacktrace.txt", "w") as f:
f.write(report.longreprtext)
if request.config.getoption('artifacts_dir_url') is not None:
# get the relative path to the tmpdir for the failed directory
dir_path_relative = f"{failed_test_dir_path.as_posix()[failed_test_dir_path.as_posix().find('testlog'):]}"
full_url = urllib.parse.urljoin(request.config.getoption('artifacts_dir_url') + '/',
urllib.parse.quote(dir_path_relative))
record_property("TEST_LOGS", full_url)
cluster_status = await manager_client.after_test(test_case_name, not failed)
finally:
await manager_client.stop() # Stop client session and close driver after each test
if cluster_status is not None and cluster_status["server_broken"] and not failed:
if cluster_status["server_broken"] and not failed:
failed = True
pytest.fail(
f"test case {test_case_name} left unfinished tasks on Scylla server. Server marked as broken,"

View File

@@ -10,7 +10,6 @@ import random
import string
import tempfile
import time
import threading
from concurrent.futures.thread import ThreadPoolExecutor
from pprint import pformat
@@ -274,30 +273,6 @@ class TesterAlternator(BaseAlternator):
logger.info("Testing and validating an update query using key condition expression")
logger.info(f"ConditionExpression update of short circuit is: {conditional_update_short_circuit}")
dc2_table.update_item(**conditional_update_short_circuit)
# Wait for cross-DC replication to reach both live DC1 nodes
# before stopping dc2_node. The LWT commit uses LOCAL_QUORUM,
# which only guarantees DC2 persistence; replication to DC1 is
# async background work. Without this wait, stopping dc2_node
# can drop in-flight RPCs to DC1 while CAS mutations don't
# store hints. We must confirm both live DC1 replicas have the
# data so that the later ConsistentRead=True (LOCAL_QUORUM)
# read on restarted node1 is guaranteed to succeed.
# See https://scylladb.atlassian.net/browse/SCYLLADB-1267
dc1_live_nodes = [
node for node in self.cluster.nodelist()
if node.data_center == node1.data_center and node.server_id != node1.server_id
]
dc1_live_tables = [self.get_table(table_name=TABLE_NAME, node=node) for node in dc1_live_nodes]
wait_for(
lambda: all(
t.get_item(
Key={self._table_primary_key: new_pk_val}, ConsistentRead=False
).get("Item", {}).get("c") == 3
for t in dc1_live_tables
),
timeout=60,
text="Waiting for cross-DC replication of conditional update to both live DC1 nodes",
)
dc2_node.stop()
node1.start()
@@ -506,33 +481,28 @@ class TesterAlternator(BaseAlternator):
2) Issue Alternator 'heavy' requests concurrently (create-table)
3) wait for RequestLimitExceeded error response.
"""
# Keep the limit low to avoid exhausting LSA memory on the 1GB test node
# when multiple CreateTable requests (Raft + schema + flush) run concurrently.
concurrent_requests_limit = 3
concurrent_requests_limit = 5
extra_config = {"max_concurrent_requests_per_shard": concurrent_requests_limit, "num_tokens": 1}
self.prepare_dynamodb_cluster(num_of_nodes=1, extra_config=extra_config)
node1 = self.cluster.nodelist()[0]
stop_workers = threading.Event()
create_tables_threads = []
for tables_num in range(concurrent_requests_limit * 5):
create_tables_threads.append(self.run_create_table_thread())
def run_create_table_until_limited() -> None:
while not stop_workers.is_set():
try:
self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
except Exception as error: # noqa: BLE001
if "RequestLimitExceeded" in str(error):
stop_workers.set()
return
raise
@retrying(num_attempts=150, sleep_time=0.2, allowed_exceptions=ConcurrencyLimitNotExceededError, message="Running create-table request")
def wait_for_create_table_request_failure():
try:
self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
except Exception as error:
if "RequestLimitExceeded" in error.args[0]:
return
raise
raise ConcurrencyLimitNotExceededError
with ThreadPoolExecutor(max_workers=concurrent_requests_limit * 5) as executor:
create_table_futures = [executor.submit(run_create_table_until_limited) for _ in range(concurrent_requests_limit * 5)]
wait_for_create_table_request_failure()
if not stop_workers.wait(timeout=30):
raise ConcurrencyLimitNotExceededError
stop_workers.set()
for future in create_table_futures:
future.result(timeout=60)
for thread in create_tables_threads:
thread.join()
@staticmethod
def _set_slow_query_logging_api(run_on_node: ScyllaNode, is_enable: bool = True, threshold: int | None = None):

File diff suppressed because it is too large.

View File

@@ -1182,9 +1182,9 @@ class TestAuth(Tester):
def get_session(self, node_idx=0, user=None, password=None, exclusive=True):
node = self.cluster.nodelist()[node_idx]
if exclusive:
conn = self.patient_exclusive_cql_connection(node, user=user, password=password)
conn = self.patient_exclusive_cql_connection(node, user=user, password=password, timeout=0.1)
else:
conn = self.patient_cql_connection(node, user=user, password=password)
conn = self.patient_cql_connection(node, user=user, password=password, timeout=0.1)
return conn
def assert_permissions_listed(self, expected, session, query, include_superuser=False):

View File

@@ -199,7 +199,7 @@ class GSServer(GSFront):
def unpublish(self):
for k in self.vars:
v = self.oldvars.get(k)
v = self.oldvars[k]
if v:
os.environ[k] = v
elif os.environ.get(k):
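
The difference here is oldvars.get(k) versus oldvars[k]: the .get() form tolerates variables that were never previously set instead of raising KeyError. A minimal sketch of the restore pattern:

import os

def restore_env(var_names, saved: dict) -> None:
    for k in var_names:
        old = saved.get(k)  # None when k had no previous value
        if old:
            os.environ[k] = old
        else:
            os.environ.pop(k, None)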

View File

@@ -215,11 +215,6 @@ async def test_node_ops_tasks_tree(manager: ManagerClient):
servers, vt_ids = await check_remove_node_tasks_tree(manager, tm, module_name, servers, vt_ids)
servers, vt_ids = await check_decommission_tasks_tree(manager, tm, module_name, servers, vt_ids)
# Reconnect the driver after topology changes (replace, removenode,
# decommission) so that the new_test_keyspace cleanup can reach a
# live node for DROP KEYSPACE.
await manager.driver_connect()
@pytest.mark.asyncio
async def test_node_ops_tasks_ttl(manager: ManagerClient):
"""Test node ops virtual tasks' ttl."""

View File

@@ -1,70 +0,0 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging
import asyncio
import time
import pytest
from test.cluster.util import get_current_group0_config
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import read_barrier
from test.pylib.util import wait_for
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_bootstrap_with_quick_group0_join(manager: ManagerClient):
"""Regression test for https://scylladb.atlassian.net/browse/SCYLLADB-959.
The bug was that when the bootstrapping node joined group0 before reaching
post_server_start, it skipped post_server_start and thus hung forever.
The test simulates the scenario by starting the second node with the
join_group0_pause_before_config_check injection. Without the fix, the
startup times out.
"""
logger.info("Adding first server")
s1 = await manager.server_add()
logger.info("Adding second server with join_group0_pause_before_config_check enabled")
s2 = await manager.server_add(start=False, config={
'error_injections_at_startup': ['join_group0_pause_before_config_check']
})
logger.info(f"Starting {s2}")
start_task = asyncio.create_task(manager.server_start(s2.server_id))
s2_log = await manager.server_open_log(s2.server_id)
await s2_log.wait_for("join_group0_pause_before_config_check: waiting for message", timeout=60)
s1_host_id = await manager.get_host_id(s1.server_id)
s2_host_id = await manager.get_host_id(s2.server_id)
async def s2_in_group0_config_on_s1():
config = await get_current_group0_config(manager, s1)
ids = {m[0] for m in config}
assert s1_host_id in ids # sanity check
return True if s2_host_id in ids else None
# Note: we would like to wait for s2 to see itself in the group0 config, but we can't execute
# get_current_group0_config for s2, as s2 doesn't handle CQL requests at this point. As a workaround, we wait for s1
# to see s2 and then perform a read barrier on s2.
logger.info(f"Waiting for {s1} to see {s2} in the group0 config")
await wait_for(s2_in_group0_config_on_s1, deadline=time.time() + 60, period=0.1)
logger.info(f"Performing read barrier on {s2} to make sure it sees itself in the group0 config")
await read_barrier(manager.api, s2.ip_addr)
logger.info(f"Unblocking {s2}")
await manager.api.message_injection(s2.ip_addr, 'join_group0_pause_before_config_check')
logger.info(f"Waiting for {s2} to complete bootstrap")
await asyncio.wait_for(start_task, timeout=60)

View File

@@ -44,7 +44,6 @@ run_in_dev:
- dtest/bypass_cache_test
- dtest/auth_roles_test
- dtest/audit_test
- audit/test_audit
- dtest/commitlog_test
- dtest/cfid_test
- dtest/rebuild_test

View File

@@ -177,7 +177,7 @@ async def _smoke_test(manager: ManagerClient, key_provider: KeyProviderFactory,
# restart the cluster
if restart:
await restart(manager, servers, cfs)
cql, _ = await manager.get_ready_cql(servers)
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
else:
await manager.rolling_restart(servers)
for table_name in cfs:

View File

@@ -438,7 +438,6 @@ async def test_lwt_fencing_upgrade(manager: ManagerClient, scylla_2025_1: Scylla
await wait_for(all_hosts_are_alive, deadline=time.time() + 60, period=0.1)
logger.info(f"Upgrading {s.server_id}")
await manager.server_change_version(s.server_id, scylla_binary)
await manager.server_sees_others(s.server_id, 2, interval=60.0)
logger.info("Done upgrading servers")

View File

@@ -8,10 +8,7 @@ import asyncio
import time
import pytest
import logging
from functools import partial
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for
from test.pylib.internal_types import ServerInfo
logger = logging.getLogger(__name__)
@@ -19,26 +16,6 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_crashed_node_substitution(manager: ManagerClient):
"""Test that a node which crashed after starting gossip but before joining group0
(an 'orphan' node) is eventually removed from gossip by the gossiper_orphan_remover_fiber.
The scenario:
1. Start 3 nodes with the 'fast_orphan_removal_fiber' injection enabled. This freezes
the gossiper_orphan_remover_fiber on each node before it enters its polling loop,
so it cannot remove any orphan until explicitly unblocked.
2. Start a 4th node with the 'crash_before_group0_join' injection enabled. This node
starts gossip normally but blocks inside pre_server_start(), just before sending
the join RPC to the topology coordinator. It never joins group0.
3. Wait until the 4th node's gossip state has fully propagated to all 3 running peers,
then trigger its crash via the injection. At this point all peers see it as an orphan:
present in gossip but absent from the group0 topology.
4. Assert the orphan is visible in gossip (live or down) on the surviving nodes.
5. Unblock the gossiper_orphan_remover_fiber on all 3 nodes (via message_injection) and
enable the 'speedup_orphan_removal' injection so the fiber removes the orphan immediately
without waiting for the normal 60-second age threshold.
6. Wait for the 'Finished to force remove node' log line confirming removal, then assert
the orphan is no longer present in gossip.
"""
servers = await manager.servers_add(3, config={
'error_injections_at_startup': ['fast_orphan_removal_fiber']
})
@@ -53,24 +30,10 @@ async def test_crashed_node_substitution(manager: ManagerClient):
log = await manager.server_open_log(failed_server.server_id)
await log.wait_for("finished do_send_ack2_msg")
failed_id = await manager.get_host_id(failed_server.server_id)
# Wait until the failed server's gossip state has propagated to all running peers.
# "finished do_send_ack2_msg" only guarantees that one peer completed a gossip round
# with the failed server; other nodes learn about it only in subsequent gossip rounds.
# Querying gossip before propagation completes would cause the assertion below to fail
# because the orphan node would not yet appear as live or down on every peer.
async def gossip_has_node(server: ServerInfo):
live = await manager.api.client.get_json("/gossiper/endpoint/live", host=server.ip_addr)
down = await manager.api.client.get_json("/gossiper/endpoint/down", host=server.ip_addr)
return True if failed_server.ip_addr in live + down else None
for s in servers:
await wait_for(partial(gossip_has_node, s), deadline=time.time() + 30)
await manager.api.message_injection(failed_server.ip_addr, 'crash_before_group0_join')
await task
live_eps = await manager.api.client.get_json("/gossiper/endpoint/live", host=servers[0].ip_addr)
down_eps = await manager.api.client.get_json("/gossiper/endpoint/down", host=servers[0].ip_addr)

View File

@@ -17,9 +17,9 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import ScyllaMetricsClient, TCPRESTClient, inject_error
from test.pylib.tablets import get_tablet_replicas
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.util import gather_safely, wait_for
from test.pylib.util import wait_for
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, keyspace_has_tablets, new_test_keyspace, new_test_table
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace
logger = logging.getLogger(__name__)
@@ -51,42 +51,28 @@ async def await_sync_point(client: TCPRESTClient, server_ip: IPAddress, sync_poi
@pytest.mark.asyncio
async def test_write_cl_any_to_dead_node_generates_hints(manager: ManagerClient):
node_count = 2
cmdline = ["--logger-log-level", "hints_manager=trace"]
servers = await manager.servers_add(node_count, cmdline=cmdline)
async def wait_for_hints_written(min_hint_count: int, timeout: int):
async def aux():
hints_written = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
if hints_written >= min_hint_count:
return True
return None
assert await wait_for(aux, time.time() + timeout)
servers = await manager.servers_add(node_count)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
uses_tablets = await keyspace_has_tablets(manager, ks)
# If the keyspace uses tablets, let's explicitly require the table to use multiple tablets.
# Otherwise, it could happen that all mutations would target servers[0] only, which would
# ultimately lead to a test failure here. We rely on the assumption that mutations will be
# distributed more or less uniformly!
extra_opts = "WITH tablets = {'min_tablet_count': 16}" if uses_tablets else ""
async with new_test_table(manager, ks, "pk int PRIMARY KEY, v int", extra_opts) as table:
await manager.server_stop_gracefully(servers[1].server_id)
table = f"{ks}.t"
await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
await manager.server_stop_gracefully(servers[1].server_id)
stmt = cql.prepare(f"INSERT INTO {table} (pk, v) VALUES (?, ?)")
stmt.consistency_level = ConsistencyLevel.ANY
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
# Some of the inserts will be targeted to the dead node.
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
await gather_safely(*[cql.run_async(stmt, (i, i + 1)) for i in range(100)])
# Some of the inserts will be targeted to the dead node.
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
for i in range(100):
await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES ({i}, {i+1})", consistency_level=ConsistencyLevel.ANY))
# Verify hints are written
await wait_for_hints_written(hints_before + 1, timeout=60)
# Verify hints are written
hints_after = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
assert hints_after > hints_before
# For dropping the keyspace
await manager.server_start(servers[1].server_id)
# For dropping the keyspace
await manager.server_start(servers[1].server_id)
@pytest.mark.asyncio
async def test_limited_concurrency_of_writes(manager: ManagerClient):
@@ -100,7 +86,7 @@ async def test_limited_concurrency_of_writes(manager: ManagerClient):
})
node2 = await manager.server_add()
cql = await manager.get_cql_exclusive(node1)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}") as ks:
table = f"{ks}.t"
await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")

View File

@@ -151,7 +151,7 @@ async def trigger_tablet_merge(manager, servers, logs):
await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
async def preapre_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, nr_keys=nr_keys, cmdline=cmdline)
repaired_keys = set(range(0, nr_keys))
unrepaired_keys = set()
@@ -164,7 +164,7 @@ async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdlin
@pytest.mark.asyncio
async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient):
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
await insert_keys(cql, ks, 0, 100)
@@ -274,7 +274,7 @@ async def test_tablet_incremental_repair_error(manager: ManagerClient):
async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline=['--logger-log-level', 'compaction=debug'])
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
token = -1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
@@ -335,7 +335,7 @@ async def test_tablet_incremental_repair_and_major(manager: ManagerClient):
@pytest.mark.asyncio
async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# Disable autocompaction
for server in servers:
@@ -381,7 +381,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_split, do_merge):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -442,7 +442,7 @@ async def test_tablet_incremental_repair_with_merge(manager: ManagerClient):
async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(manager: ManagerClient):
nr_keys = 100
cmdline = ["--hinted-handoff-enabled", "0"]
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
await manager.server_stop_gracefully(servers[1].server_id)
@@ -466,7 +466,7 @@ async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(ma
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -507,7 +507,7 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_merge(manager):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -541,7 +541,7 @@ async def do_test_tablet_incremental_repair_merge_error(manager, error):
nr_keys = 100
# Make sure no data commit log replay after force server stop
cmdline = ['--enable-commitlog', '0']
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -587,7 +587,7 @@ async def test_tablet_incremental_repair_merge_error_in_merge_completion_fiber(m
@pytest.mark.asyncio
async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
token = -1
sstables_repaired_at = 0
@@ -632,7 +632,7 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
@pytest.mark.asyncio
async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
servers, _, _, ks, _, _, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
time1 = 0
time2 = 0
@@ -820,7 +820,7 @@ async def test_repair_sigsegv_with_diff_shard_count(manager: ManagerClient, use_
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manager: ManagerClient):
cmdline = ['--logger-log-level', 'repair=debug']
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await prepare_cluster_for_incremental_repair(manager, cmdline=cmdline)
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await preapre_cluster_for_incremental_repair(manager, cmdline=cmdline)
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)

View File

@@ -20,7 +20,6 @@ from cassandra.query import SimpleStatement
from test.pylib.async_cql import _wrap_future
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables, TextType, Column
from test.pylib.rest_client import read_barrier
from test.pylib.util import unique_name
from test.cluster.conftest import cluster_con
@@ -404,7 +403,6 @@ async def test_arbiter_dc_rf_rack_valid_keyspaces(manager: ManagerClient):
for task in [*valid_keyspaces, *invalid_keyspaces]:
_ = tg.create_task(task)
@pytest.mark.asyncio
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager: ManagerClient):
"""
This test verifies that starting a Scylla node fails when there's an RF-rack-invalid keyspace.
@@ -466,50 +464,22 @@ async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager:
for rfs, tablets in valid_keyspaces:
_ = tg.create_task(create_keyspace(rfs, tablets))
# Precondition: s1 has rf_rack_valid_keyspaces set to false.
# Postcondition: s1 still has rf_rack_valid_keyspaces set to false.
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
async def try_fail(rfs: List[int], dc: str, rf: int, rack_count: int):
running_servers = await manager.running_servers()
should_start = s1.server_id not in [server.server_id for server in running_servers]
if should_start:
await manager.server_start(s1.server_id)
ks = await create_keyspace(rfs, True)
# We need to wait for the new schema to propagate.
# Otherwise, it's not clear when the mutation
# corresponding to the created keyspace will
# arrive at server 1.
# It could happen only after the node performs
# the check upon start-up, effectively leading
# to a successful start-up, which we don't want.
# For more context, see issue: SCYLLADB-1137.
await read_barrier(manager.api, s1.ip_addr)
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
err = f"The keyspace '{ks}' is required to be RF-rack-valid. " \
f"That condition is violated for DC '{dc}': RF={rf} vs. rack count={rack_count}."
await manager.server_start(s1.server_id, expected_error=err)
_ = await manager.server_start(s1.server_id, expected_error=err)
await cql.run_async(f"DROP KEYSPACE {ks}")
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "false")
# Test RF-rack-invalid keyspaces.
await try_fail([2, 0], "dc1", 2, 3)
await try_fail([3, 2], "dc2", 2, 1)
await try_fail([4, 1], "dc1", 4, 3)
# We need to perform a read barrier on the node to make
# sure that it processes the last DROP KEYSPACE.
# Otherwise, the node could think the RF-rack-invalid
# keyspace still exists.
await manager.server_start(s1.server_id)
await read_barrier(manager.api, s1.ip_addr)
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
await manager.server_start(s1.server_id)
_ = await manager.server_start(s1.server_id)
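A short sketch of the read-barrier dance described in the comments above, using the same read_barrier helper from test.pylib.rest_client: the barrier forces the node to apply all group0 state committed so far, so a schema change made through another node is guaranteed to be visible before the next restart's start-up check runs.

from test.pylib.rest_client import read_barrier

await cql.run_async(f"DROP KEYSPACE {ks}")   # executed via another node
await read_barrier(manager.api, s1.ip_addr)  # s1 now sees the drop
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
await manager.server_start(s1.server_id)     # start-up check passes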
@pytest.mark.asyncio
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces_but_not_enforced(manager: ManagerClient):

View File

@@ -23,25 +23,10 @@ from test.cluster.object_store.conftest import format_tuples
from test.cluster.object_store.test_backup import topo, take_snapshot, do_test_streaming_scopes
from test.cluster.util import new_test_keyspace
from test.pylib.rest_client import read_barrier
from test.pylib.util import unique_name, wait_for
from test.pylib.util import unique_name
logger = logging.getLogger(__name__)
async def wait_for_upload_dir_empty(upload_dir, timeout=30):
'''
Wait until the upload directory is empty with a timeout.
SSTable unlinking is asynchronous, and in rare situations not all
sstables are deleted from the upload dir immediately after refresh is done.
'''
deadline = time.time() + timeout
async def check_empty():
files = os.listdir(upload_dir)
if not files:
return True
return None
await wait_for(check_empty, deadline, period=0.5)
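This helper relies on the wait_for() convention from test.pylib.util: the predicate returns None to keep polling and anything non-None to finish, which is why boolean checks are written as `... or None`. A condensed sketch, assuming upload_dir and wait_for are in scope:

import os
import time

async def upload_dir_empty():
    # None (not False) tells wait_for() to retry after `period` seconds.
    return (not os.listdir(upload_dir)) or None

await wait_for(upload_dir_empty, deadline=time.time() + 30, period=0.5)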
class SSTablesOnLocalStorage:
def __init__(self):
self.tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
@@ -168,8 +153,7 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
for s in servers:
cf_dir = dirs[s.server_id]["cf_dir"]
upload_dir = os.path.join(cf_dir, 'upload')
assert os.path.exists(upload_dir)
await wait_for_upload_dir_empty(upload_dir)
files = os.listdir(os.path.join(cf_dir, 'upload'))
assert files == [], f'Upload dir not empty on server {s.server_id}: {files}'
shutil.rmtree(tmpbackup)

View File

@@ -11,6 +11,7 @@ from test.cluster.util import check_token_ring_and_group0_consistency, new_test_
import pytest
import asyncio
import logging
import time
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@@ -52,7 +53,7 @@ async def test_cleanup_stop(manager: ManagerClient):
await s0_log.wait_for('sstable_cleanup_wait: waiting', from_mark=s0_mark)
stop_cleanup = asyncio.create_task(manager.api.stop_compaction(servers[0].ip_addr, "CLEANUP"))
await asyncio.sleep(1)
time.sleep(1)
await manager.api.message_injection(servers[0].ip_addr, "sstable_cleanup_wait")
await stop_cleanup
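The choice of sleep matters here: time.sleep(1) blocks the whole event loop, so the stop_cleanup task cannot make progress during the pause, whereas await asyncio.sleep(1) yields to it. A self-contained illustration (hypothetical, not part of the test):

import asyncio

async def main():
    side_task = asyncio.create_task(asyncio.sleep(0.1))
    await asyncio.sleep(1)      # yields: side_task finishes meanwhile
    assert side_task.done()
    # time.sleep(1) in its place would have stalled side_task too,
    # since nothing else can run while the loop thread is blocked.

asyncio.run(main())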

View File

@@ -2279,7 +2279,7 @@ async def test_split_stopped_on_shutdown(manager: ManagerClient):
shutdown_task = asyncio.create_task(manager.server_stop_gracefully(server.server_id))
await log.wait_for('Stopping.*ongoing compactions', from_mark=log_mark)
await log.wait_for('Stopping.*ongoing compactions')
await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")
await log.wait_for('storage_service_drain_wait: waiting', from_mark=log_mark)
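The from_mark argument confines the match to log lines written after the mark was taken, so an old occurrence of the same message cannot satisfy the wait. A hedged sketch, assuming the manager/log helpers used throughout these tests:

import asyncio

log = await manager.server_open_log(server.server_id)
mark = await log.mark()   # remember the current end of the log
shutdown_task = asyncio.create_task(manager.server_stop_gracefully(server.server_id))
# Only matches the message if it appears after `mark`:
await log.wait_for('Stopping.*ongoing compactions', from_mark=mark)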

View File

@@ -196,7 +196,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
tombstone_mark = datetime.now(timezone.utc)
# test #2: the tombstones are not cleaned up when one node is down
with pytest.raises(AssertionError, match="timed out"):
with pytest.raises(AssertionError, match="Deadline exceeded"):
# waiting for shorter time (5s normally enough for a successful case, we expect the timeout here)
await verify_tombstone_gc(tombstone_mark, timeout=5)
@@ -249,7 +249,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
# test #4a: the tombstones are not cleaned up after both live nodes join the new group0
with pytest.raises(AssertionError, match="timed out"):
with pytest.raises(AssertionError, match="Deadline exceeded"):
await verify_tombstone_gc(tombstone_mark, timeout=5)
await manager.remove_node(servers[0].server_id, down_server.server_id)

View File

@@ -165,7 +165,7 @@ async def wait_for_cdc_generations_publishing(cql: Session, hosts: list[Host], d
unpublished_generations = topo_res[0].unpublished_cdc_generations
return unpublished_generations is None or len(unpublished_generations) == 0 or None
await wait_for(all_generations_published, deadline=deadline)
await wait_for(all_generations_published, deadline=deadline, period=1.0)
async def check_system_topology_and_cdc_generations_v3_consistency(manager: ManagerClient, live_hosts: list[Host], cqls: Optional[list[Session]] = None, ignored_hosts: list[Host] = []):
@@ -470,17 +470,6 @@ async def new_materialized_view(manager: ManagerClient, table, select, pk, where
await manager.get_cql().run_async(f"DROP MATERIALIZED VIEW {mv}")
async def keyspace_has_tablets(manager: ManagerClient, keyspace: str) -> bool:
"""
Checks whether the given keyspace uses tablets.
Adapted from its counterpart in the cqlpy test: cqlpy/util.py::keyspace_has_tablets.
"""
cql = manager.get_cql()
rows_iter = await cql.run_async(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name='{keyspace}'")
rows = list(rows_iter)
return len(rows) > 0 and getattr(rows[0], "initial_tablets", None) is not None
async def get_raft_log_size(cql, host) -> int:
query = "select count(\"index\") from system.raft"
return (await cql.run_async(query, host=host))[0][0]

View File

@@ -7,42 +7,54 @@
*/
#include "limiting_data_source.hh"
#include <seastar/core/iostream.hh>
#include <seastar/core/temporary_buffer.hh>
#include <cstdint>
using namespace seastar;
future<temporary_buffer<char>> limiting_data_source_impl::do_get() {
uint64_t size = std::min(_limit, _buf.size());
auto res = _buf.share(0, size);
_buf.trim_front(size);
return make_ready_future<temporary_buffer<char>>(std::move(res));
}
class limiting_data_source_impl final : public data_source_impl {
data_source _src;
size_t _limit;
temporary_buffer<char> _buf;
future<temporary_buffer<char>> do_get() {
uint64_t size = std::min(_limit, _buf.size());
auto res = _buf.share(0, size);
_buf.trim_front(size);
return make_ready_future<temporary_buffer<char>>(std::move(res));
}
public:
limiting_data_source_impl(data_source&& src, size_t limit)
: _src(std::move(src))
, _limit(limit)
{}
limiting_data_source_impl::limiting_data_source_impl(data_source&& src, size_t limit) : _src(std::move(src)), _limit(limit) {
}
limiting_data_source_impl(limiting_data_source_impl&&) noexcept = default;
limiting_data_source_impl& operator=(limiting_data_source_impl&&) noexcept = default;
future<temporary_buffer<char>> limiting_data_source_impl::get() {
if (_buf.empty()) {
virtual future<temporary_buffer<char>> get() override {
if (_buf.empty()) {
_buf.release();
return _src.get().then([this] (auto&& buf) {
_buf = std::move(buf);
return do_get();
});
}
return do_get();
}
virtual future<temporary_buffer<char>> skip(uint64_t n) override {
if (n < _buf.size()) {
_buf.trim_front(n);
return do_get();
}
n -= _buf.size();
_buf.release();
return _src.get().then([this](auto&& buf) {
return _src.skip(n).then([this] (auto&& buf) {
_buf = std::move(buf);
return do_get();
});
}
return do_get();
}
future<temporary_buffer<char>> limiting_data_source_impl::skip(uint64_t n) {
if (n < _buf.size()) {
_buf.trim_front(n);
return do_get();
}
n -= _buf.size();
_buf.release();
return _src.skip(n).then([this](auto&& buf) {
_buf = std::move(buf);
return do_get();
});
}
};
data_source make_limiting_data_source(data_source&& src, size_t limit) {
return data_source{std::make_unique<limiting_data_source_impl>(std::move(src), limit)};

View File

@@ -8,25 +8,13 @@
#pragma once
#include <seastar/core/iostream.hh>
#include <seastar/core/temporary_buffer.hh>
#include <stddef.h>
namespace seastar {
class limiting_data_source_impl : public seastar::data_source_impl {
seastar::data_source _src;
size_t _limit;
seastar::temporary_buffer<char> _buf;
seastar::future<seastar::temporary_buffer<char>> do_get();
class data_source;
public:
limiting_data_source_impl(seastar::data_source&& src, size_t limit);
limiting_data_source_impl(limiting_data_source_impl&&) noexcept = default;
limiting_data_source_impl& operator=(limiting_data_source_impl&&) noexcept = default;
seastar::future<seastar::temporary_buffer<char>> get() override;
seastar::future<seastar::temporary_buffer<char>> skip(uint64_t n) override;
};
}
/// \brief Creates a data_source from another data_source but returns its data in chunks no bigger than a given limit
///

View File

@@ -271,21 +271,10 @@ future<std::tuple<tests::proc::process_fixture, int>> tests::proc::start_docker_
// arbitrary timeout of 120s for the server to make some output. Very generous,
// but since we (maybe) run docker and might need to pull the image, this can take
// some time if we're unlucky.
auto [f1, f2] = co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
for (auto* f : {&f1, &f2}) {
if (f->failed()) {
try {
f->get();
} catch (in_use&) {
retry = true;
p = std::current_exception();
} catch (...) {
if (!p) {
p = std::current_exception();
}
}
}
}
co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
} catch (in_use&) {
retry = true;
p = std::current_exception();
} catch (...) {
p = std::current_exception();
}

View File

@@ -6,7 +6,6 @@
import os
import random
import socket
import subprocess
import sys
import time
@@ -60,14 +59,7 @@ async def server_address(request, testpy_test: None|Test):
ip = await testpy_test.suite.hosts.lease_host()
else:
ip = f"127.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}"
# Ask the OS to pick a free port by binding to port 0. This avoids
# collisions with ports still in TIME_WAIT from a previous test module
# that used the same IP. SO_REUSEADDR is set on the probe socket so it
# can reclaim a TIME_WAIT port itself
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((ip, 0))
port = s.getsockname()[1]
port = random.randint(10000, 65535)
yield ServerAddress(ip, port)
if testpy_test is not None:
await testpy_test.suite.hosts.release_host(ip)
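The probe above can be factored into a small standalone helper (the name is illustrative): binding to port 0 makes the kernel hand out a port that is genuinely free, and SO_REUSEADDR lets the probe itself reclaim a port still in TIME_WAIT.

import socket

def pick_free_port(ip: str) -> int:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((ip, 0))              # port 0: kernel chooses a free port
        return s.getsockname()[1]

port = pick_free_port("127.0.0.1")

Note that a small race window remains between closing the probe socket and the server binding the port; the probe makes collisions unlikely, not impossible.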

View File

@@ -257,7 +257,7 @@ async def run_server(ip, port):
runner = aiohttp.web.AppRunner(app)
await runner.setup()
site = aiohttp.web.TCPSite(runner, ip, port, reuse_address=True, reuse_port=True)
site = aiohttp.web.TCPSite(runner, ip, port)
await site.start()
try:

View File

@@ -4,18 +4,14 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging
import shutil
import itertools
import asyncio
import pathlib
import re
import os
import subprocess
from typing import Callable
logger = logging.getLogger("DockerizedServer")
class DockerizedServer:
"""class for running an external dockerized service image, typically mock server"""
# pylint: disable=too-many-instance-attributes
@@ -41,7 +37,6 @@ class DockerizedServer:
self.port = None
self.proc = None
self.service_port = port
self.echo_thread = None
async def start(self):
"""Starts docker image on a random port"""
@@ -50,107 +45,77 @@ class DockerizedServer:
if exe is not None)).resolve()
sid = f"{os.getpid()}-{DockerizedServer.newid()}"
name = f'{self.logfilenamebase}-{sid}'
logfilename = (pathlib.Path(self.tmpdir) / name).with_suffix(".log")
self.logfile = logfilename.open("wb")
docker_args = self.docker_args(self.host, self.service_port)
image_args = self.image_args(self.host, self.service_port)
args = [exe, "run", "--name", name, "--rm" ]
if self.service_port is None:
args = args + ["-P"]
else:
args = args + ["-p", str(self.service_port)]
args = args + docker_args + [self.image] + image_args
# This seems weird, using the blocking-IO subprocess module.
# However, we want a pipe reader that pushes the output into the
# test log (because we are bad at propagating log files etc. from CI).
# But the pipe reader needs to read until EOF, otherwise the
# docker process will eventually hang. So we can't await a
# coroutine.
# We _can_, sort of, use pool.create_task(...) to send a coro
# to the background and use a signal for waiting, like here,
# thus ensuring the coro runs forever, sort of... However,
# this currently breaks, probably due to some part of the
# machinery/tests that isn't fully async, causing us to not
# process the log, and thus hang/fail.
# The solution is to make the process synchronous, and use a
# background thread (executor pool) for the processing.
# This way we know the pipe reader will not suddenly get
# blocked at inconvenient times.
proc = subprocess.Popen(args, stderr=subprocess.PIPE)
loop = asyncio.get_running_loop()
ready_fut = loop.create_future()
def process_io():
f = ready_fut
try:
while True:
data = proc.stderr.readline()
if not data:
if f:
loop.call_soon_threadsafe(f.set_exception, RuntimeError("Log EOF"))
logger.debug("EOF received")
break
line = data.decode()
self.logfile.write(data)
logger.debug(line)
if f and self.is_success_line(line, self.service_port):
logger.info('Got start message: %s', line)
loop.call_soon_threadsafe(f.set_result, True)
f = None
if f and self.is_failure_line(line, self.service_port):
logger.info('Got fail message: %s', line)
loop.call_soon_threadsafe(f.set_result, False)
f = None
except Exception as e:
logger.error("Exception in log processing: %s", e)
if f:
loop.call_soon_threadsafe(f.set_exception, e)
self.echo_thread = loop.run_in_executor(None, process_io)
ok = await ready_fut
if not ok:
self.logfile.close()
proc.kill()
proc.wait()
raise RuntimeError("Could not parse expected launch message from container")
check_proc = await asyncio.create_subprocess_exec(exe
, *["container", "port", name]
, stdout=asyncio.subprocess.PIPE
)
while True:
data = await check_proc.stdout.readline()
if not data:
break
s = data.decode()
m = re.search(r"\d+\/\w+ -> [\w+\.\[\]\:]+:(\d+)", s)
if m:
self.port = int(m.group(1))
logfilename = (pathlib.Path(self.tmpdir) / name).with_suffix(".log")
self.logfile = logfilename.open("wb")
await check_proc.wait()
if not self.port:
proc.kill()
proc.wait()
raise RuntimeError("Could not query port from container")
self.proc = proc
docker_args = self.docker_args(self.host, self.service_port)
image_args = self.image_args(self.host, self.service_port)
args = ["run", "--name", name, "--rm" ]
if self.service_port is None:
args = args + ["-P"]
else:
args = args + ["-p", str(self.service_port)]
args = args + docker_args + [self.image] + image_args
proc = await asyncio.create_subprocess_exec(exe, *args, stderr=self.logfile)
failed = False
# In any sane world we would just connect stderr to a pipe and launch
# a background task to readline from it, both to check for the start
# message and to add it to the log (preferably via logger).
# This works fine in a standalone python script. However, for some
# reason, when run in a pytest fixture, the pipe fills up without our
# reader waking up and doing anything, so for any test longer than
# very short, we fill the stderr buffer and hang.
# I cannot figure out how to get around this, so we work around it
# instead by directing stderr to a log file, and simply repeatedly
# try to read the info from this file until we are happy.
async with asyncio.timeout(120):
done = False
while not done and not failed:
with logfilename.open("r") as f:
for line in f:
if self.is_success_line(line, self.service_port):
print(f'Got start message: {line}')
done = True
break
if self.is_failure_line(line, self.service_port):
print(f'Got fail message: {line}')
failed = True
break
if failed:
self.logfile.close()
await proc.wait()
continue
check_proc = await asyncio.create_subprocess_exec(exe
, *["container", "port", name]
, stdout=asyncio.subprocess.PIPE
)
while True:
data = await check_proc.stdout.readline()
if not data:
break
s = data.decode()
m = re.search(r"\d+\/\w+ -> [\w+\.\[\]\:]+:(\d+)", s)
if m:
self.port = int(m.group(1))
await check_proc.wait()
if not self.port:
proc.kill()
raise RuntimeError("Could not query port from container")
self.proc = proc
break
async def stop(self):
"""Stops docker image"""
if self.proc:
logger.debug("Stopping docker process")
self.proc.terminate()
self.proc.wait()
self.proc = None
if self.echo_thread:
logger.debug("Waiting for IO thread")
await self.echo_thread
self.echo_thread = None
await self.proc.wait()
if self.logfile:
logger.debug("Closing log file")
self.logfile.close()
self.logfile = None
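For reference, a condensed sketch of the thread-based variant described above (function name and is_ready callback are illustrative): stderr is drained on a worker thread so the pipe can never fill up and block the child, while the event loop awaits a future that the thread resolves via call_soon_threadsafe.

import asyncio
import subprocess

async def run_and_wait_ready(args, logfile, is_ready):
    proc = subprocess.Popen(args, stderr=subprocess.PIPE)
    loop = asyncio.get_running_loop()
    ready = loop.create_future()

    def pump():
        fut = ready
        # Read until EOF so the child can never block on a full pipe.
        for raw in iter(proc.stderr.readline, b""):
            logfile.write(raw)
            if fut is not None and is_ready(raw.decode()):
                loop.call_soon_threadsafe(fut.set_result, True)
                fut = None
        if fut is not None:
            loop.call_soon_threadsafe(
                fut.set_exception, RuntimeError("EOF before ready"))

    loop.run_in_executor(None, pump)
    await ready          # resolves once is_ready() matched a line
    return proc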

View File

@@ -60,7 +60,6 @@ class ManagerClient:
self.con_gen = con_gen
self.ccluster: Optional[CassandraCluster] = None
self.cql: Optional[CassandraSession] = None
self.exclusive_clusters: List[CassandraCluster] = []
# A client for communicating with ScyllaClusterManager (server)
self.sock_path = sock_path
self.client_for_asyncio_loop = {asyncio.get_running_loop(): UnixRESTClient(sock_path)}
@@ -114,9 +113,6 @@ class ManagerClient:
def driver_close(self) -> None:
"""Disconnect from cluster"""
for cluster in self.exclusive_clusters:
cluster.shutdown()
self.exclusive_clusters.clear()
if self.ccluster is not None:
logger.debug("shutting down driver")
safe_driver_shutdown(self.ccluster)
@@ -138,12 +134,9 @@ class ManagerClient:
hosts = await wait_for_cql_and_get_hosts(cql, servers, time() + 60)
return cql, hosts
async def get_cql_exclusive(self, server: ServerInfo, auth_provider: Optional[AuthProvider] = None):
cluster = self.con_gen([server.ip_addr], self.port, self.use_ssl,
auth_provider if auth_provider else self.auth_provider,
WhiteListRoundRobinPolicy([server.ip_addr]))
self.exclusive_clusters.append(cluster)
cql = cluster.connect()
async def get_cql_exclusive(self, server: ServerInfo):
cql = self.con_gen([server.ip_addr], self.port, self.use_ssl, self.auth_provider,
WhiteListRoundRobinPolicy([server.ip_addr])).connect()
await wait_for_cql_and_get_hosts(cql, [server], time() + 60)
return cql
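For context, a minimal sketch of pinning a python-driver session to one node, which is what get_cql_exclusive achieves through con_gen; this uses the execution-profile form of the same WhiteListRoundRobinPolicy, and the address is illustrative:

from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import WhiteListRoundRobinPolicy

profile = ExecutionProfile(
    load_balancing_policy=WhiteListRoundRobinPolicy(["127.0.0.1"]))
cluster = Cluster(["127.0.0.1"],
                  execution_profiles={EXEC_PROFILE_DEFAULT: profile})
session = cluster.connect()   # requests are routed only to 127.0.0.1
cluster.shutdown()            # exclusive clusters must be shut down explicitly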

View File

@@ -747,8 +747,6 @@ class ScyllaServer:
self.notify_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_CLOEXEC)
self.notify_socket.bind(str(self.notify_socket_path))
self._received_serving = False
loop = asyncio.get_running_loop()
def poll_status(s: socket.socket, f: asyncio.Future, logger: Union[logging.Logger, logging.LoggerAdapter]):
# Try to read all available messages from the socket
while True:
@@ -758,7 +756,7 @@ class ScyllaServer:
message = data.decode('utf-8', errors='replace')
if 'STATUS=serving' in message:
logger.debug("Received sd_notify 'serving' message")
loop.call_soon_threadsafe(f.set_result, True)
f.set_result(True)
return
if 'STATUS=entering maintenance mode' in message:
logger.debug("Receive sd_notify 'entering maintenance mode'")
@@ -768,9 +766,9 @@ class ScyllaServer:
except Exception as e:
logger.debug("Error reading from notify socket: %s", e)
break
loop.call_soon_threadsafe(f.set_result, False)
f.set_result(False)
self.serving_signal = loop.create_future()
self.serving_signal = asyncio.get_running_loop().create_future()
t = threading.Thread(target=poll_status, args=[self.notify_socket, self.serving_signal, self.logger], daemon=True)
t.start()
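The call_soon_threadsafe wrapping above is the key detail: resolving an asyncio.Future from a worker thread must be marshalled back to the event loop. A self-contained illustration (hypothetical, not from the test harness):

import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Calling fut.set_result(True) directly from the thread is not
    # thread-safe; call_soon_threadsafe hands the call to the loop.
    threading.Thread(
        target=lambda: loop.call_soon_threadsafe(fut.set_result, True),
        daemon=True).start()
    assert await fut

asyncio.run(main())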
@@ -894,6 +892,7 @@ class ScyllaServer:
return
await report_error("the node startup failed, but the log file doesn't contain the expected error")
await report_error("failed to start the node")
self.logger.info("Wait me %s expect %s is %s", self.server_id, expected_server_up_state, server_up_state)
if await self.try_get_host_id(api):
if server_up_state == ServerUpState.PROCESS_STARTED:
server_up_state = ServerUpState.HOST_ID_QUERIED
@@ -1395,11 +1394,7 @@ class ScyllaCluster:
f"the test must drop all keyspaces it creates.")
for server in itertools.chain(self.running.values(), self.stopped.values()):
server.write_log_marker(f"------ Ending test {name} ------\n")
# Only close log files when the cluster is dirty (will be destroyed).
# If the cluster is clean and will be reused, keep the log file open
# so that write_log_marker() and take_log_savepoint() work in the
# next test's before_test().
if self.is_dirty and not server.log_file.closed:
if not server.log_file.closed:
server.log_file.close()
async def server_stop(self, server_id: ServerNum, gracefully: bool) -> None:

View File

@@ -56,39 +56,15 @@ def unique_name(unique_name_prefix = 'test_'):
async def wait_for(
pred: Callable[[], Awaitable[Optional[T]]],
deadline: float,
period: float = 0.1,
period: float = 1,
before_retry: Optional[Callable[[], Any]] = None,
backoff_factor: float = 1.5,
max_period: float = 1.0,
label: Optional[str] = None) -> T:
tag = label or getattr(pred, '__name__', 'unlabeled')
start = time.time()
retries = 0
last_exception: Exception | None = None
backoff_factor: float = 1,
max_period: float = None) -> T:
while True:
elapsed = time.time() - start
if time.time() >= deadline:
timeout_msg = f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
if last_exception is not None:
timeout_msg += (
f"; last exception: {type(last_exception).__name__}: {last_exception}"
)
raise AssertionError(timeout_msg) from last_exception
raise AssertionError(timeout_msg)
try:
res = await pred()
last_exception = None
except Exception as exc:
res = None
last_exception = exc
assert(time.time() < deadline), "Deadline exceeded, failing test."
res = await pred()
if res is not None:
if retries > 0:
logger.debug(f"wait_for({tag}) completed "
f"in {elapsed:.2f}s ({retries} retries)")
return res
retries += 1
await asyncio.sleep(period)
period *= backoff_factor
if max_period is not None:
@@ -297,14 +273,14 @@ async def wait_for_view_v1(cql: Session, name: str, node_count: int, timeout: in
done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
return done[0][0] == node_count or None
deadline = time.time() + timeout
await wait_for(view_is_built, deadline, label=f"view_v1_{name}")
await wait_for(view_is_built, deadline)
async def wait_for_view(cql: Session, name: str, node_count: int, timeout: int = 120):
async def view_is_built():
done = await cql.run_async(f"SELECT COUNT(*) FROM system.view_build_status_v2 WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
return done[0][0] == node_count or None
deadline = time.time() + timeout
await wait_for(view_is_built, deadline, label=f"view_{name}")
await wait_for(view_is_built, deadline)
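A usage sketch of the extended wait_for() signature (the table name, keyspace, and a connected cql session are illustrative assumptions): the poll interval grows by backoff_factor up to max_period, and label shows up in the timeout message.

import time

async def table_created():
    rows = await cql.run_async(
        "SELECT table_name FROM system_schema.tables "
        "WHERE keyspace_name = 'ks' AND table_name = 't'")
    return bool(rows) or None          # None => keep polling

await wait_for(table_created, deadline=time.time() + 60,
               period=0.1, backoff_factor=1.5, max_period=1.0,
               label="table_t_created")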
async def wait_for_first_completed(coros: list[Coroutine], timeout: int|None = None):

View File

@@ -4,7 +4,7 @@
"""
GDB helper functions for `scylla_gdb` tests.
They should be loaded to GDB by "-x {dir}/gdb_utils.py",
when loaded, they can be run in gdb e.g. `$get_sstables()`
when loaded, they can be run in gdb e.g. `python get_sstables()`
Depends on helper functions injected to GDB by `scylla-gdb.py` script.
(sharded, for_each_table, seastar_lw_shared_ptr, find_sstables, find_vptrs, resolve,
@@ -15,65 +15,39 @@ import gdb
import uuid
class get_schema(gdb.Function):
"""Finds and returns a schema pointer."""
def __init__(self):
super(get_schema, self).__init__('get_schema')
def invoke(self):
db = sharded(gdb.parse_and_eval('::debug::the_database')).local()
table = next(for_each_table(db))
return seastar_lw_shared_ptr(table['_schema']).get()
def get_schema():
"""Execute GDB commands to get schema information."""
db = sharded(gdb.parse_and_eval('::debug::the_database')).local()
table = next(for_each_table(db))
ptr = seastar_lw_shared_ptr(table['_schema']).get()
print('schema=', ptr)
class get_sstable(gdb.Function):
"""Finds and returns an sstable pointer."""
def __init__(self):
super(get_sstable, self).__init__('get_sstable')
def invoke(self):
return next(find_sstables())
def get_sstables():
"""Execute GDB commands to get sstables information."""
sst = next(find_sstables())
print(f"sst=(sstables::sstable *)", sst)
class get_task(gdb.Function):
def get_task():
"""
Finds and returns a Scylla fiber task.
Some commands need a task to work on. The following fixture finds one.
Because we stopped Scylla while it was idle, we don't expect to find
any ready task with get_local_tasks(), but we can find one with a
find_vptrs() loop. I noticed that a nice one (with multiple tasks chained
to it for "scylla fiber") is one from http_server::do_accept_one.
"""
def __init__(self):
super(get_task, self).__init__('get_task')
def invoke(self):
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr, startswith='vtable for seastar::continuation')
if name and 'do_accept_one' in name:
return obj_addr.cast(gdb.lookup_type('uintptr_t'))
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr, startswith='vtable for seastar::continuation')
if name and 'do_accept_one' in name:
print(f"task={obj_addr.cast(gdb.lookup_type('uintptr_t'))}")
break
class get_coroutine(gdb.Function):
"""
Finds and returns a coroutine frame.
Prints COROUTINE_NOT_FOUND if the coroutine is not present.
"""
def __init__(self):
super(get_coroutine, self).__init__('get_coroutine')
def invoke(self):
target = 'service::topology_coordinator::run() [clone .resume]'
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr)
if name and name.strip() == target:
return obj_addr.cast(gdb.lookup_type('uintptr_t'))
print("COROUTINE_NOT_FOUND")
# Register the functions in GDB
get_schema()
get_sstable()
get_task()
get_coroutine()
def get_coroutine():
"""Similar to get_task(), but looks for a coroutine frame."""
target = 'service::topology_coordinator::run() [clone .resume]'
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr)
if name and name.strip() == target:
print(f"coroutine_config={obj_addr.cast(gdb.lookup_type('uintptr_t'))}")

View File

@@ -7,6 +7,7 @@ Each only checks that the command does not fail - but not what it does or return
"""
import pytest
import re
from test.scylla_gdb.conftest import execute_gdb_command
@@ -22,6 +23,20 @@ pytestmark = [
),
]
@pytest.fixture(scope="module")
def schema(gdb_cmd):
"""
Returns a pointer to the schema of the first table it finds.
Even without any user tables, we will always have system tables.
"""
result = execute_gdb_command(gdb_cmd, full_command="python get_schema()").stdout
match = re.search(r"schema=\s*(0x[0-9a-fA-F]+)", result)
schema_pointer = match.group(1) if match else None
return schema_pointer
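The fixture's parsing step in isolation (the sample output string is made up): GDB prints a labelled pointer, and a regex recovers the address from stdout.

import re

stdout = "schema= 0x60000abc1230"                 # illustrative GDB output
match = re.search(r"schema=\s*(0x[0-9a-fA-F]+)", stdout)
schema_pointer = match.group(1) if match else None
assert schema_pointer == "0x60000abc1230"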
@pytest.mark.parametrize(
"command",
[
@@ -30,17 +45,21 @@ pytestmark = [
"schema (const schema *)", # `schema` requires type-casted pointer
],
)
def test_schema(gdb_cmd, command):
result = execute_gdb_command(gdb_cmd, f"{command} $get_schema()")
def test_schema(gdb_cmd, command, schema):
assert schema, "Failed to find schema of any table"
result = execute_gdb_command(gdb_cmd, f"{command} {schema}")
assert result.returncode == 0, (
f"GDB command {command} failed. stdout: {result.stdout} stderr: {result.stderr}"
)
def test_generate_object_graph(gdb_cmd, request):
def test_generate_object_graph(gdb_cmd, schema, request):
assert schema, "Failed to find schema of any table"
tmpdir = request.config.getoption("--tmpdir")
result = execute_gdb_command(
gdb_cmd, f"generate-object-graph -o {tmpdir}/og.dot -d 2 -t 10 $get_schema()"
gdb_cmd, f"generate-object-graph -o {tmpdir}/og.dot -d 2 -t 10 {schema}"
)
assert result.returncode == 0, (
f"GDB command `generate-object-graph` failed. stdout: {result.stdout} stderr: {result.stderr}"

View File

@@ -7,6 +7,7 @@ Each only checks that the command does not fail - but not what it does or return
"""
import pytest
import re
from test.scylla_gdb.conftest import execute_gdb_command
@@ -23,6 +24,16 @@ pytestmark = [
]
@pytest.fixture(scope="module")
def sstable(gdb_cmd):
"""Finds sstable"""
result = execute_gdb_command(gdb_cmd, full_command="python get_sstables()").stdout
match = re.search(r"(\(sstables::sstable \*\) 0x)([0-9a-f]+)", result)
sstable_pointer = match.group(0).strip() if match else None
return sstable_pointer
@pytest.mark.parametrize(
"command",
[
@@ -30,8 +41,10 @@ pytestmark = [
"sstable-index-cache",
],
)
def test_sstable(gdb_cmd, command):
result = execute_gdb_command(gdb_cmd, f"{command} $get_sstable()")
def test_sstable(gdb_cmd, command, sstable):
assert sstable, "No sstable was found"
result = execute_gdb_command(gdb_cmd, f"{command} {sstable}")
assert result.returncode == 0, (
f"GDB command {command} failed. stdout: {result.stdout} stderr: {result.stderr}"
)

View File

@@ -6,6 +6,8 @@ Tests for commands, that need a some task to work on.
Each only checks that the command does not fail - but not what it does or returns.
"""
import re
import pytest
from test.scylla_gdb.conftest import execute_gdb_command
@@ -23,25 +25,59 @@ pytestmark = [
]
def test_coroutine_frame(gdb_cmd):
@pytest.fixture(scope="module")
def task(gdb_cmd):
"""
Finds a Scylla fiber task using a `find_vptrs()` loop.
Since Scylla is freshly booted, `get_local_tasks()` returns nothing.
Nevertheless, a `find_vptrs()` scan can still discover the first task
skeleton created by `http_server::do_accept_one` (often the earliest
“Scylla fiber” to appear).
"""
result = execute_gdb_command(gdb_cmd, full_command="python get_task()").stdout
match = re.search(r"task=(\d+)", result)
task = match.group(1) if match else None
return task
@pytest.fixture(scope="module")
def coroutine_task(gdb_cmd, scylla_server):
"""
Finds a coroutine task, similar to the `task` fixture.
This fixture runs `get_coroutine()` in GDB and parses its
`coroutine_config=` output to locate a specific coroutine task.
"""
result = execute_gdb_command(gdb_cmd, full_command="python get_coroutine()").stdout
match = re.search(r"coroutine_config=\s*(.*)", result)
if not match:
# See https://github.com/scylladb/scylladb/issues/22501
pytest.skip("Failed to find coroutine task. Skipping test.")
return match.group(1).strip()
def test_coroutine_frame(gdb_cmd, coroutine_task):
"""
Offsets the pointer by two words to shift from the outer coroutine frame
to the inner `seastar::task`, as required by `$coro_frame`, which expects
a `seastar::task*`.
"""
assert coroutine_task, "No coroutine task was found"
result = execute_gdb_command(
gdb_cmd, full_command="p *$coro_frame($get_coroutine() + 16)"
gdb_cmd, full_command=f"p *$coro_frame({coroutine_task} + 16)"
)
if "COROUTINE_NOT_FOUND" in result.stdout:
# See https://github.com/scylladb/scylladb/issues/22501
pytest.skip("Failed to find coroutine task. Skipping test.")
assert result.returncode == 0, (
f"GDB command `coro_frame` failed. stdout: {result.stdout} stderr: {result.stderr}"
)
def test_fiber(gdb_cmd):
result = execute_gdb_command(gdb_cmd, "fiber $get_task()")
def test_fiber(gdb_cmd, task):
assert task, f"No task was found using `find_vptrs()`"
result = execute_gdb_command(gdb_cmd, f"fiber {task}")
assert result.returncode == 0, (
f"GDB command `fiber` failed. stdout: {result.stdout} stderr: {result.stderr}"
)

View File

@@ -200,13 +200,13 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
_metrics.add_group("tracing_keyspace_helper", {
sm::make_counter("tracing_errors", [this] { return _stats.tracing_errors; },
sm::description("Counts a number of errors during writing to a system_traces keyspace. "
"One error may cause one or more tracing records to be lost.")).set_skip_when_empty(),
"One error may cause one or more tracing records to be lost.")),
sm::make_counter("bad_column_family_errors", [this] { return _stats.bad_column_family_errors; },
sm::description("Counts a number of times write failed due to one of the tables in the system_traces keyspace has an incompatible schema. "
"One error may result one or more tracing records to be lost. "
"Non-zero value indicates that the administrator has to take immediate steps to fix the corresponding schema. "
"The appropriate error message will be printed in the syslog.")).set_skip_when_empty(),
"The appropriate error message will be printed in the syslog.")),
});
}

View File

@@ -39,17 +39,17 @@ tracing::tracing(sstring tracing_backend_helper_class_name)
_metrics.add_group("tracing", {
sm::make_counter("dropped_sessions", stats.dropped_sessions,
sm::description("Counts a number of dropped sessions due to too many pending sessions/records. "
"High value indicates that backend is saturated with the rate with which new tracing records are created.")).set_skip_when_empty(),
"High value indicates that backend is saturated with the rate with which new tracing records are created.")),
sm::make_counter("dropped_records", stats.dropped_records,
sm::description("Counts a number of dropped records due to too many pending records. "
"High value indicates that backend is saturated with the rate with which new tracing records are created.")).set_skip_when_empty(),
"High value indicates that backend is saturated with the rate with which new tracing records are created.")),
sm::make_counter("trace_records_count", stats.trace_records_count,
sm::description("This metric is a rate of tracing records generation.")),
sm::make_counter("trace_errors", stats.trace_errors,
sm::description("Counts a number of trace records dropped due to an error (e.g. OOM).")).set_skip_when_empty(),
sm::description("Counts a number of trace records dropped due to an error (e.g. OOM).")),
sm::make_gauge("active_sessions", _active_sessions,
sm::description("Holds a number of a currently active tracing sessions.")),

View File

@@ -10,7 +10,6 @@
#include <seastar/core/seastar.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/closeable.hh>
#include "init.hh"
#include "supervisor.hh"
#include "directories.hh"