Compare commits

..

1 Commits

Author SHA1 Message Date
Alex
3a901a1bf3 test/cluster/test_view_building_coordinator: start view-building nodes one by one to make the tests stronger
The test_node_operation_during_view_building() setup used servers_add() to
bring up all initial nodes concurrently. That is more aggressive than this test
needs, and it makes the setup sensitive to bootstrap/topology races and to
single-node startup failures. The add_server has notes about this case.
In the decommission case in particular, the test starts with 4 nodes and only
later exercises the node operation under test. When all 4 nodes are started
concurrently, a failure in one node during initial bootstrap can cause the whole
batch add to fail before the test even reaches the decommission step. This
showed up as Failed to add servers, with later nodes timing out while waiting
for topology/IP mapping after one of the early nodes shut down.
Switch the initial cluster setup to repeated server_add() calls. This keeps
the topology changes serialized, allows each node to fully join before the next
one starts, and matches the actual needs of the test. The change does not alter
the scenario being tested; it only makes the test setup less fragile and easier
to diagnose when a node startup problem happens.
2026-03-22 12:08:34 +02:00
41 changed files with 575 additions and 1245 deletions

View File

@@ -8,9 +8,6 @@ on:
jobs:
check-fixes-prefix:
runs-on: ubuntu-latest
permissions:
contents: read
issues: write
steps:
- name: Check PR body for "Fixes" prefix patterns
uses: actions/github-script@v7

View File

@@ -1,6 +1,4 @@
name: Trigger Scylla CI Route
permissions:
contents: read
on:
issue_comment:

View File

@@ -1,8 +1,5 @@
name: Trigger next gating
permissions:
contents: read
on:
push:
branches:

View File

@@ -727,12 +727,7 @@ public:
// now we need one page more to be able to save one for next lap
auto fill_size = align_up(buf1.size(), block_size) + block_size - buf1.size();
// If the underlying stream is already at EOF (e.g. buf1 came from
// cached _next while the previous read_exactly drained the source),
// skip the read_exactly call — it would return empty anyway.
auto buf2 = _input.eof()
? temporary_buffer<char>()
: co_await _input.read_exactly(fill_size);
auto buf2 = co_await _input.read_exactly(fill_size);
temporary_buffer<char> output(buf1.size() + buf2.size());

View File

