Compare commits

...

19 Commits

Author SHA1 Message Date
Benny Halevy
211eb7c32b test/cluster/dtest: wide_rows_test.py: use prepared statements
Replace string-formatted CQL queries in loops with prepared
statements and bind parameters. This avoids repeated query parsing
on the server side and eliminates CQL injection risk from string
interpolation.

Functions converted:
- test_column_index_stress: INSERT (100k iterations) and SELECT (10k)
- create_large_partition_data: UPDATE with TIMESTAMP
- create_large_row_data: UPDATE per column
- create_too_many_rows_data: UPDATE for columns and collections
- delete_too_many_rows_data: DELETE for columns and collections
- create_large_row_static_data: INSERT
- set_ttl_on_few_rows_in_partition: SELECT and UPDATE with TTL
- set_ttl_on_few_large_rows: SELECT and UPDATE with TTL
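
The conversion follows the standard python-driver pattern; a minimal sketch, assuming a connected session and illustrative table/column names (not the test's actual schema):

  # before: the query string is rebuilt and re-parsed by the server on each iteration
  for i in range(100000):
      session.execute(f"INSERT INTO ks.wide (pk, ck, v) VALUES (0, {i}, '{value}')")

  # after: parse once, bind parameters per execution
  insert = session.prepare("INSERT INTO ks.wide (pk, ck, v) VALUES (0, ?, ?)")
  for i in range(100000):
      session.execute(insert, (i, value))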
2026-04-28 11:40:16 +03:00
Benny Halevy
3a640a9ff0 test/cluster/dtest: wide_rows_test.py: randomize compaction strategy
Replace parametrized compaction strategy (4 strategies × 31 tests =
124 test cases) with random selection per test. This reduces the
test count to 31 while still covering all strategies over time.

Add --compaction-strategy option to allow reproducing failures with
a specific strategy, e.g.:
  ./test.py --mode=dev test/cluster/dtest/wide_rows_test.py \
    --pytest-arg="--compaction-strategy=LeveledCompactionStrategy"
2026-04-28 11:40:16 +03:00
Benny Halevy
8288f87beb test/cluster/dtest: wide_rows_test.py: reduce TTL sleep time
Reduce the TTL from 60 seconds to 1 second and the sleep time from ttl+5 to ttl+1
in set_ttl_on_few_rows_in_partition() and set_ttl_on_few_large_rows().
The original 60-second TTL was unnecessarily high, adding over a
minute of idle wait time per TTL test invocation.
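
Schematically (update_with_ttl, pk and session stand in for the test's prepared statement and bindings):

  import time

  TTL = 1                                      # was 60 seconds
  session.execute(update_with_ttl, [TTL, pk])  # UPDATE ... USING TTL ? ... WHERE pk = ?
  time.sleep(TTL + 1)                          # was TTL + 5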
2026-04-28 11:40:16 +03:00
Benny Halevy
77c03354f4 test/cluster/dtest: wide_rows_test.py: fix key_appearance accumulation
In validate_entities_recognized_as_large(), key_appearance was
overwritten on each loop iteration instead of being accumulated.
This meant that for entity_type == "cell" in multi-node clusters,
entities_count only reflected the last node's count rather than
the total across all nodes. Fix by using += to accumulate.

Update expected_entity_number in test_large_cell_in_materialized_view
to account for RF=3 replication (each cell appears on all 3 nodes).

Bug inherited from scylla-dtest.
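
A simplified sketch of the fix (count_large_entities() stands in for the per-node count the test actually computes):

  key_appearance = 0
  for node in cluster.nodelist():
      # before: key_appearance = count_large_entities(node)  (kept only the last node's count)
      key_appearance += count_large_entities(node)            # after: accumulate across nodes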
2026-04-28 11:40:16 +03:00
Benny Halevy
49df9242f7 test/cluster/dtest: wide_rows_test.py: scope compact to test keyspace/table
Pass KEYSPACE_NAME and TABLE_NAME to cluster.compact() instead of
compacting all keyspaces. This avoids unnecessary compaction of
system tables, making tests faster.

Also convert remaining nodetool("compact ...") calls to use
cluster.compact() for consistency.
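
Call sites change roughly as follows, KEYSPACE_NAME and TABLE_NAME being the test module's constants:

  # before: compacts every keyspace, including system tables
  self.cluster.compact()
  # after: scoped to the table under test
  self.cluster.compact(keyspace=KEYSPACE_NAME, tables=[TABLE_NAME])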
2026-04-28 11:40:16 +03:00
Benny Halevy
8c82c6646b test/cluster/dtest: wide_rows_test.py: fix expect_warning mutation across nodes
In validate_log_warnings(), expect_warning was reassigned inside the
per-node loop, so if the first node set it to False (due to no
sstables on disk), all subsequent nodes would inherit that value
regardless of their own state.

Use a local variable (node_expect_warning) instead of mutating the
function parameter.
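
A simplified sketch (has_sstables_on_disk() and check_node_warnings() are stand-ins for the real checks):

  def validate_log_warnings(self, expect_warning):
      for node in self.cluster.nodelist():
          # before: expect_warning was reassigned here, leaking into later iterations
          node_expect_warning = expect_warning and has_sstables_on_disk(node)
          self.check_node_warnings(node, node_expect_warning)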
2026-04-28 11:40:16 +03:00
Benny Halevy
d76d0b8a16 test/cluster/dtest: wide_rows_test.py: remove dead code
Remove validation_small_entity() and get_large_entity_info() methods.
These are not called by any test in the migrated file.
get_large_entity_info() also had a bug where the CQL query used
escaped braces ({{keyspace_name}}) instead of actual parameter
substitution, so it would have queried for literal '{keyspace_name}'.
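
The braces issue is the standard f-string escaping rule: doubled braces are emitted literally rather than substituted, so the query filtered on the literal string. For illustration:

  keyspace_name = "ks1"
  broken = f"... WHERE keyspace_name = '{{keyspace_name}}'"
  fixed  = f"... WHERE keyspace_name = '{keyspace_name}'"
  print(broken)  # ... WHERE keyspace_name = '{keyspace_name}'   (literal braces)
  print(fixed)   # ... WHERE keyspace_name = 'ks1'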
2026-04-28 11:40:16 +03:00
Benny Halevy
690672e4cb test/cluster/dtest: wide_rows_test.py: cosmetic cleanups
Fix typos: aproximately, quering, colection, table_nam, the the.
Fix grammar: 'verify the they didn't recognized as large'.
Use idiomatic 'not in' instead of 'not x in'.
Remove unused variable assignment and commented-out debug line.
Remove unnecessary f-string prefix.
Fix '/n' to use actual newline in error message formatting.
Fix extra trailing quotes in exception messages.
Remove redundant variable assignment (maximum_primary_key_value).
2026-04-28 11:40:13 +03:00
Benny Halevy
85079d7c7a test/cluster/dtest: migrate wide_rows_test.py from scylla-dtest
Adapt wide_rows_test.py to work with the in-tree cluster test
framework:
- Replace dtest imports with in-tree equivalents
- Replace self.cluster.flush() + self.cluster.wait_for_compactions()
  with self.cluster.compact() since nodetool compact handles flush
  and waiting internally
- Add inline wait_for_view() helper (replaces async version; sketched below)
- Replace node.status with is_running() check
- Add copyright header

Remove from skip_in_dev now that all tests pass.
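
Conceptually the helper polls until the view is reported as built; one plausible shape (hypothetical sketch, the in-tree helper's query and timeout may differ):

  import time

  def wait_for_view(session, keyspace, view, timeout=120):
      deadline = time.time() + timeout
      while time.time() < deadline:
          built = session.execute(
              "SELECT view_name FROM system.built_views "
              "WHERE keyspace_name = %s AND view_name = %s",
              (keyspace, view)).one()
          if built:
              return
          time.sleep(1)
      raise TimeoutError(f"view {keyspace}.{view} not built within {timeout}s")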
2026-04-28 11:39:47 +03:00
Benny Halevy
70f8fcbe67 test/cluster/dtest: cache ScyllaNode hostid
Cache the host ID in ScyllaNode._hostid so that hostid() returns
the cached value when the node is stopped.  Without this,
watch_log_for_death() fails with a timeout because it tries to
query the stopped node's API to get its host ID for the log
pattern match.
2026-04-28 11:36:08 +03:00
Benny Halevy
1d6403ddad test/cluster/dtest: add ScyllaCluster.compact() method
Add compact() method to ScyllaCluster, delegating to
ScyllaNode.compact() on each running node. Accepts optional
keyspace and tables parameters to allow scoping compaction to
specific keyspaces/tables.

Also fix ScyllaNode.compact() to use list[str] for tables
parameter and extend() instead of +=, so that passing a single
table name as a string does not iterate over its characters.
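
The list/str pitfall referred to above, in isolation:

  cmd = ["compact", "ks"]
  cmd += "mytable"           # += iterates the string: ['compact', 'ks', 'm', 'y', 't', ...]

  cmd = ["compact", "ks"]
  cmd.extend(["mytable"])    # with a proper list: ['compact', 'ks', 'mytable']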
2026-04-28 11:36:08 +03:00
Benny Halevy
9a24be2fe9 test/cluster/dtest: add assertion helpers for wide_rows_test
Add assert_equal_more_with_deviation() and assert_less_equal_lists()
to tools/assertions.py.  These are needed by the wide_rows_test.py
migration from scylla-dtest.
2026-04-28 11:36:08 +03:00
Benny Halevy
5c93ccb6d8 test/cluster/dtest: copy wide_rows_test.py verbatim from scylla-dtest
Copy wide_rows_test.py as-is from scylla-dtest. The test is added
to run_in_dev but also skip_in_dev in test_config.yaml since it
requires functional changes to work with the in-tree test
framework. The next commit will make the necessary changes and
remove it from skip_in_dev.
2026-04-28 11:36:08 +03:00
Anna Mikhlin
86472e43e1 Update ScyllaDB version to: 2026.3.0-dev 2026-04-26 15:30:13 +03:00
Piotr Szymaniak
d5efd1f676 test/cluster: wait for Alternator readiness in server startup
server_add() only waits for CQL readiness before returning. The
Alternator HTTP port may not be listening yet, causing
ConnectionRefused errors in Alternator tests.

Extend the ServerUpState enum and the startup loop to also check Alternator
port readiness when configured. Whenever one or more Alternator ports are
configured, each is verified to be connectable and queryable, similar to how
the CQL port is probed.

Fixes SCYLLADB-1701

Closes scylladb/scylladb#29625
2026-04-25 16:35:44 +03:00
Piotr Smaron
d14d07a079 test: fix flaky test_sstable_write_large_{row,cell} by using a fixed partition key
Commit ce00d61917 ("db: implement large_data virtual tables with feature
flag gating") changed these two tests to construct their mutation with
a randomly generated partition key (simple_schema::make_pkey()) instead
of the previously fixed pk "pv", with the comment that this avoids a
"Failed to generate sharding metadata" error.

simple_schema::make_pkey() delegates to tests::generate_partition_key(),
which defaults to key_size{1, 128}, i.e. the partition key length is
uniformly random in [1, 128] bytes. That interacts badly with the fact
that both tests pick thresholds at exact byte boundaries of the MC
sstable row encoding:

  - The large-data handler records a row's size as
      _data_writer->offset() - current_pos
    (sstables/mx/writer.cc: collect_row_stats()), i.e. the number of
    bytes the row took on disk.
  - For the first clustering row, the body includes a vint-encoded
    prev_row_size = pos - _prev_row_start.
  - _prev_row_start is captured at the start of the partition
    (consume_new_partition()) before the partition key is written to
    the data stream, so prev_row_size rolls in the partition key's
    serialized length (2-byte prefix + pk bytes) + deletion_time +
    static row size.

A random-size partition key therefore perturbs the first clustering
row's encoded size by 1-2 bytes across runs (the vint of prev_row_size
crosses the 128 boundary), flipping the test's byte-exact threshold
comparison. On seed 2104744000 this produced:

  critical check row_size_count == expected.size() has failed [3 != 2]
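
To make the boundary effect concrete: an MC-format unsigned vint carries 7 value bits per encoded byte, so a prev_row_size below 128 fits in one byte and one at or above 128 needs two. A rough illustration of the size rule (not the exact encoder):

  def unsigned_vint_len(value: int) -> int:
      # a k-byte unsigned vint carries 7*k value bits
      k = 1
      while value >= 1 << (7 * k):
          k += 1
      return k

  print(unsigned_vint_len(127))  # 1
  print(unsigned_vint_len(128))  # 2 -> the first clustering row grows by one byte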

Fix the two byte-exact-sensitive tests by reverting their partition key
to the fixed s.new_mutation("pv") used before ce00d61917. Under smp=1
(which these tests run with, per -c1 in the test invocation) a fixed
key is always shard-local, so no sharding-metadata issue arises here.

The other tests modified by ce00d61917 (test_sstable_log_too_many_rows,
test_sstable_log_too_many_dead_rows, test_sstable_too_many_collection_elements,
test_large_data_records_round_trip, etc.) assert on row/element counts
or use thresholds with enough slack that the partition key size does
not matter, and are left unchanged.

Add an explanatory comment to each fixed site so the pitfall is not
re-introduced by a future refactor.

Verified stable with:
  ./test.py --mode=dev     test/boost/sstable_3_x_test.cc::test_sstable_write_large_row  --repeat 100 --max-failures 1
  ./test.py --mode=dev     test/boost/sstable_3_x_test.cc::test_sstable_write_large_cell --repeat 100 --max-failures 1
  ./test.py --mode=release test/boost/sstable_3_x_test.cc::test_sstable_write_large_row  --repeat 100 --max-failures 1
  ./test.py --mode=release test/boost/sstable_3_x_test.cc::test_sstable_write_large_cell --repeat 100 --max-failures 1

All four invocations: 100/100 passed.

Fixes: SCYLLADB-1685

Closes scylladb/scylladb#29621
2026-04-25 16:32:02 +03:00
Botond Dénes
70261dc674 Merge 'test/cluster: scale failure_detector_timeout_in_ms by build mode' from Marcin Maliszkiewicz
The failure_detector_timeout_in_ms override of 2000ms in 6 cluster test files is too aggressive for debug/sanitize builds. During node joins, the coordinator's failure detector times out on RPC pings to the joining node while it is still applying schema snapshots, marks it DOWN, and bans it — causing flaky test failures.

Scale the timeout by MODES_TIMEOUT_FACTOR (3x for debug/sanitize, 2x for dev, 1x for release) via a shared failure_detector_timeout fixture in conftest.py.

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-1587
Backport: no, elasticsearch analyser shows only a single failure

Closes scylladb/scylladb#29522

* github.com:scylladb/scylladb:
  test/cluster: scale failure_detector_timeout_in_ms by build mode
  test/cluster: add failure_detector_timeout fixture
2026-04-24 09:10:43 +03:00
Marcin Maliszkiewicz
e414b2b0b9 test/cluster: scale failure_detector_timeout_in_ms by build mode
Six cluster test files override failure_detector_timeout_in_ms to 2000ms
for faster failure detection. In debug and sanitize builds, this causes
flaky node join failures. The following log analysis shows how.

The coordinator (server 614, IP 127.2.115.3) accepts the joining node
(server 615, host_id 53b01f0b, IP 127.2.115.2) into group0:

  20:10:57,049 [shard 0] raft_group0 - server 614 entered
    'join group0' transition state for 53b01f0b

The joining node begins receiving the raft snapshot 100ms later:

  20:10:57,150 [shard 0] raft_group0 - transfer snapshot from 9fa48539

It then spends ~280ms applying schema changes -- creating 6 keyspaces
and 12+ tables from the snapshot:

  20:10:57,511 [shard 0] migration_manager - Creating keyspace
    system_auth_v2
  ...
  20:10:57,788 [shard 0] migration_manager - Creating
    system_auth_v2.role_members

Meanwhile, the coordinator's failure detector pings the joining node.
Under debug+ASan load the RPC call times out after ~4.6 seconds:

  20:11:01,643 [shard 0] direct_failure_detector - unexpected exception
    when pinging 53b01f0b: seastar::rpc::timeout_error
    (rpc call timed out)

25ms later, the coordinator marks the joining node DOWN and removes it:

  20:11:01,668 [shard 0] raft_group0 - failure_detector_loop:
    Mark node 53b01f0b as DOWN
  20:11:01,717 [shard 0] raft_group0 - bootstrap: failed to accept
    53b01f0b

The joining node was still retrying the snapshot transfer at that point:

  20:11:01,745 [shard 0] raft_group0 - transfer snapshot from 9fa48539

It then receives the ban notification and aborts:

  20:11:01,844 [shard 0] raft_group0 - received notification of being
    banned from the cluster

Replace the hardcoded 2000ms with the failure_detector_timeout fixture
from conftest.py, which scales by MODES_TIMEOUT_FACTOR: 3x for
debug/sanitize (6000ms), 2x for dev (4000ms), 1x for release (2000ms).

Test measurements (before -> after fix):

  debug mode:
  test_replace_with_same_ip_twice           24.02s ->  25.02s
  test_banned_node_notification            217.22s -> 221.72s
  test_kill_coordinator_during_op          116.11s -> 127.13s
  test_node_failure_during_tablet_migration
    [streaming-source]                     183.25s -> 192.69s
  test_replace (4 tests)        skipped in debug (skip_in_debug)
  test_raft_replace_ignore_nodes  skipped in debug (run_in_dev only)

  dev mode:
  test_replace_different_ip                 10.51s ->  11.50s
  test_replace_different_ip_using_host_id   10.01s ->  12.01s
  test_replace_reuse_ip                     10.51s ->  12.03s
  test_replace_reuse_ip_using_host_id       13.01s ->  12.01s
  test_raft_replace_ignore_nodes            19.52s ->  19.52s
2026-04-20 15:28:34 +02:00
Marcin Maliszkiewicz
99ac36b353 test/cluster: add failure_detector_timeout fixture
Add a shared pytest fixture that scales the failure detector timeout
by build mode factor (e.g. 3x for debug/sanitize, 2x for dev).
2026-04-20 15:28:33 +02:00
18 changed files with 1634 additions and 52 deletions

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=2026.2.0-dev
VERSION=2026.3.0-dev
if test -f version
then

View File

@@ -5171,8 +5171,16 @@ static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, r
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation partition(s.schema(), s.make_pkey());
// Use a fixed partition key. The row-size thresholds below are chosen at exact
// byte boundaries of the MC sstable row encoding: the first clustering row body
// encodes prev_row_size as a vint, and prev_row_size includes the partition
// header (which contains the partition key's serialized length+bytes). A
// random-size partition key (as produced by simple_schema::make_pkey() /
// tests::generate_partition_key(), which default to key_size{1,128}) would
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
// comparison, making this test flaky. Under smp=1 (which this test runs with),
// a fixed key is always shard-local, so no sharding-metadata issue arises.
mutation partition = s.new_mutation("pv");
const partition_key& pk = partition.key();
s.add_static_row(partition, "foo bar zed");
@@ -5244,8 +5252,16 @@ static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit,
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation partition(s.schema(), s.make_pkey());
// Use a fixed partition key. The cell-size thresholds below are chosen at exact
// byte boundaries of the MC sstable row encoding: the first clustering row body
// encodes prev_row_size as a vint, and prev_row_size includes the partition
// header (which contains the partition key's serialized length+bytes). A
// random-size partition key (as produced by simple_schema::make_pkey() /
// tests::generate_partition_key(), which default to key_size{1,128}) would
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
// comparison, making this test flaky. Under smp=1 (which this test runs with),
// a fixed key is always shard-local, so no sharding-metadata issue arises.
mutation partition = s.new_mutation("pv");
const partition_key& pk = partition.key();
s.add_static_row(partition, "foo bar zed");
@@ -5264,7 +5280,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
sstring sv;
for (auto idx = 0; idx < rows - 1; idx++) {
@@ -5326,7 +5341,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
sstring sv;
int live_rows = 0;
@@ -5436,7 +5450,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_dead_rows) {
static void test_sstable_too_many_collection_elements_f(int elements, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s(simple_schema::with_static::no, simple_schema::with_collection::yes);
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
std::map<bytes, bytes> kv_map;
for (auto i = 0; i < elements; i++) {
@@ -5512,7 +5525,6 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_round_trip) {
// Create a mutation with a clustering row whose serialized cell value
// exceeds the 1-byte thresholds, so partition_size, row_size, and
// cell_size records are all generated.
// Use make_pkey() (no argument) to generate a key on this shard.
auto pk = ss.make_pkey();
mutation m(s, pk);
auto ck = ss.make_ckey("ck1");
@@ -5622,7 +5634,6 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_top_n_bounded) {
// Create 6 partitions, each with one row of increasing size.
// Since each partition has exactly one row, we get 6 row_size records
// competing for 3 slots.
// Use make_pkeys() to generate shard-local keys.
auto pkeys = ss.make_pkeys(6);
utils::chunked_vector<mutation> muts;
for (int i = 0; i < 6; i++) {

View File

@@ -18,7 +18,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing import Event
from pathlib import Path
from typing import TYPE_CHECKING
from test import TOP_SRC_DIR, path_to
from test import TOP_SRC_DIR, MODES_TIMEOUT_FACTOR, path_to
from test.pylib.random_tables import RandomTables
from test.pylib.skip_types import skip_env
from test.pylib.util import unique_name
@@ -394,3 +394,8 @@ async def key_provider(request, tmpdir, scylla_binary):
"""Encryption providers fixture"""
async with make_key_provider_factory(request.param, tmpdir, scylla_binary) as res:
yield res
@pytest.fixture(scope="function")
def failure_detector_timeout(build_mode):
return 2000 * MODES_TIMEOUT_FACTOR[build_mode]

View File

@@ -227,6 +227,11 @@ class ScyllaCluster:
def flush(self) -> None:
self.nodetool("flush")
def compact(self, keyspace: str = "", tables: list[str] | None = None) -> None:
for node in self.nodelist():
if node.is_running():
node.compact(keyspace=keyspace, tables=tables)
@staticmethod
def debug(message: str) -> None:
logger.debug(message)

View File

@@ -111,6 +111,7 @@ class ScyllaNode:
self.data_center = server.datacenter
self.rack = server.rack
self._hostid = None
self._smp_set_during_test = None
self._smp = None
self._memory = None
@@ -465,6 +466,9 @@ class ScyllaNode:
if wait_for_binary_proto:
self.wait_for_binary_interface(from_mark=self.mark)
if not self._hostid:
self.hostid()
if wait_other_notice:
timeout = self.cluster.default_wait_other_notice_timeout
for node, mark in marks:
@@ -647,11 +651,12 @@ class ScyllaNode:
cmd.append(table)
self.nodetool(" ".join(cmd), **kwargs)
def compact(self, keyspace: str = "", tables: str | None = ()) -> None:
def compact(self, keyspace: str = "", tables: list[str] | None = None) -> None:
compact_cmd = ["compact"]
if keyspace:
compact_cmd.append(keyspace)
compact_cmd += tables
if tables:
compact_cmd.extend(tables)
self.nodetool(" ".join(compact_cmd))
def drain(self, block_on_log: bool = False) -> None:
@@ -824,10 +829,13 @@ class ScyllaNode:
assert timeout is None, "argument `timeout` is not supported" # not used in scylla-dtest
assert force_refresh is None, "argument `force_refresh` is not supported" # not used in scylla-dtest
try:
return self.cluster.manager.get_host_id(server_id=self.server_id)
except Exception as exc:
self.error(f"Failed to get hostid: {exc}")
if not self._hostid:
try:
self._hostid = self.cluster.manager.get_host_id(server_id=self.server_id)
except Exception as exc:
self.error(f"Failed to get hostid: {exc}")
return self._hostid
def rmtree(self, path: str | Path) -> None:
"""Delete a directory content without removing the directory.

View File

@@ -34,6 +34,7 @@ def pytest_addoption(parser: Parser) -> None:
parser.addoption("--experimental-features", type=lambda s: s.split(","), action="store", help="Pass experimental features <feature>,<feature> to enable", default=None)
parser.addoption("--tablets", action=argparse.BooleanOptionalAction, default=False, help="Whether to enable tablets support (default: %(default)s)")
parser.addoption("--force-gossip-topology-changes", action="store_true", default=False, help="force gossip topology changes in a fresh cluster")
parser.addoption("--compaction-strategy", action="store", default=None, help="Compaction strategy to use in tests that support it (e.g. wide_rows_test.py). One of LeveledCompactionStrategy, SizeTieredCompactionStrategy, TimeWindowCompactionStrategy, or IncrementalCompactionStrategy. If not set, a random strategy is chosen per test.")
def pytest_configure(config: Config) -> None:

View File

@@ -263,3 +263,25 @@ def assert_lists_equal_ignoring_order(list1, list2, sort_key=None):
sorted_list2 = sorted(normalized_list2, key=lambda elm: str(elm[sort_key]))
assert sorted_list1 == sorted_list2
def assert_equal_more_with_deviation(actual, expect, deviation_perc):
"""
Assert that actual is within the inclusive interval [expect, expect * (1 + deviation_perc/100)]
@param actual Value inspected
@param expect Lower bound of the expected interval
@param deviation_perc Allowed increase over expect, in percent
"""
deviation_high = (expect * (100 + deviation_perc)) / 100
assert expect <= actual <= deviation_high, f"Expect result interval {expect}..{deviation_high}, received {actual}"
def assert_less_equal_lists(actual_list, expected_list, msg=None):
"""
Assert that actual_list is a subset of expected_list, reporting either a default or a caller-supplied error message
@param actual_list Inspected list
@param expected_list List that is supposed to include actual_list
@param msg Optional custom error message (default: None)
"""
standardMsg = msg or f"{actual_list} not less than or equal to {expected_list}"
assert set(actual_list) <= set(expected_list), standardMsg

File diff suppressed because it is too large

View File

@@ -48,5 +48,6 @@ run_in_dev:
- dtest/commitlog_test
- dtest/cfid_test
- dtest/rebuild_test
- dtest/wide_rows_test
run_in_debug:
- random_failures/test_random_failures

View File

@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
""" Kill coordinator with error injection while topology operation is running for cluster: decommission,
bootstrap, removenode, replace.
@@ -41,7 +41,7 @@ async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
"""
# Decrease the failure detector threshold so we don't have to wait for too long.
config = {
'failure_detector_timeout_in_ms': 2000
'failure_detector_timeout_in_ms': failure_detector_timeout
}
cmdline = [
'--logger-log-level', 'raft_topology=trace',

View File

@@ -22,11 +22,11 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.nightly
async def test_banned_node_notification(manager: ManagerClient) -> None:
async def test_banned_node_notification(manager: ManagerClient, failure_detector_timeout) -> None:
"""Test that a node banned from the cluster get notification about been banned"""
# Decrease the failure detector threshold so we don't have to wait for too long.
config = {
'failure_detector_timeout_in_ms': 2000
'failure_detector_timeout_in_ms': failure_detector_timeout
}
srvs = await manager.servers_add(3, config=config, auto_rack_dc="dc")
cql = manager.get_cql()

View File

@@ -60,14 +60,14 @@ async def make_servers(manager: ManagerClient, servers_num: int,
@pytest.mark.asyncio
async def test_raft_replace_ignore_nodes(manager: ManagerClient) -> None:
async def test_raft_replace_ignore_nodes(manager: ManagerClient, failure_detector_timeout) -> None:
"""Replace 3 dead nodes.
This is a slow test with a 7 node cluster and 3 replace operations,
we want to run it only in dev mode.
"""
logger.info("Booting initial cluster")
servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': 2000})
servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
s1_id = await manager.get_host_id(servers[1].server_id)
s2_id = await manager.get_host_id(servers[2].server_id)

View File

@@ -21,9 +21,9 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_replace_different_ip(manager: ManagerClient) -> None:
async def test_replace_different_ip(manager: ManagerClient, failure_detector_timeout) -> None:
"""Replace an existing node with new node using a different IP address"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
logger.info(f"cluster started, servers {servers}")
logger.info(f"replacing server {servers[0]}")
@@ -67,18 +67,18 @@ async def test_replace_different_ip(manager: ManagerClient) -> None:
logger.info(f"server {s} system.peers and gossiper state is valid")
@pytest.mark.asyncio
async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> None:
async def test_replace_different_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
"""Replace an existing node with new node reusing the replaced node host id"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
await manager.server_add(replace_cfg)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
@pytest.mark.asyncio
async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detector_timeout) -> None:
"""Replace an existing node with new node using the same IP address"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000}, auto_rack_dc="dc1")
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout}, auto_rack_dc="dc1")
host2 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[2]], time.time() + 60))[0]
logger.info(f"creating test table")
@@ -130,9 +130,9 @@ async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
await manager.server_sees_other_server(servers[2].ip_addr, servers[0].ip_addr)
@pytest.mark.asyncio
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient) -> None:
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
"""Replace an existing node with new node using the same IP address and same host id"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = True)
await manager.server_add(replace_cfg)

View File

@@ -14,9 +14,9 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_replace_with_same_ip_twice(manager: ManagerClient) -> None:
async def test_replace_with_same_ip_twice(manager: ManagerClient, failure_detector_timeout) -> None:
logger.info("starting a cluster with two nodes")
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
logger.info(f"cluster started {servers}")
async def replace_with_same_ip(s: ServerInfo) -> ServerInfo:

View File

@@ -119,14 +119,14 @@ async def test_tablet_transition_sanity(manager: ManagerClient, action):
@pytest.mark.parametrize("fail_stage", ["streaming", "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "cleanup", "cleanup_target", "end_migration", "revert_migration"])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage):
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage, failure_detector_timeout):
if fail_stage == 'cleanup' and fail_replica == 'destination':
skip_env('Failing destination during cleanup is pointless')
if fail_stage == 'cleanup_target' and fail_replica == 'source':
skip_env('Failing source during target cleanup is pointless')
logger.info("Bootstrapping cluster")
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': 2000}
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': failure_detector_timeout}
host_ids = []
servers = []

View File

@@ -37,6 +37,6 @@ class ServerInfo(NamedTuple):
class ServerUpState(IntEnum):
PROCESS_STARTED = auto()
HOST_ID_QUERIED = auto()
CQL_CONNECTED = auto()
CQL_QUERIED = auto()
CQL_ALTERNATOR_CONNECTED = auto()
CQL_ALTERNATOR_QUERIED = auto()
SERVING = auto() # Scylla sent sd_notify("serving")

View File

@@ -389,7 +389,7 @@ class ManagerClient:
seeds: list[IPAddress] | None = None,
timeout: float | None = None,
connect_driver: bool = True,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None,
auth_provider: dict[str, str] | None = None) -> None:
@@ -540,7 +540,7 @@ class ManagerClient:
seeds: Optional[List[IPAddress]] = None,
timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT,
server_encryption: str = "none",
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
connect_driver: bool = True) -> ServerInfo:
"""Add a new server"""
if expected_error is not None:

View File

@@ -41,6 +41,7 @@ import yaml
import signal
import glob
import errno
import json
import re
import platform
import contextlib
@@ -557,7 +558,7 @@ class ScyllaServer:
async def install_and_start(self,
api: ScyllaRESTAPIClient,
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> None:
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> None:
"""Setup and start this server."""
await self.install()
@@ -677,10 +678,63 @@ class ScyllaServer:
return None
return maintenance_socket_option
async def get_cql_up_state(self) -> ServerUpState | None:
"""Get the CQL up state (a check we use at start up).
def _alternator_ports(self) -> list[tuple[str, int]]:
"""Return (scheme, port) for every configured Alternator port."""
ports = []
if "alternator_port" in self.config:
ports.append(("http", self.config["alternator_port"]))
if "alternator_https_port" in self.config:
ports.append(("https", self.config["alternator_https_port"]))
return ports
Return None if it fails to connect.
async def check_alternator_connected(self, ports: list[tuple[str, int]]) -> bool:
"""TCP connect to every configured Alternator port.
Returns True if all ports accept connections.
"""
for _, port in ports:
try:
_, writer = await asyncio.wait_for(
asyncio.open_connection(self.ip_addr, port), timeout=2)
writer.close()
await writer.wait_closed()
except (OSError, asyncio.TimeoutError):
return False
return True
async def check_alternator_queried(self, ports: list[tuple[str, int]]) -> bool:
"""Sends a GetItem for a randomly-named nonexistent table and validates
that the response is a DynamoDB-shaped JSON error (contains __type),
confirming Alternator is processing DynamoDB API requests.
Returns True if all ports respond correctly.
"""
table_name = f"nonexistent_table_{uuid.uuid4().hex}"
headers = {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "DynamoDB_20120810.GetItem",
}
body = json.dumps({"TableName": table_name, "Key": {"k": {"S": "k"}}})
timeout = aiohttp.ClientTimeout(total=2)
async with aiohttp.ClientSession(timeout=timeout) as session:
for scheme, port in ports:
url = f"{scheme}://{self.ip_addr}:{port}/"
try:
# ssl=False skips certificate verification
async with session.post(url, headers=headers, data=body, ssl=False) as resp:
response_body = await resp.json(content_type=None)
if "__type" not in response_body:
return False
except Exception as exc:
self.logger.debug("Alternator query check failed for %s: %s", url, exc)
return False
return True
async def get_cql_up_state(self) -> tuple[bool, bool]:
"""Check CQL connectivity.
Returns (connected, queried) indicating whether a CQL connection
was established and whether a query executed successfully.
"""
caslog = logging.getLogger('cassandra')
oldlevel = caslog.getEffectiveLevel()
@@ -708,6 +762,7 @@ class ScyllaServer:
request_timeout=self.TOPOLOGY_TIMEOUT)
contact_points=[self.rpc_address]
connected = False
cql_queried = False
try:
# In a cluster setup, it's possible that the CQL
# here is directed to a node different from the initial contact
@@ -730,13 +785,32 @@ class ScyllaServer:
control_connection_timeout=self.TOPOLOGY_TIMEOUT,
auth_provider=self.auth_provider)
self.control_connection = self.control_cluster.connect()
return ServerUpState.CQL_QUERIED
cql_queried = True
except (NoHostAvailable, InvalidRequest, OperationTimedOut) as exc:
self.logger.debug("Exception when checking if CQL is up: %s", exc)
return ServerUpState.CQL_CONNECTED if connected else None
finally:
caslog.setLevel(oldlevel)
# Any other exception may indicate a problem, and is passed to the caller.
return connected, cql_queried
async def get_alternator_up_state(self, ports: list[tuple[str, int]]) -> tuple[bool, bool]:
connected = await self.check_alternator_connected(ports)
queried = connected and await self.check_alternator_queried(ports)
return connected, queried
async def get_cql_alternator_up_state(self) -> ServerUpState | None:
"""Get the combined CQL + Alternator up state."""
cql_connected, cql_queried = await self.get_cql_up_state()
alt_connected, alt_queried = False, False
alt_ports = self._alternator_ports() # `alt_ports` empty = no Alternator
if alt_ports:
alt_connected, alt_queried = await self.get_alternator_up_state(alt_ports)
if cql_queried and (alt_queried or not alt_ports):
return ServerUpState.CQL_ALTERNATOR_QUERIED
if not cql_connected or (alt_ports and not alt_connected):
return None
# Here both CQL and Alternator (if exists) are at least connected
return ServerUpState.CQL_ALTERNATOR_CONNECTED
def _setup_notify_socket(self) -> None:
"""Create a Unix datagram socket for receiving sd_notify messages from Scylla."""
@@ -824,7 +898,7 @@ class ScyllaServer:
async def start(self,
api: ScyllaRESTAPIClient,
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None) -> None:
"""Start an installed server.
@@ -897,9 +971,9 @@ class ScyllaServer:
if await self.try_get_host_id(api):
if server_up_state == ServerUpState.PROCESS_STARTED:
server_up_state = ServerUpState.HOST_ID_QUERIED
server_up_state = await self.get_cql_up_state() or server_up_state
server_up_state = await self.get_cql_alternator_up_state() or server_up_state
# Check for SERVING state (sd_notify "serving" message)
if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification():
if server_up_state >= ServerUpState.CQL_ALTERNATOR_QUERIED and self.check_serving_notification():
server_up_state = ServerUpState.SERVING
if server_up_state >= expected_server_up_state:
if expected_error is not None:
@@ -1194,7 +1268,7 @@ class ScyllaCluster:
seeds: Optional[List[IPAddress]] = None,
server_encryption: str = "none",
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> ServerInfo:
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> ServerInfo:
"""Add a new server to the cluster"""
self.is_dirty = True
@@ -1434,7 +1508,7 @@ class ScyllaCluster:
server_id: ServerNum,
expected_error: str | None = None,
seeds: list[IPAddress] | None = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None,
auth_provider: dict[str, str] | None = None) -> None:
@@ -1917,7 +1991,7 @@ class ScyllaClusterManager:
server_id=server_id,
expected_error=data.get("expected_error"),
seeds=data.get("seeds"),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
cmdline_options_override=data.get("cmdline_options_override"),
append_env_override=data.get("append_env_override"),
auth_provider=data.get("auth_provider"),
@@ -1952,7 +2026,7 @@ class ScyllaClusterManager:
seeds=data.get("seeds"),
server_encryption=data.get("server_encryption", "none"),
expected_error=data.get("expected_error"),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
)
return s_info.as_dict()