@@ -42,14 +42,7 @@ void everywhere_replication_strategy::validate_options(const gms::feature_servic
sstring everywhere_replication_strategy::sanity_check_read_replicas(const effective_replication_map& erm, const host_id_vector_replica_set& read_replicas) const {
const auto replication_factor = erm.get_replication_factor();
if (const auto& topo_info = erm.get_token_metadata().get_topology_change_info(); topo_info && topo_info->read_new) {
if (read_replicas.size() > replication_factor + 1) {
return seastar::format(
"everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, "
"cannot be higher than replication factor {} + 1 during the 'read from new replicas' stage of a topology change",
read_replicas.size(), replication_factor);
}
} else if (read_replicas.size() > replication_factor) {
if (read_replicas.size() > replication_factor) {
return seastar::format("everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, cannot be higher than replication factor {}", read_replicas.size(), replication_factor);
}
return {};

View File

@@ -15,7 +15,6 @@ from typing import Any, Optional
import asyncio
import contextlib
import glob
import hashlib
import json
import logging
import os
@@ -365,14 +364,12 @@ async def start_node(executable: PathLike, cluster_workdir: PathLike, addr: str,
llvm_profile_file = f"{addr}-%m.profraw"
scylla_workdir = f"{addr}"
logfile = f"{addr}.log"
socket = maintenance_socket_path(cluster_workdir, addr)
command = [
"env",
f"LLVM_PROFILE_FILE={llvm_profile_file}",
f"SCYLLA_HOME={os.path.realpath(os.getcwd())}", # We assume that the script has Scylla's `conf/` as its filesystem neighbour.
os.path.realpath(executable),
f"--workdir={scylla_workdir}",
f"--maintenance-socket={socket}",
"--ring-delay-ms=0",
"--developer-mode=yes",
"--memory=1G",
@@ -394,7 +391,6 @@ async def start_node(executable: PathLike, cluster_workdir: PathLike, addr: str,
f"--authenticator=PasswordAuthenticator",
f"--authorizer=CassandraAuthorizer",
] + list(extra_opts)
training_logger.info(f"Using maintenance socket {socket}")
return await run(['bash', '-c', fr"""exec {shlex.join(command)} >{q(logfile)} 2>&1"""], cwd=cluster_workdir)
async def start_cluster(executable: PathLike, addrs: list[str], cpusets: Optional[list[str]], workdir: PathLike, cluster_name: str, extra_opts: list[str]) -> list[Process]:
@@ -437,25 +433,16 @@ async def start_cluster(executable: PathLike, addrs: list[str], cpusets: Optiona
procs.append(proc)
await wait_for_node(proc, addrs[i], timeout)
except:
await stop_cluster(procs, addrs, cluster_workdir=workdir)
await stop_cluster(procs, addrs)
raise
return procs
async def stop_cluster(procs: list[Process], addrs: list[str], cluster_workdir: PathLike) -> None:
async def stop_cluster(procs: list[Process], addrs: list[str]) -> None:
"""Stops a Scylla cluster started with start_cluster().
Doesn't return until all nodes exit, even if stop_cluster() is cancelled.
"""
await clean_gather(*[cancel_process(p, timeout=60) for p in procs])
_cleanup_short_sockets(cluster_workdir, addrs)
def _cleanup_short_sockets(cluster_workdir: PathLike, addrs: list[str]) -> None:
"""Remove short maintenance socket files created in /tmp."""
for addr in addrs:
try:
os.unlink(maintenance_socket_path(cluster_workdir, addr))
except OSError:
pass
async def wait_for_port(addr: str, port: int) -> None:
await bash(fr'until printf "" >>/dev/tcp/{addr}/{port}; do sleep 0.1; done 2>/dev/null')
@@ -466,17 +453,12 @@ async def merge_profraw(directory: PathLike) -> None:
await bash(fr"llvm-profdata merge {q(directory)}/*.profraw -output {q(directory)}/prof.profdata")
def maintenance_socket_path(cluster_workdir: PathLike, addr: str) -> str:
"""Return the maintenance socket path for a node.
"""Returns the absolute path of the maintenance socket for a given node.
Returns a short deterministic path in /tmp (derived from an MD5 hash of
the natural ``<cluster_workdir>/<addr>/cql.m`` path) to stay within the
Unix domain socket length limit.
The same path is passed to Scylla via ``--maintenance-socket`` in
``start_node()``.
With ``maintenance_socket: workdir`` in scylla.yaml the socket lives at
``<node-workdir>/cql.m``, i.e. ``<cluster_workdir>/<addr>/cql.m``.
"""
natural = os.path.realpath(f"{cluster_workdir}/{addr}/cql.m")
path_hash = hashlib.md5(natural.encode()).hexdigest()[:12]
return os.path.join(tempfile.gettempdir(), f'pgo-{path_hash}.m')
return os.path.realpath(f"{cluster_workdir}/{addr}/cql.m")
async def setup_cassandra_user(workdir: PathLike, addr: str) -> None:
"""Create the ``cassandra`` superuser via the maintenance socket.
@@ -543,7 +525,7 @@ async def with_cluster(executable: PathLike, workdir: PathLike, cpusets: Optiona
yield addrs, procs
finally:
training_logger.info(f"Stopping the cluster in {workdir}")
await stop_cluster(procs, addrs, cluster_workdir=workdir)
await stop_cluster(procs, addrs)
training_logger.info(f"Stopped the cluster in {workdir}")
################################################################################

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:d424ce6cc7f65338c34dd35881d23f5ad3425651d66e47dc2c3a20dc798848d4
size 6598648
oid sha256:34a0955d2c5a88e18ddab0f1df085e10a17e14129c3e21de91e4f27ef949b6c4
size 6502668

View File

@@ -1622,14 +1622,14 @@ future<> segment_manager_impl::do_recovery(replica::database& db) {
size_t next_file_id = 0;
for (auto file_id : found_file_ids) {
if (file_id != next_file_id) {
throw std::runtime_error(fmt::format("Missing log segment file(s) detected during recovery: file {} missing", _file_mgr.get_file_path(next_file_id).string()));
throw std::runtime_error(fmt::format("Missing log segment file(s) detected during recovery: file {} missing", _file_mgr.get_file_path(next_file_id)));
}
next_file_id++;
}
// populate index from all segments. keep the latest record for each key.
for (auto file_id : found_file_ids) {
logstor_logger.info("Recovering segments from file {}: {}%", _file_mgr.get_file_path(file_id).string(), (file_id + 1) * 100 / found_file_ids.size());
logstor_logger.info("Recovering segments from file {}: {}%", _file_mgr.get_file_path(file_id), (file_id + 1) * 100 / found_file_ids.size());
co_await max_concurrent_for_each(segments_in_file(file_id), 32,
[this, &db] (log_segment_id seg_id) {
return recover_segment(db, seg_id);

View File

@@ -4860,14 +4860,13 @@ table::query(schema_ptr query_schema,
}
std::optional<full_position> last_pos;
if (querier_opt) {
if (querier_opt->current_position()) {
last_pos.emplace(*querier_opt->current_position());
}
if (!saved_querier || (!querier_opt->are_limits_reached() && !qs.builder.is_short_read())) {
co_await querier_opt->close();
querier_opt = {};
}
if (querier_opt && querier_opt->current_position()) {
last_pos.emplace(*querier_opt->current_position());
}
if (!saved_querier || (querier_opt && !querier_opt->are_limits_reached() && !qs.builder.is_short_read())) {
co_await querier_opt->close();
querier_opt = {};
}
if (saved_querier) {
*saved_querier = std::move(querier_opt);

View File

@@ -87,11 +87,6 @@ target_include_directories(wasmtime_bindings
target_link_libraries(wasmtime_bindings
INTERFACE Rust::rust_combined)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
# The PCH from scylla-precompiled-header is compiled with Seastar's compile
# flags, including sanitizer flags in Debug/Sanitize modes. Any target reusing
# this PCH must have matching compile options, otherwise the compiler rejects
# the PCH due to flag mismatch (e.g., -fsanitize=address).
target_link_libraries(wasmtime_bindings PRIVATE Seastar::seastar)
target_precompile_headers(wasmtime_bindings REUSE_FROM scylla-precompiled-header)
endif()
@@ -113,6 +108,5 @@ target_include_directories(inc
target_link_libraries(inc
INTERFACE Rust::rust_combined)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_link_libraries(inc PRIVATE Seastar::seastar)
target_precompile_headers(inc REUSE_FROM scylla-precompiled-header)
endif()

View File

@@ -952,8 +952,6 @@ class sstring:
@staticmethod
def to_hex(data, size):
if size == 0:
return ''
inf = gdb.selected_inferior()
return bytes(inf.read_memory(data, size)).hex()
@@ -976,8 +974,6 @@ class sstring:
return self.ref['u']['external']['str']
def as_bytes(self):
if len(self) == 0:
return b''
inf = gdb.selected_inferior()
return bytes(inf.read_memory(self.data(), len(self)))
@@ -5640,8 +5636,6 @@ class scylla_sstable_summary(gdb.Command):
self.inf = gdb.selected_inferior()
def to_hex(self, data, size):
if size == 0:
return ''
return bytes(self.inf.read_memory(data, size)).hex()
def invoke(self, arg, for_tty):
@@ -5653,10 +5647,6 @@ class scylla_sstable_summary(gdb.Command):
sst = seastar_lw_shared_ptr(arg).get().dereference()
else:
sst = arg
ms_version = int(gdb.parse_and_eval('sstables::sstable_version_types::ms'))
if int(sst['_version']) >= ms_version:
gdb.write("sstable uses ms format (trie-based index); summary is not populated.\n")
return
summary = seastar_lw_shared_ptr(sst['_components']['_value']).get().dereference()['summary']
gdb.write("header: {}\n".format(summary['header']))

View File

@@ -910,7 +910,7 @@ future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {
frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
} else {
co_await for_each_split_mutation(std::move(mut), max_size, [&] (mutation m) -> future<> {
frozen_muts_to_apply.push_back(co_await freeze_gently(m));
frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
});
}
}

View File

@@ -221,16 +221,10 @@ private:
sst->set_sstable_level(0);
auto units = co_await sst_manager.dir_semaphore().get_units(1);
sstables::sstable_open_config cfg {
.unsealed_sstable = true,
.ignore_component_digest_mismatch = db.get_config().ignore_component_digest_mismatch(),
};
co_await sst->load(table.get_effective_replication_map()->get_sharder(*table.schema()), cfg);
co_await table.add_new_sstable_and_update_cache(sst, [&sst_manager, sst] (sstables::shared_sstable loading_sst) -> future<> {
if (loading_sst == sst) {
auto writer_cfg = sst_manager.configure_writer(loading_sst->get_origin());
co_await loading_sst->seal_sstable(writer_cfg.backup);
}
});
co_await table.add_sstable_and_update_cache(sst);
}
future<>
@@ -301,8 +295,7 @@ private:
sstables::sstable_state::normal,
sstables::sstable::component_basename(
_table.schema()->ks_name(), _table.schema()->cf_name(), descriptor.version, gen, descriptor.format, it->first),
sstables::sstable_stream_sink_cfg{.last_component = std::next(it) == components.cend(),
.leave_unsealed = true});
sstables::sstable_stream_sink_cfg{.last_component = std::next(it) == components.cend()});
auto out = co_await sstable_sink->output(foptions, stream_options);
input_stream src(co_await [this, &it, sstable, f = files.at(it->first)]() -> future<input_stream<char>> {

View File

@@ -181,7 +181,7 @@ def parse_cmd_line() -> argparse.Namespace:
help="Run only tests for given build mode(s)")
parser.add_argument('--repeat', action="store", default="1", type=int,
help="number of times to repeat test execution")
parser.add_argument('--timeout', action="store", default="3600", type=int,
parser.add_argument('--timeout', action="store", default="24000", type=int,
help="timeout value for single test execution")
parser.add_argument('--session-timeout', action="store", default="24000", type=int,
help="timeout value for test.py/pytest session execution")

View File

@@ -62,11 +62,7 @@ SEASTAR_TEST_CASE(test_index_doesnt_flood_cache_in_small_partition_workload) {
// cfg.db_config->index_cache_fraction.set(1.0);
return do_with_cql_env_thread([] (cql_test_env& e) {
// We disable compactions because they cause confusing cache mispopulations.
// We disable compression because the sstable writer targets a specific
// (*compressed* data file size : summary file size) ratio,
// so the number of keys per index page becomes hard to control,
// and might be arbitrarily large.
e.execute_cql("CREATE TABLE ks.t(pk blob PRIMARY KEY) WITH compaction = { 'class' : 'NullCompactionStrategy' } AND compression = {'sstable_compression': ''};").get();
e.execute_cql("CREATE TABLE ks.t(pk blob PRIMARY KEY) WITH compaction = { 'class' : 'NullCompactionStrategy' };").get();
auto insert_query = e.prepare("INSERT INTO ks.t(pk) VALUES (?)").get();
auto select_query = e.prepare("SELECT * FROM t WHERE pk = ?").get();
@@ -158,11 +154,7 @@ SEASTAR_TEST_CASE(test_index_is_cached_in_big_partition_workload) {
// cfg.db_config->index_cache_fraction.set(0.0);
return do_with_cql_env_thread([] (cql_test_env& e) {
// We disable compactions because they cause confusing cache mispopulations.
// We disable compression because the sstable writer targets a specific
// (*compressed* data file size : summary file size) ratio,
// so the number of keys per index page becomes hard to control,
// and might be arbitrarily large.
e.execute_cql("CREATE TABLE ks.t(pk bigint, ck bigint, v blob, primary key (pk, ck)) WITH compaction = { 'class' : 'NullCompactionStrategy' } AND compression = {'sstable_compression': ''};").get();
e.execute_cql("CREATE TABLE ks.t(pk bigint, ck bigint, v blob, primary key (pk, ck)) WITH compaction = { 'class' : 'NullCompactionStrategy' };").get();
auto insert_query = e.prepare("INSERT INTO ks.t(pk, ck, v) VALUES (?, ?, ?)").get();
auto select_query = e.prepare("SELECT * FROM t WHERE pk = ? AND ck = ?").get();

View File

@@ -23,11 +23,8 @@
#include "test/lib/tmpdir.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/exception_utils.hh"
#include "test/lib/limiting_data_source.hh"
#include "utils/io-wrappers.hh"
#include <seastar/util/memory-data-source.hh>
using namespace encryption;
static tmpdir dir;
@@ -598,113 +595,6 @@ SEASTAR_TEST_CASE(test_encrypted_data_source_simple) {
co_await test_random_data_source(sizes);
}
// Reproduces the production deadlock where encrypted SSTable component downloads
// got stuck during restore. The encrypted_data_source::get() caches a block in
// _next, then on the next call bypasses input_stream::read()'s _eof check and
// calls input_stream::read_exactly() — which does NOT check _eof when _buf is
// empty. This causes a second get() on the underlying source after EOS.
//
// In production the underlying source was chunked_download_source whose get()
// hung forever. Here we simulate it with a strict source that fails the test.
//
// The fix belongs in seastar's input_stream::read_exactly(): check _eof before
// calling _fd.get(), consistent with read(), read_up_to(), and consume().
static future<> test_encrypted_source_copy(size_t plaintext_size) {
testlog.info("test_encrypted_source_copy: plaintext_size={}", plaintext_size);
key_info info{"AES/CBC", 256};
auto k = ::make_shared<symmetric_key>(info);
// Step 1: Encrypt the plaintext into memory buffers
auto plaintext = generate_random<char>(plaintext_size);
std::vector<temporary_buffer<char>> encrypted_bufs;
{
data_sink sink(make_encrypted_sink(create_memory_sink(encrypted_bufs), k));
co_await sink.put(plaintext.clone());
co_await sink.close();
}
// Flatten encrypted buffers into a single contiguous buffer
size_t encrypted_total = 0;
for (const auto& b : encrypted_bufs) {
encrypted_total += b.size();
}
temporary_buffer<char> encrypted(encrypted_total);
size_t pos = 0;
for (const auto& b : encrypted_bufs) {
std::copy(b.begin(), b.end(), encrypted.get_write() + pos);
pos += b.size();
}
// Step 2: Create a data source from the encrypted data that fails on
// post-EOS get() — simulating a source like chunked_download_source
// that would hang forever in this situation.
class strict_memory_source final : public limiting_data_source_impl {
bool _eof = false;
public:
strict_memory_source(temporary_buffer<char> data, size_t chunk_size)
: limiting_data_source_impl(
data_source(std::make_unique<util::temporary_buffer_data_source>(std::move(data))),
chunk_size) {}
future<temporary_buffer<char>> get() override {
BOOST_REQUIRE_MESSAGE(!_eof,
"get() called on source after it already returned EOS — "
"this is the production deadlock: read_exactly() does not "
"check _eof before calling _fd.get()");
auto buf = co_await limiting_data_source_impl::get();
_eof = buf.empty();
co_return buf;
}
};
// Step 3: Wrap in encrypted_data_source and drain via consume() —
// the exact code path used by seastar::copy() which is what
// sstables_loader_helpers::download_sstable() calls.
// Try multiple chunk sizes to hit different alignment scenarios.
for (size_t chunk_size : {1ul, 7ul, 4096ul, 8192ul, encrypted_total, encrypted_total + 1}) {
if (chunk_size == 0) continue;
auto src = data_source(make_encrypted_source(
data_source(std::make_unique<strict_memory_source>(encrypted.clone(), chunk_size)), k));
auto in = input_stream<char>(std::move(src));
// consume() is what seastar::copy() uses internally. It calls
// encrypted_data_source::get() via _fd.get() until EOF.
size_t total_decrypted = 0;
co_await in.consume([&total_decrypted](temporary_buffer<char> buf) {
total_decrypted += buf.size();
return make_ready_future<consumption_result<char>>(continue_consuming{});
});
co_await in.close();
BOOST_REQUIRE_EQUAL(total_decrypted, plaintext_size);
}
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_8k) {
co_await test_encrypted_source_copy(8192);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_4k) {
co_await test_encrypted_source_copy(4096);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_small) {
co_await test_encrypted_source_copy(100);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_12k) {
co_await test_encrypted_source_copy(12288);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_unaligned) {
co_await test_encrypted_source_copy(8193);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_1byte) {
co_await test_encrypted_source_copy(1);
}
SEASTAR_TEST_CASE(test_encrypted_data_source_fuzzy) {
std::mt19937_64 rand_gen(std::random_device{}());

View File

@@ -691,7 +691,7 @@ class TesterAlternator(BaseAlternator):
random.choice(nodes_for_maintenance).compact()
except NodetoolError as exc:
error_message = str(exc)
valid_errors = ["ConnectException", "Connection refused", "status code 404 Not Found"]
valid_errors = ["ConnectException", "status code 404 Not Found"]
if not any(err in error_message for err in valid_errors):
raise

File diff suppressed because it is too large Load Diff

View File

@@ -353,7 +353,7 @@ class TestSchemaManagement(Tester):
logger.debug("Restarting node2")
node2.start(wait_for_binary_proto=True)
session2 = self.patient_exclusive_cql_connection(node2)
session2 = self.patient_cql_connection(node2)
read_barrier(session2)
rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
@@ -382,7 +382,7 @@ class TestSchemaManagement(Tester):
logger.debug("Restarting node2")
node2.start(wait_for_binary_proto=True)
session2 = self.patient_exclusive_cql_connection(node2)
session2 = self.patient_cql_connection(node2)
read_barrier(session2)
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (2, '2')", consistency_level=ConsistencyLevel.ALL))

View File

@@ -808,16 +808,7 @@ async def test_index_requires_rf_rack_valid_keyspace(manager: ManagerClient):
# Create a table with tablets and no indexes, then add a GSI - the update should fail
table_name = unique_table_name()
# The server waits 10s for schema agreement after creating a table,
# which may not be enough after a sequence of rapid schema changes
# on a multi-node cluster (see SCYLLADB-1135). Retry if needed.
for attempt in range(2):
try:
create_table_with_index(alternator, table_name, index_type=None, initial_tablets='1')
break
except ClientError as e:
if 'schema agreement' not in str(e) or attempt == 1:
raise
create_table_with_index(alternator, table_name, index_type=None, initial_tablets='1')
with pytest.raises(ClientError, match=expected_err_update_add_gsi):
alternator.meta.client.update_table(
TableName=table_name,

View File

@@ -44,7 +44,6 @@ run_in_dev:
- dtest/bypass_cache_test
- dtest/auth_roles_test
- dtest/audit_test
- audit/test_audit
- dtest/commitlog_test
- dtest/cfid_test
- dtest/rebuild_test

View File

@@ -438,7 +438,6 @@ async def test_lwt_fencing_upgrade(manager: ManagerClient, scylla_2025_1: Scylla
await wait_for(all_hosts_are_alive, deadline=time.time() + 60, period=0.1)
logger.info(f"Upgrading {s.server_id}")
await manager.server_change_version(s.server_id, scylla_binary)
await manager.server_sees_others(s.server_id, 2, interval=60.0)
logger.info("Done upgrading servers")

View File

@@ -8,10 +8,7 @@ import asyncio
import time
import pytest
import logging
from functools import partial
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for
from test.pylib.internal_types import ServerInfo
logger = logging.getLogger(__name__)
@@ -19,26 +16,6 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_crashed_node_substitution(manager: ManagerClient):
"""Test that a node which crashed after starting gossip but before joining group0
(an 'orphan' node) is eventually removed from gossip by the gossiper_orphan_remover_fiber.
The scenario:
1. Start 3 nodes with the 'fast_orphan_removal_fiber' injection enabled. This freezes
the gossiper_orphan_remover_fiber on each node before it enters its polling loop,
so it cannot remove any orphan until explicitly unblocked.
2. Start a 4th node with the 'crash_before_group0_join' injection enabled. This node
starts gossip normally but blocks inside pre_server_start(), just before sending
the join RPC to the topology coordinator. It never joins group0.
3. Wait until the 4th node's gossip state has fully propagated to all 3 running peers,
then trigger its crash via the injection. At this point all peers see it as an orphan:
present in gossip but absent from the group0 topology.
4. Assert the orphan is visible in gossip (live or down) on the surviving nodes.
5. Unblock the gossiper_orphan_remover_fiber on all 3 nodes (via message_injection) and
enable the 'speedup_orphan_removal' injection so the fiber removes the orphan immediately
without waiting for the normal 60-second age threshold.
6. Wait for the 'Finished to force remove node' log line confirming removal, then assert
the orphan is no longer present in gossip.
"""
servers = await manager.servers_add(3, config={
'error_injections_at_startup': ['fast_orphan_removal_fiber']
})
@@ -53,24 +30,10 @@ async def test_crashed_node_substitution(manager: ManagerClient):
log = await manager.server_open_log(failed_server.server_id)
await log.wait_for("finished do_send_ack2_msg")
failed_id = await manager.get_host_id(failed_server.server_id)
# Wait until the failed server's gossip state has propagated to all running peers.
# "finished do_send_ack2_msg" only guarantees that one peer completed a gossip round
# with the failed server; other nodes learn about it only in subsequent gossip rounds.
# Querying gossip before propagation completes would cause the assertion below to fail
# because the orphan node would not yet appear as live or down on every peer.
async def gossip_has_node(server: ServerInfo):
live = await manager.api.client.get_json("/gossiper/endpoint/live", host=server.ip_addr)
down = await manager.api.client.get_json("/gossiper/endpoint/down", host=server.ip_addr)
return True if failed_server.ip_addr in live + down else None
for s in servers:
await wait_for(partial(gossip_has_node, s), deadline=time.time() + 30)
await manager.api.message_injection(failed_server.ip_addr, 'crash_before_group0_join')
await task
live_eps = await manager.api.client.get_json("/gossiper/endpoint/live", host=servers[0].ip_addr)
down_eps = await manager.api.client.get_json("/gossiper/endpoint/down", host=servers[0].ip_addr)

View File

@@ -17,9 +17,9 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import ScyllaMetricsClient, TCPRESTClient, inject_error
from test.pylib.tablets import get_tablet_replicas
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.util import gather_safely, wait_for
from test.pylib.util import wait_for
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, keyspace_has_tablets, new_test_keyspace, new_test_table
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace
logger = logging.getLogger(__name__)
@@ -51,42 +51,28 @@ async def await_sync_point(client: TCPRESTClient, server_ip: IPAddress, sync_poi
@pytest.mark.asyncio
async def test_write_cl_any_to_dead_node_generates_hints(manager: ManagerClient):
node_count = 2
cmdline = ["--logger-log-level", "hints_manager=trace"]
servers = await manager.servers_add(node_count, cmdline=cmdline)
async def wait_for_hints_written(min_hint_count: int, timeout: int):
async def aux():
hints_written = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
if hints_written >= min_hint_count:
return True
return None
assert await wait_for(aux, time.time() + timeout)
servers = await manager.servers_add(node_count)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
uses_tablets = await keyspace_has_tablets(manager, ks)
# If the keyspace uses tablets, let's explicitly require the table to use multiple tablets.
# Otherwise, it could happen that all mutations would target servers[0] only, which would
# ultimately lead to a test failure here. We rely on the assumption that mutations will be
# distributed more or less uniformly!
extra_opts = "WITH tablets = {'min_tablet_count': 16}" if uses_tablets else ""
async with new_test_table(manager, ks, "pk int PRIMARY KEY, v int", extra_opts) as table:
await manager.server_stop_gracefully(servers[1].server_id)
table = f"{ks}.t"
await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
await manager.server_stop_gracefully(servers[1].server_id)
stmt = cql.prepare(f"INSERT INTO {table} (pk, v) VALUES (?, ?)")
stmt.consistency_level = ConsistencyLevel.ANY
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
# Some of the inserts will be targeted to the dead node.
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
await gather_safely(*[cql.run_async(stmt, (i, i + 1)) for i in range(100)])
# Some of the inserts will be targeted to the dead node.
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
for i in range(100):
await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES ({i}, {i+1})", consistency_level=ConsistencyLevel.ANY))
# Verify hints are written
await wait_for_hints_written(hints_before + 1, timeout=60)
# Verify hints are written
hints_after = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
assert hints_after > hints_before
# For dropping the keyspace
await manager.server_start(servers[1].server_id)
# For dropping the keyspace
await manager.server_start(servers[1].server_id)
@pytest.mark.asyncio
async def test_limited_concurrency_of_writes(manager: ManagerClient):

View File

@@ -151,7 +151,7 @@ async def trigger_tablet_merge(manager, servers, logs):
await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
async def preapre_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, nr_keys=nr_keys, cmdline=cmdline)
repaired_keys = set(range(0, nr_keys))
unrepaired_keys = set()
@@ -164,7 +164,7 @@ async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdlin
@pytest.mark.asyncio
async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient):
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
await insert_keys(cql, ks, 0, 100)
@@ -274,7 +274,7 @@ async def test_tablet_incremental_repair_error(manager: ManagerClient):
async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline=['--logger-log-level', 'compaction=debug'])
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
token = -1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
@@ -335,7 +335,7 @@ async def test_tablet_incremental_repair_and_major(manager: ManagerClient):
@pytest.mark.asyncio
async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# Disable autocompaction
for server in servers:
@@ -381,7 +381,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_split, do_merge):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -442,7 +442,7 @@ async def test_tablet_incremental_repair_with_merge(manager: ManagerClient):
async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(manager: ManagerClient):
nr_keys = 100
cmdline = ["--hinted-handoff-enabled", "0"]
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
await manager.server_stop_gracefully(servers[1].server_id)
@@ -466,7 +466,7 @@ async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(ma
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -507,7 +507,7 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_merge(manager):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -541,7 +541,7 @@ async def do_test_tablet_incremental_repair_merge_error(manager, error):
nr_keys = 100
# Make sure no data commit log replay after force server stop
cmdline = ['--enable-commitlog', '0']
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -587,7 +587,7 @@ async def test_tablet_incremental_repair_merge_error_in_merge_completion_fiber(m
@pytest.mark.asyncio
async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
token = -1
sstables_repaired_at = 0
@@ -632,7 +632,7 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
@pytest.mark.asyncio
async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
servers, _, _, ks, _, _, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
time1 = 0
time2 = 0
@@ -820,7 +820,7 @@ async def test_repair_sigsegv_with_diff_shard_count(manager: ManagerClient, use_
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manager: ManagerClient):
cmdline = ['--logger-log-level', 'repair=debug']
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await prepare_cluster_for_incremental_repair(manager, cmdline=cmdline)
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await preapre_cluster_for_incremental_repair(manager, cmdline=cmdline)
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)

View File

@@ -20,7 +20,6 @@ from cassandra.query import SimpleStatement
from test.pylib.async_cql import _wrap_future
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables, TextType, Column
from test.pylib.rest_client import read_barrier
from test.pylib.util import unique_name
from test.cluster.conftest import cluster_con
@@ -404,7 +403,6 @@ async def test_arbiter_dc_rf_rack_valid_keyspaces(manager: ManagerClient):
for task in [*valid_keyspaces, *invalid_keyspaces]:
_ = tg.create_task(task)
@pytest.mark.asyncio
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager: ManagerClient):
"""
This test verifies that starting a Scylla node fails when there's an RF-rack-invalid keyspace.
@@ -466,50 +464,22 @@ async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager:
for rfs, tablets in valid_keyspaces:
_ = tg.create_task(create_keyspace(rfs, tablets))
# Precondition: s1 has rf_rack_valid_keyspaces set to false.
# Postcondition: s1 still has rf_rack_valid_keyspaces set to false.
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
async def try_fail(rfs: List[int], dc: str, rf: int, rack_count: int):
running_servers = await manager.running_servers()
should_start = s1.server_id not in [server.server_id for server in running_servers]
if should_start:
await manager.server_start(s1.server_id)
ks = await create_keyspace(rfs, True)
# We need to wait for the new schema to propagate.
# Otherwise, it's not clear when the mutation
# corresponding to the created keyspace will
# arrive at server 1.
# It could happen only after the node performs
# the check upon start-up, effectively leading
# to a successful start-up, which we don't want.
# For more context, see issue: SCYLLADB-1137.
await read_barrier(manager.api, s1.ip_addr)
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
err = f"The keyspace '{ks}' is required to be RF-rack-valid. " \
f"That condition is violated for DC '{dc}': RF={rf} vs. rack count={rack_count}."
await manager.server_start(s1.server_id, expected_error=err)
_ = await manager.server_start(s1.server_id, expected_error=err)
await cql.run_async(f"DROP KEYSPACE {ks}")
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "false")
# Test RF-rack-invalid keyspaces.
await try_fail([2, 0], "dc1", 2, 3)
await try_fail([3, 2], "dc2", 2, 1)
await try_fail([4, 1], "dc1", 4, 3)
# We need to perform a read barrier on the node to make
# sure that it processes the last DROP KEYSPACE.
# Otherwise, the node could think the RF-rack-invalid
# keyspace still exists.
await manager.server_start(s1.server_id)
await read_barrier(manager.api, s1.ip_addr)
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
await manager.server_start(s1.server_id)
_ = await manager.server_start(s1.server_id)
@pytest.mark.asyncio
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces_but_not_enforced(manager: ManagerClient):

View File

@@ -23,25 +23,10 @@ from test.cluster.object_store.conftest import format_tuples
from test.cluster.object_store.test_backup import topo, take_snapshot, do_test_streaming_scopes
from test.cluster.util import new_test_keyspace
from test.pylib.rest_client import read_barrier
from test.pylib.util import unique_name, wait_for
from test.pylib.util import unique_name
logger = logging.getLogger(__name__)
async def wait_for_upload_dir_empty(upload_dir, timeout=30):
'''
Wait until the upload directory is empty with a timeout.
SSTable unlinking is asynchronous and in rare situations, it can happen
that not all sstables are deleted from the upload dir immediately after refresh is done.
'''
deadline = time.time() + timeout
async def check_empty():
files = os.listdir(upload_dir)
if not files:
return True
return None
await wait_for(check_empty, deadline, period=0.5)
class SSTablesOnLocalStorage:
def __init__(self):
self.tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
@@ -168,8 +153,7 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
for s in servers:
cf_dir = dirs[s.server_id]["cf_dir"]
upload_dir = os.path.join(cf_dir, 'upload')
assert os.path.exists(upload_dir)
await wait_for_upload_dir_empty(upload_dir)
files = os.listdir(os.path.join(cf_dir, 'upload'))
assert files == [], f'Upload dir not empty on server {s.server_id}: {files}'
shutil.rmtree(tmpbackup)

View File

@@ -196,7 +196,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
tombstone_mark = datetime.now(timezone.utc)
# test #2: the tombstones are not cleaned up when one node is down
with pytest.raises(AssertionError, match="timed out"):
with pytest.raises(AssertionError, match="Deadline exceeded"):
# waiting for shorter time (5s normally enough for a successful case, we expect the timeout here)
await verify_tombstone_gc(tombstone_mark, timeout=5)
@@ -249,7 +249,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
# test #4a: the tombstones are not cleaned up after both live nodes join the new group0
with pytest.raises(AssertionError, match="timed out"):
with pytest.raises(AssertionError, match="Deadline exceeded"):
await verify_tombstone_gc(tombstone_mark, timeout=5)
await manager.remove_node(servers[0].server_id, down_server.server_id)

View File

@@ -352,9 +352,14 @@ async def test_node_operation_during_view_building(manager: ManagerClient, opera
rack_layout = ["rack1", "rack2", "rack3"]
property_file = [{"dc": "dc1", "rack": rack} for rack in rack_layout]
servers = await manager.servers_add(node_count, config={"enable_tablets": "true"},
cmdline=cmdline_loggers,
property_file=property_file)
servers = [
await manager.server_add(
config={"enable_tablets": "true"},
cmdline=cmdline_loggers,
property_file=server_property_file,
)
for server_property_file in property_file
]
cql, _ = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()

View File

@@ -165,7 +165,7 @@ async def wait_for_cdc_generations_publishing(cql: Session, hosts: list[Host], d
unpublished_generations = topo_res[0].unpublished_cdc_generations
return unpublished_generations is None or len(unpublished_generations) == 0 or None
await wait_for(all_generations_published, deadline=deadline)
await wait_for(all_generations_published, deadline=deadline, period=1.0)
async def check_system_topology_and_cdc_generations_v3_consistency(manager: ManagerClient, live_hosts: list[Host], cqls: Optional[list[Session]] = None, ignored_hosts: list[Host] = []):
@@ -470,17 +470,6 @@ async def new_materialized_view(manager: ManagerClient, table, select, pk, where
await manager.get_cql().run_async(f"DROP MATERIALIZED VIEW {mv}")
async def keyspace_has_tablets(manager: ManagerClient, keyspace: str) -> bool:
"""
Checks whether the given keyspace uses tablets.
Adapted from its counterpart in the cqlpy test: cqlpy/util.py::keyspace_has_tablets.
"""
cql = manager.get_cql()
rows_iter = await cql.run_async(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name='{keyspace}'")
rows = list(rows_iter)
return len(rows) > 0 and getattr(rows[0], "initial_tablets", None) is not None
async def get_raft_log_size(cql, host) -> int:
query = "select count(\"index\") from system.raft"
return (await cql.run_async(query, host=host))[0][0]

View File

@@ -7,42 +7,54 @@
*/
#include "limiting_data_source.hh"
#include <seastar/core/iostream.hh>
#include <seastar/core/temporary_buffer.hh>
#include <cstdint>
using namespace seastar;
future<temporary_buffer<char>> limiting_data_source_impl::do_get() {
uint64_t size = std::min(_limit, _buf.size());
auto res = _buf.share(0, size);
_buf.trim_front(size);
return make_ready_future<temporary_buffer<char>>(std::move(res));
}
class limiting_data_source_impl final : public data_source_impl {
data_source _src;
size_t _limit;
temporary_buffer<char> _buf;
future<temporary_buffer<char>> do_get() {
uint64_t size = std::min(_limit, _buf.size());
auto res = _buf.share(0, size);
_buf.trim_front(size);
return make_ready_future<temporary_buffer<char>>(std::move(res));
}
public:
limiting_data_source_impl(data_source&& src, size_t limit)
: _src(std::move(src))
, _limit(limit)
{}
limiting_data_source_impl::limiting_data_source_impl(data_source&& src, size_t limit) : _src(std::move(src)), _limit(limit) {
}
limiting_data_source_impl(limiting_data_source_impl&&) noexcept = default;
limiting_data_source_impl& operator=(limiting_data_source_impl&&) noexcept = default;
future<temporary_buffer<char>> limiting_data_source_impl::get() {
if (_buf.empty()) {
virtual future<temporary_buffer<char>> get() override {
if (_buf.empty()) {
_buf.release();
return _src.get().then([this] (auto&& buf) {
_buf = std::move(buf);
return do_get();
});
}
return do_get();
}
virtual future<temporary_buffer<char>> skip(uint64_t n) override {
if (n < _buf.size()) {
_buf.trim_front(n);
return do_get();
}
n -= _buf.size();
_buf.release();
return _src.get().then([this](auto&& buf) {
return _src.skip(n).then([this] (auto&& buf) {
_buf = std::move(buf);
return do_get();
});
}
return do_get();
}
future<temporary_buffer<char>> limiting_data_source_impl::skip(uint64_t n) {
if (n < _buf.size()) {
_buf.trim_front(n);
return do_get();
}
n -= _buf.size();
_buf.release();
return _src.skip(n).then([this](auto&& buf) {
_buf = std::move(buf);
return do_get();
});
}
};
data_source make_limiting_data_source(data_source&& src, size_t limit) {
return data_source{std::make_unique<limiting_data_source_impl>(std::move(src), limit)};

View File

@@ -8,25 +8,13 @@
#pragma once
#include <seastar/core/iostream.hh>
#include <seastar/core/temporary_buffer.hh>
#include <stddef.h>
namespace seastar {
class limiting_data_source_impl : public seastar::data_source_impl {
seastar::data_source _src;
size_t _limit;
seastar::temporary_buffer<char> _buf;
seastar::future<seastar::temporary_buffer<char>> do_get();
class data_source;
public:
limiting_data_source_impl(seastar::data_source&& src, size_t limit);
limiting_data_source_impl(limiting_data_source_impl&&) noexcept = default;
limiting_data_source_impl& operator=(limiting_data_source_impl&&) noexcept = default;
seastar::future<seastar::temporary_buffer<char>> get() override;
seastar::future<seastar::temporary_buffer<char>> skip(uint64_t n) override;
};
}
/// \brief Creates an data_source from another data_source but returns its data in chunks not bigger than a given limit
///

View File

@@ -271,21 +271,10 @@ future<std::tuple<tests::proc::process_fixture, int>> tests::proc::start_docker_
// arbitrary timeout of 120s for the server to make some output. Very generous.
// but since we (maybe) run docker, and might need to pull image, this can take
// some time if we're unlucky.
auto [f1, f2] = co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
for (auto* f : {&f1, &f2}) {
if (f->failed()) {
try {
f->get();
} catch (in_use&) {
retry = true;
p = std::current_exception();
} catch (...) {
if (!p) {
p = std::current_exception();
}
}
}
}
co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
} catch (in_use&) {
retry = true;
p = std::current_exception();
} catch (...) {
p = std::current_exception();
}

View File

@@ -60,7 +60,6 @@ class ManagerClient:
self.con_gen = con_gen
self.ccluster: Optional[CassandraCluster] = None
self.cql: Optional[CassandraSession] = None
self.exclusive_clusters: List[CassandraCluster] = []
# A client for communicating with ScyllaClusterManager (server)
self.sock_path = sock_path
self.client_for_asyncio_loop = {asyncio.get_running_loop(): UnixRESTClient(sock_path)}
@@ -114,9 +113,6 @@ class ManagerClient:
def driver_close(self) -> None:
"""Disconnect from cluster"""
for cluster in self.exclusive_clusters:
cluster.shutdown()
self.exclusive_clusters.clear()
if self.ccluster is not None:
logger.debug("shutting down driver")
safe_driver_shutdown(self.ccluster)
@@ -138,12 +134,9 @@ class ManagerClient:
hosts = await wait_for_cql_and_get_hosts(cql, servers, time() + 60)
return cql, hosts
async def get_cql_exclusive(self, server: ServerInfo, auth_provider: Optional[AuthProvider] = None):
cluster = self.con_gen([server.ip_addr], self.port, self.use_ssl,
auth_provider if auth_provider else self.auth_provider,
WhiteListRoundRobinPolicy([server.ip_addr]))
self.exclusive_clusters.append(cluster)
cql = cluster.connect()
async def get_cql_exclusive(self, server: ServerInfo):
cql = self.con_gen([server.ip_addr], self.port, self.use_ssl, self.auth_provider,
WhiteListRoundRobinPolicy([server.ip_addr])).connect()
await wait_for_cql_and_get_hosts(cql, [server], time() + 60)
return cql

View File

@@ -1394,11 +1394,7 @@ class ScyllaCluster:
f"the test must drop all keyspaces it creates.")
for server in itertools.chain(self.running.values(), self.stopped.values()):
server.write_log_marker(f"------ Ending test {name} ------\n")
# Only close log files when the cluster is dirty (will be destroyed).
# If the cluster is clean and will be reused, keep the log file open
# so that write_log_marker() and take_log_savepoint() work in the
# next test's before_test().
if self.is_dirty and not server.log_file.closed:
if not server.log_file.closed:
server.log_file.close()
async def server_stop(self, server_id: ServerNum, gracefully: bool) -> None:

View File

@@ -56,25 +56,15 @@ def unique_name(unique_name_prefix = 'test_'):
async def wait_for(
pred: Callable[[], Awaitable[Optional[T]]],
deadline: float,
period: float = 0.1,
period: float = 1,
before_retry: Optional[Callable[[], Any]] = None,
backoff_factor: float = 1.5,
max_period: float = 1.0,
label: Optional[str] = None) -> T:
tag = label or getattr(pred, '__name__', 'unlabeled')
start = time.time()
retries = 0
backoff_factor: float = 1,
max_period: float = None) -> T:
while True:
elapsed = time.time() - start
assert time.time() < deadline, \
f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
assert(time.time() < deadline), "Deadline exceeded, failing test."
res = await pred()
if res is not None:
if retries > 0:
logger.debug(f"wait_for({tag}) completed "
f"in {elapsed:.2f}s ({retries} retries)")
return res
retries += 1
await asyncio.sleep(period)
period *= backoff_factor
if max_period is not None:
@@ -283,14 +273,14 @@ async def wait_for_view_v1(cql: Session, name: str, node_count: int, timeout: in
done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
return done[0][0] == node_count or None
deadline = time.time() + timeout
await wait_for(view_is_built, deadline, label=f"view_v1_{name}")
await wait_for(view_is_built, deadline)
async def wait_for_view(cql: Session, name: str, node_count: int, timeout: int = 120):
async def view_is_built():
done = await cql.run_async(f"SELECT COUNT(*) FROM system.view_build_status_v2 WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
return done[0][0] == node_count or None
deadline = time.time() + timeout
await wait_for(view_is_built, deadline, label=f"view_{name}")
await wait_for(view_is_built, deadline)
async def wait_for_first_completed(coros: list[Coroutine], timeout: int|None = None):

View File

@@ -4,7 +4,7 @@
"""
GDB helper functions for `scylla_gdb` tests.
They should be loaded to GDB by "-x {dir}/gdb_utils.py}",
when loaded, they can be run in gdb e.g. `$get_sstables()`
when loaded, they can be run in gdb e.g. `python get_sstables()`
Depends on helper functions injected to GDB by `scylla-gdb.py` script.
(sharded, for_each_table, seastar_lw_shared_ptr, find_sstables, find_vptrs, resolve,
@@ -15,65 +15,39 @@ import gdb
import uuid
class get_schema(gdb.Function):
"""Finds and returns a schema pointer."""
def __init__(self):
super(get_schema, self).__init__('get_schema')
def invoke(self):
db = sharded(gdb.parse_and_eval('::debug::the_database')).local()
table = next(for_each_table(db))
return seastar_lw_shared_ptr(table['_schema']).get()
def get_schema():
"""Execute GDB commands to get schema information."""
db = sharded(gdb.parse_and_eval('::debug::the_database')).local()
table = next(for_each_table(db))
ptr = seastar_lw_shared_ptr(table['_schema']).get()
print('schema=', ptr)
class get_sstable(gdb.Function):
"""Finds and returns an sstable pointer."""
def __init__(self):
super(get_sstable, self).__init__('get_sstable')
def invoke(self):
return next(find_sstables())
def get_sstables():
"""Execute GDB commands to get sstables information."""
sst = next(find_sstables())
print(f"sst=(sstables::sstable *)", sst)
class get_task(gdb.Function):
def get_task():
"""
Finds and returns a Scylla fiber task.
Some commands need a task to work on. The following fixture finds one.
Because we stopped Scylla while it was idle, we don't expect to find
any ready task with get_local_tasks(), but we can find one with a
find_vptrs() loop. I noticed that a nice one (with multiple tasks chained
to it for "scylla fiber") is one from http_server::do_accept_one.
"""
def __init__(self):
super(get_task, self).__init__('get_task')
def invoke(self):
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr, startswith='vtable for seastar::continuation')
if name and 'do_accept_one' in name:
return obj_addr.cast(gdb.lookup_type('uintptr_t'))
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr, startswith='vtable for seastar::continuation')
if name and 'do_accept_one' in name:
print(f"task={obj_addr.cast(gdb.lookup_type('uintptr_t'))}")
break
class get_coroutine(gdb.Function):
"""
Finds and returns a coroutine frame.
Prints COROUTINE_NOT_FOUND if the coroutine is not present.
"""
def __init__(self):
super(get_coroutine, self).__init__('get_coroutine')
def invoke(self):
target = 'service::topology_coordinator::run() [clone .resume]'
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr)
if name and name.strip() == target:
return obj_addr.cast(gdb.lookup_type('uintptr_t'))
print("COROUTINE_NOT_FOUND")
# Register the functions in GDB
get_schema()
get_sstable()
get_task()
get_coroutine()
def get_coroutine():
"""Similar to get_task(), but looks for a coroutine frame."""
target = 'service::topology_coordinator::run() [clone .resume]'
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr)
if name and name.strip() == target:
print(f"coroutine_config={obj_addr.cast(gdb.lookup_type('uintptr_t'))}")

View File

@@ -7,6 +7,7 @@ Each only checks that the command does not fail - but not what it does or return
"""
import pytest
import re
from test.scylla_gdb.conftest import execute_gdb_command
@@ -22,6 +23,20 @@ pytestmark = [
),
]
@pytest.fixture(scope="module")
def schema(gdb_cmd):
"""
Returns pointer to schema of the first table it finds
Even without any user tables, we will always have system tables.
"""
result = execute_gdb_command(gdb_cmd, full_command="python get_schema()").stdout
match = re.search(r"schema=\s*(0x[0-9a-fA-F]+)", result)
schema_pointer = match.group(1) if match else None
return schema_pointer
@pytest.mark.parametrize(
"command",
[
@@ -30,17 +45,21 @@ pytestmark = [
"schema (const schema *)", # `schema` requires type-casted pointer
],
)
def test_schema(gdb_cmd, command):
result = execute_gdb_command(gdb_cmd, f"{command} $get_schema()")
def test_schema(gdb_cmd, command, schema):
assert schema, "Failed to find schema of any table"
result = execute_gdb_command(gdb_cmd, f"{command} {schema}")
assert result.returncode == 0, (
f"GDB command {command} failed. stdout: {result.stdout} stderr: {result.stderr}"
)
def test_generate_object_graph(gdb_cmd, request):
def test_generate_object_graph(gdb_cmd, schema, request):
assert schema, "Failed to find schema of any table"
tmpdir = request.config.getoption("--tmpdir")
result = execute_gdb_command(
gdb_cmd, f"generate-object-graph -o {tmpdir}/og.dot -d 2 -t 10 $get_schema()"
gdb_cmd, f"generate-object-graph -o {tmpdir}/og.dot -d 2 -t 10 {schema}"
)
assert result.returncode == 0, (
f"GDB command `generate-object-graph` failed. stdout: {result.stdout} stderr: {result.stderr}"

View File

@@ -7,6 +7,7 @@ Each only checks that the command does not fail - but not what it does or return
"""
import pytest
import re
from test.scylla_gdb.conftest import execute_gdb_command
@@ -23,6 +24,16 @@ pytestmark = [
]
@pytest.fixture(scope="module")
def sstable(gdb_cmd):
"""Finds sstable"""
result = execute_gdb_command(gdb_cmd, full_command="python get_sstables()").stdout
match = re.search(r"(\(sstables::sstable \*\) 0x)([0-9a-f]+)", result)
sstable_pointer = match.group(0).strip() if match else None
return sstable_pointer
@pytest.mark.parametrize(
"command",
[
@@ -30,8 +41,10 @@ pytestmark = [
"sstable-index-cache",
],
)
def test_sstable(gdb_cmd, command):
result = execute_gdb_command(gdb_cmd, f"{command} $get_sstable()")
def test_sstable(gdb_cmd, command, sstable):
assert sstable, "No sstable was found"
result = execute_gdb_command(gdb_cmd, f"{command} {sstable}")
assert result.returncode == 0, (
f"GDB command {command} failed. stdout: {result.stdout} stderr: {result.stderr}"
)

View File

@@ -6,6 +6,8 @@ Tests for commands, that need a some task to work on.
Each only checks that the command does not fail - but not what it does or returns.
"""
import re
import pytest
from test.scylla_gdb.conftest import execute_gdb_command
@@ -23,25 +25,59 @@ pytestmark = [
]
def test_coroutine_frame(gdb_cmd):
@pytest.fixture(scope="module")
def task(gdb_cmd):
"""
Finds a Scylla fiber task using a `find_vptrs()` loop.
Since Scylla is freshbooted, `get_local_tasks()` returns nothing.
Nevertheless, a `find_vptrs()` scan can still discover the first task
skeleton created by `http_server::do_accept_one` (often the earliest
“Scylla fiber” to appear).
"""
result = execute_gdb_command(gdb_cmd, full_command="python get_task()").stdout
match = re.search(r"task=(\d+)", result)
task = match.group(1) if match else None
return task
@pytest.fixture(scope="module")
def coroutine_task(gdb_cmd, scylla_server):
"""
Finds a coroutine task, similar to the `task` fixture.
This fixture executes the `coroutine_config` script in GDB to locate a
specific coroutine task.
"""
result = execute_gdb_command(gdb_cmd, full_command="python get_coroutine()").stdout
match = re.search(r"coroutine_config=\s*(.*)", result)
if not match:
# See https://github.com/scylladb/scylladb/issues/22501
pytest.skip("Failed to find coroutine task. Skipping test.")
return match.group(1).strip()
def test_coroutine_frame(gdb_cmd, coroutine_task):
"""
Offsets the pointer by two words to shift from the outer coroutine frame
to the inner `seastar::task`, as required by `$coro_frame`, which expects
a `seastar::task*`.
"""
assert coroutine_task, "No coroutine task was found"
result = execute_gdb_command(
gdb_cmd, full_command="p *$coro_frame($get_coroutine() + 16)"
gdb_cmd, full_command=f"p *$coro_frame({coroutine_task} + 16)"
)
if "COROUTINE_NOT_FOUND" in result.stdout:
# See https://github.com/scylladb/scylladb/issues/22501
pytest.skip("Failed to find coroutine task. Skipping test.")
assert result.returncode == 0, (
f"GDB command `coro_frame` failed. stdout: {result.stdout} stderr: {result.stderr}"
)
def test_fiber(gdb_cmd):
result = execute_gdb_command(gdb_cmd, "fiber $get_task()")
def test_fiber(gdb_cmd, task):
assert task, f"No task was found using `find_vptrs()`"
result = execute_gdb_command(gdb_cmd, f"fiber {task}")
assert result.returncode == 0, (
f"GDB command `fiber` failed. stdout: {result.stdout} stderr: {result.stderr}"
)

View File

@@ -10,7 +10,6 @@
#include <seastar/core/seastar.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/closeable.hh>
#include "init.hh"
#include "supervisor.hh"
#include "directories.hh"