Compare commits

...

22 Commits

Author SHA1 Message Date
Botond Dénes
809f12f988 Merge 'test/cluster/dtest: fix ScyllaNode state not persisting across nodelist() calls' from Benny Halevy
`ScyllaCluster.nodelist()` creates new `ScyllaNode` objects on every call,
so per-node state set via `set_smp()`, `set_log_level()`, and
`_adjust_smp_and_memory()` was lost. This meant `set_smp()` had no effect
when `cluster.start()` was called after it, since `start_nodes()` calls
`nodelist()` internally which creates fresh nodes with default values.

- Add debug logging for smp/memory in ScyllaNode
- Store per-node settings (smp, memory, log levels) in a
  `ScyllaCluster._node_resources` dict keyed by server_id, so they survive
  `nodelist()` reconstruction. `ScyllaNode` restores its state from this dict
  on construction and saves it back whenever `set_smp()`, `set_log_level()`,
  or `_adjust_smp_and_memory()` modifies it.
- Add a reproducer test verifying `set_smp()` takes effect on restart
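The persistence scheme described above can be sketched minimally; the class and attribute names below are illustrative stand-ins, not the actual dtest code:

```python
# Minimal sketch of the fix: per-node settings live on the cluster in a
# dict keyed by the stable server_id, so a freshly constructed node
# object can restore them. Names are illustrative, not the real classes.
class Cluster:
    def __init__(self) -> None:
        self._node_resources: dict[int, dict] = {}  # survives node rebuilds

class Node:
    def __init__(self, cluster: Cluster, server_id: int) -> None:
        self.cluster = cluster
        self.server_id = server_id
        # Restore state saved by any previous Node object for this server.
        self._smp = cluster._node_resources.get(server_id, {}).get("smp")

    def set_smp(self, smp: int) -> None:
        self._smp = smp
        # Save back so later nodelist() reconstructions still see it.
        self.cluster._node_resources.setdefault(self.server_id, {})["smp"] = smp

cluster = Cluster()
Node(cluster, server_id=1).set_smp(2)        # set on one node object...
assert Node(cluster, server_id=1)._smp == 2  # ...visible on a fresh one
```

The key design point is that the dict is keyed by `server_id`, which is stable across reconstructions, rather than by the node object itself.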

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1629

--

No backport needed: this only fixes dtest infrastructure, no production code
is affected.

Closes scylladb/scylladb#29549

* github.com:scylladb/scylladb:
  test/cluster/dtest: add test for node.set_smp() persistence
  test/cluster/dtest: cache ScyllaNode instances in ScyllaCluster
  test/cluster/dtest/ccmlib/scylla_node: add debug logging
2026-04-29 06:25:36 +03:00
Avi Kivity
c4de2b3c9d Merge 'test: fix flaky tablets test by using read barrier' from Aleksandra Martyniuk
Some tests in test_tablets.py read system_schema.keyspaces from an arbitrary node that may not have applied the latest schema change yet. Pin the read to a specific node and issue a read barrier before querying, ensuring the node has up-to-date data.
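The flake and the fix can be modeled with a small asyncio toy (illustrative only, not the real test framework APIs):

```python
import asyncio

# Toy model of the flake and the fix (illustrative only): replicas apply
# schema changes asynchronously, so reading an arbitrary node can observe
# stale state. The fix pins the read to one node and issues a read
# barrier there first, so that node has applied all committed changes.
class Replica:
    def __init__(self):
        self.applied = 0                     # last schema version applied

    async def read_barrier(self, committed):
        # Wait until this replica has caught up with the committed log.
        while self.applied < committed:
            await asyncio.sleep(0)           # yield to the event loop
            self.applied += 1                # simulate applying one entry

async def main():
    replicas = [Replica() for _ in range(3)]
    committed = 7                            # latest committed schema version
    replicas[0].applied = 7                  # only one replica is up to date
    pinned = replicas[2]                     # pin the read to a specific node
    await pinned.read_barrier(committed)     # barrier before querying it
    assert pinned.applied == committed       # now safe to read schema here

asyncio.run(main())
```

Without the barrier, querying `replicas[1]` or `replicas[2]` directly could return pre-change schema, which is exactly the flaky behavior the commit removes.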

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1700

Test fix; no backport

Closes scylladb/scylladb#29655

* github.com:scylladb/scylladb:
  test: fix flaky rack list conversion tests by using read barrier
  test: fix flaky test_enforce_rack_list_option by using read barrier
2026-04-28 17:15:59 +03:00
Patryk Jędrzejczak
d9dd3bfe53 Merge 'topology_coordinator: join tablet load stats refresh in stop()' from Andrzej Jackowski
Commit 2b7aa32 (topology_coordinator: Refresh load stats after
table is created or altered) registered topology_coordinator as a
schema change listener and added on_create_column_family which
fire-and-forgets _tablet_load_stats_refresh.trigger(). The
triggered task runs on the gossip scheduling group via
with_scheduling_group and accesses the topology_coordinator via
'this'.

stop() unregisters the listener but does not wait for any
in-flight refresh task. If a notification fires between
_tablet_load_stats_refresh.join() in run() and unregister_listener
in stop(), the scheduled task can outlive the topology_coordinator
and access freed memory after run_topology_coordinator's coroutine
frame is destroyed.

Wait for the refresh to complete in stop() after unregistering the
listener, ensuring no task can fire after destruction.
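The lifetime bug has a direct asyncio analog; this is a hedged toy sketch, not Seastar code:

```python
import asyncio

# Toy analog of the lifetime bug (illustrative, not Seastar code): a
# notification fire-and-forgets a refresh task that dereferences its
# owner. stop() must join any in-flight task before teardown completes,
# otherwise the task races with the owner's destruction.
class Coordinator:
    def __init__(self):
        self._refresh_task = None
        self.destroyed = False

    def on_schema_change(self):
        # Fire-and-forget, as the schema-change listener does.
        self._refresh_task = asyncio.create_task(self._refresh())

    async def _refresh(self):
        await asyncio.sleep(0)               # yield, like the real refresh
        assert not self.destroyed            # use-after-free analog in C++

    async def stop(self):
        # After unregistering listeners (not modeled), join the in-flight
        # refresh so no task can run after the coordinator is gone.
        if self._refresh_task is not None:
            await self._refresh_task
        self.destroyed = True

async def main():
    c = Coordinator()
    c.on_schema_change()                     # arrives during shutdown window
    await c.stop()                           # waits for the refresh first

asyncio.run(main())
```

In Python the stale task would merely raise; in C++ the equivalent is the freed-memory access the commit fixes, which is why `stop()` joins the refresh after unregistering the trigger sources.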

Fixes SCYLLADB-1728

Backport to 2026.1 and 2026.2, because the issue was introduced in 2b7aa32

Closes scylladb/scylladb#29653

* https://github.com/scylladb/scylladb:
  test: tablet_stats: reproduce shutdown refresh race
  topology_coordinator: join tablet load stats refresh in stop()
2026-04-28 12:54:28 +02:00
Benny Halevy
5eaa979f35 test/cluster/dtest: add test for node.set_smp() persistence
Add a test that reproduces SCYLLADB-1629: set_smp() had no effect
because nodelist() created new ScyllaNode objects on every call,
losing the _smp_set_during_test value. The test fails without the
fix in the previous patch.
2026-04-28 12:34:08 +03:00
Benny Halevy
7430c1efd7 test/cluster/dtest: cache ScyllaNode instances in ScyllaCluster
ScyllaCluster.nodelist() was creating new ScyllaNode objects on every
call, so per-node state set via set_smp(), set_log_level(), and
_adjust_smp_and_memory() was lost between calls.

Fix by caching ScyllaNode instances in a list populated by
_add_nodes() using the list returned by servers_add() in populate().
Nodes are assigned monotonically increasing names (node1, node2, ...).
nodelist() simply returns the cached list.
2026-04-28 12:34:06 +03:00
Marcin Maliszkiewicz
b0f988afc4 Merge 'auth: fix shutdown and startup races in LDAP cache pruner' from Andrzej Jackowski
The LDAP role manager's `_cache_pruner` background fiber periodically calls cache::reload_all_permissions(). Two races cause it to hit SCYLLA_ASSERT(_permission_loader):
- Cross-shard race: The pruner used `_cache.container().invoke_on_all()` to reload permissions on every shard. Since both `service::start()` and `sharded<service>::stop()` execute per-shard in parallel, the pruner on one shard could call reload_all_permissions() on another shard before that shard set its loader (startup) or after it cleared its loader (shutdown). Each shard runs its own pruner instance, so reloading locally is sufficient; this also removes redundant N² reload calls.
- Intra-shard race: `service::stop()` cleared the permission loader and stopped the role manager concurrently (via when_all_succeed). A mid-reload pruner could yield and then call the now-null loader. Fixed by stopping the role manager first so the pruner is fully drained before the loader is cleared.
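The ordering invariant behind both fixes can be sketched as follows; the names are illustrative, not the real auth classes:

```python
import asyncio

# Sketch of the ordering invariant (illustrative names, not the real auth
# classes): shut components down in the exact reverse of startup order so
# the pruner inside the role manager is drained before the loader it
# calls is cleared.
class Service:
    def __init__(self):
        self.loader = None
        self.events = []

    async def start(self):
        self.events.append("authorizer.start")     # loader depends on it
        self.loader = lambda role: f"perms({role})"
        self.events.append("set_loader")
        self.events.append("role_manager.start")   # pruner may now fire

    async def stop(self):
        self.events.append("role_manager.stop")    # drain the pruner first
        self.loader = None                         # only now safe to clear
        self.events.append("clear_loader")
        self.events.append("authorizer.stop")

async def main():
    svc = Service()
    await svc.start()
    await svc.stop()
    # The stop() half is the exact mirror of the start() half.
    assert svc.events == ["authorizer.start", "set_loader", "role_manager.start",
                          "role_manager.stop", "clear_loader", "authorizer.stop"]

asyncio.run(main())
```

Making shutdown the literal reverse of startup is what removes the window in which a still-running pruner can observe a null loader.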

Fixes SCYLLADB-1679
Backport to 2026.2, introduced in 7eedf50c12

Closes scylladb/scylladb#29605

* github.com:scylladb/scylladb:
  auth: make shutdown the exact reverse of startup
  test: ldap: add test for pruner crash during shutdown
  auth: start authorizer and set permission loader before role manager
  auth: stop role manager before clearing permission loader
  auth: reload LDAP permission cache on local shard only
2026-04-28 11:16:07 +02:00
Botond Dénes
a7e9c0e6d2 Merge 'test.py: fix test collection bug' from Andrei Chekun
In certain circumstances the current way of collecting tests is error-prone: collection can stop when the first file in a mode is skipped, leaving the rest of the files given on the CLI uncollected.
Another issue is that if a file is specified twice, via its directory and explicitly, an incorrect CppFile is produced in the stash, causing a KeyError.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1714

No backport, test framework bug fix only.

Closes scylladb/scylladb#29634

* github.com:scylladb/scylladb:
  test.py: fix framework test
  test.py: fix test collection bug
2026-04-28 11:52:35 +03:00
Botond Dénes
3ea4af1c8c Merge 'test/cluster/test_incremental_repair: fix flaky coordinator-change scenario' from Avi Kivity
- Ensure servers[1] is not the topology coordinator before restarting it, preventing the leader death + re-election + re-repair sequence that masked the compaction-merge bug
- Add a retry loop that detects post-restart leadership transfer to servers[1] via direct coordinator query, retrying up to 5 times

Fixes: SCYLLADB-1478

Backporting to 2026.2, which sees the failure regularly.

Closes scylladb/scylladb#29671

* github.com:scylladb/scylladb:
  test/cluster/test_incremental_repair: add retry for residual leadership race
  test/cluster/test_incremental_repair: fix flaky coordinator-change scenario
2026-04-28 09:05:02 +03:00
Andrzej Jackowski
459e3970cd test: tablet_stats: reproduce shutdown refresh race
The coordinator can receive a schema-change notification after run()
finishes but before stop() unregisters listeners. The test pins that
window with error injections and verifies stop() waits for the refresh
instead of letting it outlive the coordinator.

Test time in dev: 9.51s

Refs SCYLLADB-1728
2026-04-28 08:00:54 +02:00
Andrzej Jackowski
8756f7c068 topology_coordinator: join tablet load stats refresh in stop()
Commit 2b7aa3211d made schema changes trigger tablet load stats
refreshes in the background. A notification can still arrive after
run() stops the periodic refresher and before the coordinator object
is destroyed.

Move lifecycle subscription cleanup to stop() and join the serialized
refresh there after unregistering refresh trigger sources. This keeps
the coordinator alive until notification-triggered refresh work has
completed.

Fixes SCYLLADB-1728
2026-04-28 07:37:28 +02:00
Avi Kivity
2615d0e8d8 test/cluster/test_incremental_repair: add retry for residual leadership race
There is a small race window where Raft leadership could transfer back
to servers[1] between the ensure_group0_leader_on() check and the
actual restart.  If this happens, the new coordinator re-initiates
repair and masks the compaction-merge bug.

Extract the core test logic into _do_race_window_promotes_unrepaired_data()
which directly checks get_topology_coordinator() after restart and raises
_LeadershipTransferred if servers[1] became coordinator.  The test
function calls this helper in a retry loop (up to 5 attempts).
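The retry structure can be sketched like this; the names mirror the commit message but this is not the real test code:

```python
# Sketch of the retry pattern described above: the helper signals the
# benign race with a dedicated exception and the caller retries a bounded
# number of times. Names mirror the commit message; not the real test.
class LeadershipTransferred(Exception):
    """Leadership moved back to the restarted server; the run is void."""

def run_with_retries(attempt, max_attempts=5):
    for i in range(max_attempts):
        try:
            return attempt(i)            # one full setup/restart/check cycle
        except LeadershipTransferred:
            continue                     # benign race: redo from scratch
    raise RuntimeError("leadership kept transferring; giving up")

# Usage: fail twice with the benign race, then succeed.
outcomes = iter([LeadershipTransferred, LeadershipTransferred, "repaired"])
def attempt(i):
    result = next(outcomes)
    if result is LeadershipTransferred:
        raise result
    return result

assert run_with_retries(attempt) == "repaired"
```

Using a dedicated exception type keeps the benign race distinct from real assertion failures, which must still propagate and fail the test.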

Refs: SCYLLADB-1478
2026-04-27 21:11:06 +03:00
Avi Kivity
914b70c75b test/cluster/test_incremental_repair: fix flaky coordinator-change scenario
The test_incremental_repair_race_window_promotes_unrepaired_data test
was flaky because it hardcoded servers[1] as the restart target but did
not ensure that servers[1] was NOT the topology coordinator.

When servers[1] happened to be the Raft group0 leader (topology
coordinator), restarting it killed the leader, forced a new election,
and the new coordinator re-initiated tablet repair.  This re-repair
flushes memtables on all replicas via take_storage_snapshot() and marks
the resulting sstables as repaired -- causing post-repair keys to appear
in repaired sstables on servers[0] and servers[2].  The test then hit
the wrong assertion (servers[0]/[2] contaminated).

Fix: before starting the repair, check whether servers[1] is the
topology coordinator.  If so, move leadership to another server via
ensure_group0_leader_on() so that restarting servers[1] only kills a
follower -- which does not trigger an election or coordinator change.

Reproducibility was confirmed by forcing leadership to servers[1] via
ensure_group0_leader_on() and observing deterministic failure with all
three servers showing post-repair keys in repaired sstables (confirming
the re-repair scenario), then verifying the fix passes reliably.

Fixes: SCYLLADB-1478
2026-04-27 21:08:12 +03:00
Aleksandra Martyniuk
6b7ce5e244 test: fix flaky rack list conversion tests by using read barrier
test_numeric_rf_to_rack_list_conversion and
test_numeric_rf_to_rack_list_conversion_abort were reading
system_schema.keyspaces from an arbitrary node that may not have
applied the latest schema change yet. Pin the read to a specific node
and issue a read barrier before querying, ensuring the node has
up-to-date data.
2026-04-27 15:19:09 +02:00
Aleksandra Martyniuk
9d3d424d58 test: fix flaky test_enforce_rack_list_option by using read barrier
The test was reading system_schema.keyspaces from an arbitrary node
that may not have applied the latest schema change yet. Pin the read
to a specific node and issue a read barrier before querying, ensuring
the node has up-to-date data.
2026-04-27 14:44:38 +02:00
Andrei Chekun
f2f4915e09 test.py: fix framework test
The framework test was not skipping the unit directory where the C++
tests are located; with the collection bug fixed, it started to fail.
Ignore this directory as well.
2026-04-25 18:04:55 +02:00
Andrei Chekun
92c09d106d test.py: fix test collection bug
In certain circumstances the current way of collecting tests is
error-prone: collection can stop when the first file in a mode is
skipped, leaving the rest of the files given on the CLI uncollected.
Another issue is that if a file is specified twice, via its directory
and explicitly, an incorrect CppFile is produced in the stash, causing
a KeyError.
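Both failure modes can be sketched in a few lines (illustrative, not the actual test.py code):

```python
# Sketch of the collection bug and its fix (illustrative, not the actual
# test.py code): skipping a file must `continue` rather than end the
# loop, and a file given twice (via its directory and explicitly) must be
# deduplicated before a test object is created for it.
def collect(paths, skipped):
    seen = set()
    collected = []
    for path in paths:
        if path in skipped:
            continue          # the bug was stopping collection here
        if path in seen:
            continue          # duplicate spec: dir expansion + explicit file
        seen.add(path)
        collected.append(path)
    return collected

files = ["unit/a.cc", "boost/b.cc", "boost/b.cc", "boost/c.cc"]
assert collect(files, skipped={"unit/a.cc"}) == ["boost/b.cc", "boost/c.cc"]
```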

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1714
2026-04-24 17:57:11 +02:00
Andrzej Jackowski
8855e77465 auth: make shutdown the exact reverse of startup
The previous parallel stop of the authenticator and authorizer
was a micro-optimization that obscured the lifecycle invariant
that shutdown should reverse startup.

Refs SCYLLADB-1679
2026-04-24 13:34:09 +02:00
Andrzej Jackowski
adf1e26bab test: ldap: add test for pruner crash during shutdown
Verify that service::stop() drains the LDAP pruner before
clearing the permission loader. The test installs a slow
permission loader and confirms the pruner is actively
reloading when teardown begins.

Refs SCYLLADB-1679
2026-04-24 13:34:09 +02:00
Andrzej Jackowski
37a547604f auth: start authorizer and set permission loader before role manager
LDAP role manager starts a pruner fiber that calls
reload_all_permissions() which asserts _permission_loader is set.
The permission loader calls _authorizer->authorize(), so the
authorizer must be started before the loader is set.

Start authorizer, then set the permission loader, then start the
role manager, ensuring both dependencies are satisfied before the
pruner can fire.

Fixes SCYLLADB-1679
2026-04-24 13:34:09 +02:00
Andrzej Jackowski
c3e5285d45 auth: stop role manager before clearing permission loader
service::stop() cleared the permission loader and stopped
the role manager concurrently (via when_all_succeed). The
LDAP pruner could be mid-reload at a yield point when the
loader was set to null, causing it to call a null function.

Stop the role manager first so the pruner is fully drained
before the loader is cleared.

Fixes SCYLLADB-1679
2026-04-24 13:34:09 +02:00
Andrzej Jackowski
f75e5ac65b auth: reload LDAP permission cache on local shard only
The LDAP role manager's _cache_pruner fiber used
invoke_on_all() to reload permissions on every shard.
Since auth::service::start() runs on all shards in
parallel via invoke_on_all(), the pruner on shard X
could call reload_all_permissions() on shard Y before
shard Y finished start() and set its permission loader,
hitting SCYLLA_ASSERT(_permission_loader). The same
cross-shard race existed during shutdown.

Each shard runs its own pruner instance, so reloading
locally is sufficient — all shards are still covered.
This also removes redundant N-squared reload calls.

Refs SCYLLADB-1679
2026-04-24 13:06:58 +02:00
Benny Halevy
6cb4c27f8c test/cluster/dtest/ccmlib/scylla_node: add debug logging
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2026-04-23 09:21:06 +03:00
13 changed files with 347 additions and 94 deletions

View File

@@ -258,13 +258,11 @@ future<> ldap_role_manager::start() {
         } catch (const seastar::sleep_aborted&) {
             co_return; // ignore
         }
-        co_await _cache.container().invoke_on_all([] (cache& c) -> future<> {
-            try {
-                co_await c.reload_all_permissions();
-            } catch (...) {
-                mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
-            }
-        });
+        try {
+            co_await _cache.reload_all_permissions();
+        } catch (...) {
+            mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
+        }
     }
 });
 return _std_mgr.start();

View File

@@ -157,15 +157,12 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
             return create_legacy_keyspace_if_missing(mm);
         });
     }
-    co_await _role_manager->start();
-    if (this_shard_id() == 0) {
-        // Role manager and password authenticator have this odd startup
-        // mechanism where they asynchronously create the superuser role
-        // in the background. Correct password creation depends on role
-        // creation therefore we need to wait here.
-        co_await _role_manager->ensure_superuser_is_created();
-    }
-    co_await when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
+    // Authorizer must be started before the permission loader is set,
+    // because the loader calls _authorizer->authorize().
+    // The loader must be set before starting the role manager, because
+    // LDAP role manager starts a pruner fiber that calls
+    // reload_all_permissions() which asserts _permission_loader is set.
+    co_await _authorizer->start();
     if (!_used_by_maintenance_socket) {
         // Maintenance socket mode can't cache permissions because it has
         // different authorizer. We can't mix cached permissions, they could be
@@ -174,12 +171,27 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
                 &service::get_uncached_permissions,
                 this, std::placeholders::_1, std::placeholders::_2));
     }
+    co_await _role_manager->start();
+    if (this_shard_id() == 0) {
+        // Role manager and password authenticator have this odd startup
+        // mechanism where they asynchronously create the superuser role
+        // in the background. Correct password creation depends on role
+        // creation therefore we need to wait here.
+        co_await _role_manager->ensure_superuser_is_created();
+    }
+    // Authenticator must be started after ensure_superuser_is_created()
+    // because password_authenticator queries system.roles for the
+    // superuser entry created by the role manager.
+    co_await _authenticator->start();
 }

 future<> service::stop() {
     _as.request_abort();
+    // Reverse of start() order.
+    co_await _authenticator->stop();
+    co_await _role_manager->stop();
     _cache.set_permission_loader(nullptr);
-    return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
+    co_await _authorizer->stop();
 }

 future<> service::ensure_superuser_is_created() {

View File

@@ -4237,6 +4237,7 @@ public:
         , _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
         , _async_gate("topology_coordinator")
     {
+        _lifecycle_notifier.register_subscriber(this);
         _db.get_notifier().register_listener(this);
         // When the delay_cdc_stream_finalization error injection is disabled
         // (test releases it), wake the topology coordinator so it retries
@@ -4400,6 +4401,7 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
 }

 future<> topology_coordinator::refresh_tablet_load_stats() {
+    co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
     auto tm = get_token_metadata_ptr();
     locator::load_stats stats;
@@ -4723,7 +4725,6 @@ future<> topology_coordinator::run() {
     co_await _async_gate.close();
     co_await std::move(tablet_load_stats_refresher);
-    co_await _tablet_load_stats_refresh.join();
     co_await std::move(cdc_generation_publisher);
     co_await std::move(cdc_streams_gc);
     co_await std::move(gossiper_orphan_remover);
@@ -4736,6 +4737,8 @@ future<> topology_coordinator::stop() {
     co_await _db.get_notifier().unregister_listener(this);
     utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
     _topo_sm.on_tablet_split_ready = nullptr;
+    co_await _lifecycle_notifier.unregister_subscriber(this);
+    co_await _tablet_load_stats_refresh.join();

     // if topology_coordinator::run() is aborted either because we are not a
     // leader anymore, or we are shutting down as a leader, we have to handle
@@ -4797,7 +4800,6 @@ future<> run_topology_coordinator(
         topology_cmd_rpc_tracker};

     std::exception_ptr ex;
-    lifecycle_notifier.register_subscriber(&coordinator);
     try {
         rtlogger.info("start topology coordinator fiber");
         co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
@@ -4818,7 +4820,7 @@ future<> run_topology_coordinator(
         }
         on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
     }
-    co_await lifecycle_notifier.unregister_subscriber(&coordinator);
+    co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min));
     co_await coordinator.stop();
 }

View File

@@ -11,13 +11,11 @@ from typing import TYPE_CHECKING

 from cassandra.auth import PlainTextAuthProvider

-from test.pylib.internal_types import ServerInfo
 from test.pylib.manager_client import ManagerClient
 from test.cluster.dtest.ccmlib.common import logger
 from test.cluster.dtest.ccmlib.scylla_node import ScyllaNode

 if TYPE_CHECKING:
-    from collections.abc import Iterable
     from typing import Any
@@ -29,6 +27,10 @@ class ScyllaCluster:
         self.manager = manager
         self.scylla_mode = scylla_mode
         self._config_options = {}
+        # Cached ScyllaNode instances. Nodes are appended by _add_nodes()
+        # in the order they are created by servers_add().
+        self._nodes: list[ScyllaNode] = []
+        self._next_node_num: int = 1

         if self.scylla_mode == "debug":
             self.default_wait_other_notice_timeout = 600
@@ -39,19 +41,20 @@ class ScyllaCluster:
         self.force_wait_for_cluster_start = force_wait_for_cluster_start

-    @staticmethod
-    def _sorted_nodes(servers: Iterable[ServerInfo]) -> list[ServerInfo]:
-        return sorted(servers, key=lambda s: s.server_id)
+    def _add_nodes(self, servers: list) -> None:
+        """Create ScyllaNode instances for the given servers and cache them."""
+        for server in servers:
+            name = f"node{self._next_node_num}"
+            self._next_node_num += 1
+            self._nodes.append(ScyllaNode(
+                cluster=self, server=server, name=name))

     @property
     def nodes(self) -> dict[str, ScyllaNode]:
         return {node.name: node for node in self.nodelist()}

     def nodelist(self) -> list[ScyllaNode]:
-        return [
-            ScyllaNode(cluster=self, server=server, name=f"node{n}")
-            for n, server in enumerate(self._sorted_nodes(self.manager.all_servers()), start=1)
-        ]
+        return list(self._nodes)

     def get_node_ip(self, nodeid: int) -> str:
         return self.nodelist()[nodeid-1].address()
@@ -61,16 +64,16 @@ class ScyllaCluster:
         self.manager.auth_provider = PlainTextAuthProvider(username="cassandra", password="cassandra")
         match nodes:
             case int():
-                self.manager.servers_add(servers_num=nodes, config=self._config_options, start=False, auto_rack_dc="dc1")
+                self._add_nodes(self.manager.servers_add(servers_num=nodes, config=self._config_options, start=False, auto_rack_dc="dc1"))
             case list():
                 for dc, n_nodes in enumerate(nodes, start=1):
                     dc_name = f"dc{dc}"
-                    self.manager.servers_add(
+                    self._add_nodes(self.manager.servers_add(
                         servers_num=n_nodes,
                         config=self._config_options,
                         start=False,
                         auto_rack_dc=dc_name
-                    )
+                    ))
             case dict():
                 # Supported spec: {"dc1": {"rack1": 3, "rack2": 2}, "dc2": {"rack1": 2}}
                 for dc, dc_nodes in nodes.items():
@@ -79,7 +82,7 @@ class ScyllaCluster:
                 for rack, rack_nodes in dc_nodes.items():
                     if not isinstance(rack_nodes, int):
                         raise RuntimeError(f"Unsupported topology specification: {nodes}")
-                    self.manager.servers_add(
+                    self._add_nodes(self.manager.servers_add(
                         servers_num=rack_nodes,
                         config=self._config_options,
                         property_file={
@@ -87,7 +90,7 @@ class ScyllaCluster:
                             "rack": rack,
                         },
                         start=False,
-                    )
+                    ))
             case _:
                 raise RuntimeError(f"Unsupported topology specification: {nodes}")

View File

@@ -17,6 +17,7 @@ from itertools import chain
 from functools import cached_property
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
+import logging

 from test.cluster.dtest.ccmlib.common import ArgumentError, wait_for, BIN_DIR
 from test.pylib.internal_types import ServerUpState
@@ -28,6 +29,9 @@ if TYPE_CHECKING:
     from test.cluster.dtest.ccmlib.scylla_cluster import ScyllaCluster

+logger = logging.getLogger("scylla_node")
+
 NODETOOL_STDERR_IGNORED_PATTERNS = (
     re.compile(r"WARNING: debug mode. Not for benchmarking or production"),
     re.compile(
@@ -149,15 +153,20 @@ class ScyllaNode:
         return self.cluster.scylla_mode

     def set_smp(self, smp: int) -> None:
+        logger.debug(f"Setting smp: {self=} {smp=}")
         self._smp_set_during_test = smp

     def smp(self) -> int:
+        logger.debug(f"Getting smp: {self=} _smp_set_during_test={self._smp_set_during_test} _smp={self._smp} {DEFAULT_SMP=}")
         return self._smp_set_during_test or self._smp or DEFAULT_SMP

     def memory(self) -> int:
         return self._memory or self.smp() * DEFAULT_MEMORY_PER_CPU

     def _adjust_smp_and_memory(self, smp: int | None = None, memory: int | None = None) -> None:
+        if not memory and not smp:
+            return
+        logger.debug(f"Adjusting smp={smp} memory={memory} current_smp={self._smp} current_memory={self._memory}")
         if memory:
             self._memory = memory // (smp or self.smp()) * self.smp()
         if smp:
@@ -446,6 +455,8 @@ class ScyllaNode:
         self.mark = self.mark_log()

+        logger.debug(f"Starting server: server_id={self.server_id} {scylla_args=} {scylla_env=}")
+
         self.cluster.manager.server_start(
             server_id=self.server_id,
             seeds=None if self.bootstrap else [self.address()],

View File

@@ -0,0 +1,46 @@
+#
+# Copyright (C) 2026-present ScyllaDB
+#
+# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
+#
+import logging
+
+import pytest
+
+from dtest_class import Tester
+
+logger = logging.getLogger(__file__)
+
+
+@pytest.mark.single_node
+class TestSetSmp(Tester):
+    """Test that node.set_smp() properly persists across restarts."""
+
+    def _get_smp_from_log(self, node, from_mark=None):
+        """Extract smp value from the node's log by looking at the SHARD_COUNT gossip value."""
+        matches = node.grep_log(r"SHARD_COUNT : Value\((\d+),\d+\)", from_mark=from_mark)
+        assert matches, "Could not find SHARD_COUNT in node log"
+        # Return the last match (most recent start)
+        return int(matches[-1][1].group(1))
+
+    def test_set_smp(self):
+        """Verify that set_smp() takes effect on the next start."""
+        cluster = self.cluster
+        cluster.populate(1).start(wait_for_binary_proto=True)
+        node1 = cluster.nodelist()[0]
+        default_smp = self._get_smp_from_log(node1)
+
+        cluster.stop()
+
+        # set_smp to a different value and restart without jvm_args
+        target_smp = 1 if default_smp != 1 else 2
+        node1.set_smp(target_smp)
+        mark = node1.mark_log()
+        cluster.start(wait_for_binary_proto=True)
+        node1 = cluster.nodelist()[0]
+
+        actual_smp = self._get_smp_from_log(node1, from_mark=mark)
+        assert actual_smp == target_smp, \
+            f"Expected smp={target_smp} after set_smp({target_smp}), got {actual_smp}"

View File

@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
 from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
 from test.pylib.tablets import get_all_tablet_replicas
 from test.cluster.tasks.task_manager_client import TaskManagerClient
-from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
+from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown
 from test.pylib.util import wait_for_cql_and_get_hosts
 from cassandra.query import ConsistencyLevel, SimpleStatement
@@ -880,41 +880,30 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manager: ManagerClient):
 # affected replica but process the UNREPAIRED sstable on the others, so the classification
 # divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
 # on the affected replica leading to data resurrection.
-@pytest.mark.asyncio
-@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
-async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
-    cmdline = ['--hinted-handoff-enabled', '0']
-    servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
-        await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
-    # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
-    # UNREPAIRED compaction view, making the race easy to trigger deterministically.
-    await cql.run_async(
-        f"ALTER TABLE {ks}.test WITH compaction = "
-        f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
-    )
-    # Disable autocompaction everywhere so we control exactly when compaction runs.
-    for s in servers:
-        await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
-    scylla_path = await manager.server_get_exe(servers[0].server_id)
-    # Repair 1: establishes sstables_repaired_at=1 on all nodes.
-    # Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
-    # S0'(repaired_at=1) on all nodes.
-    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
-    # Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
-    # These will be the subject of repair 2.
-    repair2_keys = list(range(current_key, current_key + 10))
-    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
-    for s in servers:
-        await manager.api.flush_keyspace(s.ip_addr, ks)
-    current_key += 10
+class _LeadershipTransferred(Exception):
+    """Raised when leadership transferred to servers[1] during the test, requiring a retry."""
+    pass
+
+
+async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
+    """Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.
+
+    Returns the next current_key value.
+
+    Raises _LeadershipTransferred if servers[1] becomes coordinator after the
+    restart, signalling the caller to retry.
+    """
+    # Ensure servers[1] is not the topology coordinator. If the coordinator is
+    # restarted, the Raft leader dies, a new election occurs, and the new
+    # coordinator re-initiates tablet repair -- flushing memtables on all replicas
+    # and marking post-repair data as repaired. That legitimate re-repair masks
+    # the compaction-merge bug this test detects.
     coord = await get_topology_coordinator(manager)
     coord_serv = await find_server_by_host_id(manager, servers, coord)
+    if coord_serv == servers[1]:
+        other = next(s for s in servers if s != servers[1])
+        await ensure_group0_leader_on(manager, other)
+        coord = await get_topology_coordinator(manager)
+        coord_serv = await find_server_by_host_id(manager, servers, coord)
     coord_log = await manager.server_open_log(coord_serv.server_id)
     coord_mark = await coord_log.mark()
@@ -978,6 +967,16 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
     await manager.server_start(target.server_id)
     await manager.servers_see_each_other(servers)
+    # Check if leadership transferred to servers[1] during the restart.
+    # If so, the new coordinator will re-initiate repair, masking the bug.
+    new_coord = await get_topology_coordinator(manager)
+    new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
+    if new_coord_serv == servers[1]:
+        await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
+        await manager.api.wait_task(servers[0].ip_addr, task_id)
+        raise _LeadershipTransferred(
+            "servers[1] became topology coordinator after restart")
     # Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
     # confirming that the bug was triggered (S1' and E merged during the race window).
     deadline = time.time() + 60
@@ -1000,7 +999,7 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
     if not compaction_ran:
         logger.warning("Compaction did not merge S1' and E after restart during the race window; "
                        "the bug was not triggered. Skipping assertion.")
-        return
+        return current_key
     # Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
     # keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
@@ -1031,8 +1030,9 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
                 f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
                 f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
-    # servers[0] and servers[2] flushed post-repair keys after the race window closed,
-    # so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
+    # servers[0] and servers[2] were never restarted and the coordinator stayed
+    # alive throughout, so no re-repair could have flushed their memtables.
+    # Post-repair keys must NOT appear in repaired sstables on these servers.
     assert not (repaired_keys_0 & post_repair_key_set), \
         f"servers[0] should not have post-repair keys in repaired sstables, " \
         f"got: {repaired_keys_0 & post_repair_key_set}"
@@ -1053,6 +1053,54 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
         f"on servers[1] after restart lost the being_repaired markers during the race window. " \
         f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
         f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
+    return current_key
+
+
+@pytest.mark.asyncio
+@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
+async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
+    cmdline = ['--hinted-handoff-enabled', '0']
+    servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
+        await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
+    # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
+    # UNREPAIRED compaction view, making the race easy to trigger deterministically.
+    await cql.run_async(
+        f"ALTER TABLE {ks}.test WITH compaction = "
+        f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
+    )
+    # Disable autocompaction everywhere so we control exactly when compaction runs.
+    for s in servers:
+        await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
+    scylla_path = await manager.server_get_exe(servers[0].server_id)
+    # Repair 1: establishes sstables_repaired_at=1 on all nodes.
+    # Keys 0-9 (inserted by prepare_cluster_for_incremental_repair) end up in
+    # S0'(repaired_at=1) on all nodes.
+    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
+    # Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
+    # These will be the subject of repair 2.
+    repair2_keys = list(range(current_key, current_key + 10))
+    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
+    for s in servers:
+        await manager.api.flush_keyspace(s.ip_addr, ks)
+    current_key += 10
+
+    # If leadership transfers to servers[1] between our coordinator check and the
+    # restart, the coordinator change masks the bug. Detect and retry.
+    max_attempts = 5
+    for attempt in range(1, max_attempts + 1):
+        try:
+            current_key = await _do_race_window_promotes_unrepaired_data(
+                manager, servers, cql, ks, token, scylla_path, current_key)
+            return
+        except _LeadershipTransferred as e:
+            logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
+    pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
+                "could not run the test without coordinator interference.")

 # ----------------------------------------------------------------------------
 # Tombstone GC safety tests
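The retry wrapper above can be distilled into a reusable pattern: signal transient external interference with a dedicated exception, and drive the racy step from a bounded loop that fails loudly when the budget is exhausted. A minimal asyncio sketch, with illustrative names (`run_with_retries`, `TransientInterference`) that are not part of the dtest framework:

```python
import asyncio


class TransientInterference(Exception):
    """Signals that an external actor invalidated this attempt; retry."""


async def run_with_retries(step, max_attempts=5):
    # Re-run `step` until it succeeds or the attempt budget is exhausted.
    for attempt in range(1, max_attempts + 1):
        try:
            return await step(attempt)
        except TransientInterference as e:
            print(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
    raise AssertionError(f"still interfered with after {max_attempts} attempts")


async def demo():
    calls = []

    async def step(attempt):
        calls.append(attempt)
        if attempt < 3:
            # Simulate e.g. leadership moving to the node under test.
            raise TransientInterference("coordinator moved")
        return "ok"

    assert await run_with_retries(step) == "ok"
    assert calls == [1, 2, 3]


asyncio.run(demo())
```

The important property, mirrored by the diff, is that each failed attempt performs its own cleanup (here there is none; in the test, the injection is disabled and the repair task awaited) before the sentinel exception propagates to the loop.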


@@ -4,7 +4,7 @@
 # SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
 #
 from test.pylib.manager_client import ManagerClient
-from test.cluster.util import get_topology_coordinator, trigger_stepdown
+from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table
 import pytest
 import logging
@@ -83,3 +83,78 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
         coord3 = await get_topology_coordinator(manager)
         if coord3:
             break
+
+
+@pytest.mark.asyncio
+@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
+async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
+    """Verify that _tablet_load_stats_refresh is properly joined during
+    topology coordinator shutdown, even when a schema change notification
+    triggers a refresh between run() completing and stop() being called.
+
+    Reproduces the scenario using two injection points:
+    - topology_coordinator_pause_before_stop: pauses after run() finishes
+      but before stop() is called
+    - refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
+      so it's still in-flight during shutdown
+
+    Without the join in stop(), the refresh task outlives the coordinator
+    and accesses freed memory.
+    """
+    servers = await manager.servers_add(3)
+    await manager.get_ready_cql(servers)
+
+    async with new_test_keyspace(manager,
+            "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
+        coord = await get_topology_coordinator(manager)
+        host_ids = [await manager.get_host_id(s.server_id) for s in servers]
+        coord_idx = host_ids.index(coord)
+        coord_server = servers[coord_idx]
+        log = await manager.server_open_log(coord_server.server_id)
+        mark = await log.mark()
+
+        # Injection B: pause between run() returning and stop() being called.
+        await manager.api.enable_injection(
+            coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)
+
+        # Stepdown causes the topology coordinator to abort and shut down.
+        logger.info("Triggering stepdown on coordinator")
+        await trigger_stepdown(manager, coord_server)
+
+        # Wait for injection B to fire. The coordinator has finished run() but
+        # the schema change listener is still registered.
+        mark, _ = await log.wait_for(
+            "topology_coordinator_pause_before_stop: waiting", from_mark=mark)
+
+        # Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
+        # Enable it now so it only catches the notification-triggered call.
+        await manager.api.enable_injection(
+            coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)
+
+        # CREATE TABLE fires on_create_column_family on the old coordinator, which
+        # fire-and-forgets _tablet_load_stats_refresh.trigger(), scheduling a task
+        # via with_scheduling_group on the gossip scheduling group.
+        logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
+        async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
+            # Wait for injection A: refresh_tablet_load_stats() is now blocked before
+            # accessing _shared_tm. The topology_coordinator is still alive (paused at B).
+            await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)
+
+            # Release injection B: coordinator proceeds through stop().
+            # Without the fix, stop() returns quickly and run_topology_coordinator
+            # frees the topology_coordinator frame. With the fix, stop() blocks at
+            # _tablet_load_stats_refresh.join() until injection A is released.
+            logger.info("Releasing injection B: coordinator will stop")
+            await manager.api.message_injection(
+                coord_server.ip_addr, "topology_coordinator_pause_before_stop")
+
+            # Release injection A: refresh_tablet_load_stats() resumes and accesses
+            # this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
+            # points to freed memory and ASan detects heap-use-after-free.
+            logger.info("Releasing injection A: refresh resumes")
+            await manager.api.message_injection(
+                coord_server.ip_addr, "refresh_tablet_load_stats_pause")
+
+            # If the bug is present, the node crashed. read_barrier will fail.
+            await read_barrier(manager.api, coord_server.ip_addr)
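The bug class exercised here — a fire-and-forget background task outliving the object that owns it — has a direct asyncio analogue. A minimal sketch of the fix's shape, with hypothetical names (`StatsRefresher`, not the ScyllaDB implementation): `stop()` must await any in-flight refresh before the owner is considered dead.

```python
import asyncio


class StatsRefresher:
    """Owner of a fire-and-forget refresh task; stop() joins it."""

    def __init__(self):
        self._task = None
        self.stats = None
        self.stopped = False

    def trigger(self):
        # Fire-and-forget, as a schema-change notification would do.
        self._task = asyncio.ensure_future(self._refresh())

    async def _refresh(self):
        await asyncio.sleep(0.01)  # simulated slow load
        # In the C++ bug, this access hits freed memory once the owner is gone.
        assert not self.stopped, "refresh ran against a stopped owner"
        self.stats = "fresh"

    async def stop(self):
        # The fix: join the in-flight task *before* marking the owner dead.
        if self._task is not None:
            await self._task
        self.stopped = True


async def demo():
    r = StatsRefresher()
    r.trigger()     # notification arrives just before shutdown
    await r.stop()  # without the await in stop(), _refresh would hit a dead owner
    return r.stats


print(asyncio.run(demo()))
```

Deleting the `await self._task` line makes the assertion in `_refresh` fire, which is the Python-level equivalent of the heap-use-after-free that ASan catches in the real test.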


@@ -435,8 +435,9 @@ async def test_alter_tablets_rf_dc_drop(request: pytest.FixtureRequest, manager:
 @pytest.mark.asyncio
 @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
 async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
-    async def get_replication_options(ks: str):
-        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
+    async def get_replication_options(ks: str, host, ip_addr):
+        await read_barrier(manager.api, ip_addr)
+        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
         repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
         return repl
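The helper now pins the query to one host and issues a read barrier against that host first. The reason, sketched with a toy replicated store (the `Replica` class and this local `read_barrier` are illustrative, not the ScyllaDB API): a replica chosen at random may not yet have applied a schema mutation that was already committed, so a barrier forces the chosen replica to catch up before it is read.

```python
import asyncio


class Replica:
    def __init__(self, log):
        self.log = log        # shared committed log of schema mutations
        self.applied = 0      # how much of the log this replica has applied
        self.data = {}

    def apply_up_to(self, index):
        for entry in self.log[self.applied:index]:
            self.data.update(entry)
        self.applied = index


async def read_barrier(replica):
    # Force the replica to catch up with everything committed so far.
    replica.apply_up_to(len(replica.log))


async def demo():
    log = [{"ks1": {"dc1": "1"}}]           # schema change already committed
    lagging = Replica(log)                   # ...but not yet applied here
    assert lagging.data.get("ks1") is None   # a read now would be stale
    await read_barrier(lagging)              # barrier, then read the same node
    return lagging.data["ks1"]


print(asyncio.run(demo()))
```

Pinning the subsequent `SELECT` to the same host matters: a barrier on node A says nothing about node B, so barrier and read must target one node, exactly as the diff does with `host` and `servers[0].ip_addr`.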
@@ -451,43 +452,44 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
     host_ids = [await manager.get_host_id(s.server_id) for s in servers]
     cql = manager.get_cql()
+    host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]

     await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
     await cql.run_async("create table ks1.t (pk int primary key);")
-    repl = await get_replication_options("ks1")
+    repl = await get_replication_options("ks1", host, servers[0].ip_addr)
     assert repl['dc1'] == '1'

     await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 2} and tablets = {'initial': 4};")
     await cql.run_async("create table ks2.t (pk int primary key);")
-    repl = await get_replication_options("ks2")
+    repl = await get_replication_options("ks2", host, servers[0].ip_addr)
     assert repl['dc1'] == '1'
     assert repl['dc2'] == '2'

     await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
     await cql.run_async("create table ks3.t (pk int primary key);")
-    repl = await get_replication_options("ks3")
+    repl = await get_replication_options("ks3", host, servers[0].ip_addr)
     assert repl['dc1'] == '1'

     await cql.run_async("create keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
     await cql.run_async("create table ks4.t (pk int primary key);")
-    repl = await get_replication_options("ks4")
+    repl = await get_replication_options("ks4", host, servers[0].ip_addr)
     assert repl['dc1'] == '1'

     await cql.run_async(f"create keyspace ks5 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}} and tablets = {{'initial': 4}};")
     await cql.run_async("create table ks5.t (pk int primary key);")
-    repl = await get_replication_options("ks5")
+    repl = await get_replication_options("ks5", host, servers[0].ip_addr)
     assert repl['dc1'] == '2'
     assert repl['dc2'] == '2'

     await cql.run_async(f"create keyspace ks6 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
     await cql.run_async("create table ks6.t (pk int primary key);")
-    repl = await get_replication_options("ks6")
+    repl = await get_replication_options("ks6", host, servers[0].ip_addr)
     assert repl['dc1'] == '2'

     [await manager.api.disable_injection(s.ip_addr, injection) for s in servers]

     await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
-    repl = await get_replication_options("ks1")
+    repl = await get_replication_options("ks1", host, servers[0].ip_addr)
     assert repl['dc1'] == ['rack1b']
     tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t")
@@ -497,7 +499,7 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
         assert r.replicas[0][0] == host_ids[1]

     await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};")
-    repl = await get_replication_options("ks2")
+    repl = await get_replication_options("ks2", host, servers[0].ip_addr)
     assert repl['dc1'] == ['rack1a']
     assert len(repl['dc2']) == 2
     assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2']
@@ -523,13 +525,13 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
         pass

     await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};")
-    repl = await get_replication_options("ks5")
+    repl = await get_replication_options("ks5", host, servers[0].ip_addr)
     assert len(repl['dc1']) == 2
     assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
     assert repl['dc2'] == '2'

     await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};")
-    repl = await get_replication_options("ks6")
+    repl = await get_replication_options("ks6", host, servers[0].ip_addr)
     assert repl['dc1'] == '2'
     assert len(repl['dc2']) == 1
     assert repl['dc2'][0] == 'rack2a'
@@ -537,8 +539,9 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
 @pytest.mark.asyncio
 @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
 async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
-    async def get_replication_options(ks: str):
-        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
+    async def get_replication_options(ks: str, host, ip_addr):
+        await read_barrier(manager.api, ip_addr)
+        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
         repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
         return repl
@@ -551,10 +554,11 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
                await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})]

     cql = manager.get_cql()
+    host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]

     await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
     await cql.run_async("create table ks1.t (pk int primary key);")
-    repl = await get_replication_options("ks1")
+    repl = await get_replication_options("ks1", host, servers[0].ip_addr)
     assert repl['dc1'] == '1'

     await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}")
@@ -574,19 +578,19 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
     servers = servers[0:-1]

     await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
-    repl = await get_replication_options("ks1")
+    repl = await get_replication_options("ks1", host, servers[0].ip_addr)
     assert repl['dc1'] == ['rack1b']

     logging.info("Rolling restart")
     await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"])

     await cql.run_async(f"create keyspace ks2 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
-    repl = await get_replication_options("ks2")
+    repl = await get_replication_options("ks2", host, servers[0].ip_addr)
     assert len(repl['dc1']) == 2
     assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']

     await cql.run_async(f"create keyspace ks3 with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': 4}};")
-    repl = await get_replication_options("ks3")
+    repl = await get_replication_options("ks3", host, servers[0].ip_addr)
     assert len(repl['dc1']) == 1
     assert len(repl['dc2']) == 1
     assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1']
@@ -602,7 +606,7 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
     assert failed

     await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};")
-    repl = await get_replication_options("ks1")
+    repl = await get_replication_options("ks1", host, servers[0].ip_addr)
     assert len(repl['dc1']) == 1
     assert repl['dc1'][0] == 'rack1b'
     assert len(repl['dc2']) == 1
@@ -1109,8 +1113,9 @@ async def test_multi_rf_increase_before_decrease_0_N(request: pytest.FixtureRequ
 @pytest.mark.asyncio
 @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
 async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
-    async def get_replication_options(ks: str):
-        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
+    async def get_replication_options(ks: str, host, ip_addr):
+        await read_barrier(manager.api, ip_addr)
+        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
         repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
         return repl
@@ -1128,10 +1133,11 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
     host_ids = [await manager.get_host_id(s.server_id) for s in servers]
     cql = manager.get_cql()
+    host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]

     await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
     await cql.run_async("create table ks1.t (pk int primary key);")
-    repl = await get_replication_options("ks1")
+    repl = await get_replication_options("ks1", host, servers[0].ip_addr)
     assert repl['dc1'] == '1'

     [await manager.api.disable_injection(s.ip_addr, numeric_injection) for s in servers]
@@ -1165,7 +1171,7 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
         failed = True
     assert failed

-    repl = await get_replication_options("ks1")
+    repl = await get_replication_options("ks1", host, servers[0].ip_addr)
     assert repl['dc1'] == '1'

 @pytest.mark.asyncio


@@ -18,6 +18,7 @@
 #include <seastar/testing/test_case.hh>
 #include "test/lib/exception_utils.hh"
+#include "test/lib/log.hh"
 #include "test/lib/test_utils.hh"
 #include "ldap_common.hh"
 #include "service/migration_manager.hh"
@@ -681,3 +682,41 @@ SEASTAR_TEST_CASE(ldap_config) {
         },
         make_ldap_config());
 }
+
+// Reproduces the race between the cache pruner and the permission
+// loader lifecycle during shutdown. Refs SCYLLADB-1679.
+SEASTAR_TEST_CASE(ldap_pruner_no_crash_after_loader_cleared) {
+    auto cfg = make_ldap_config();
+    cfg->permissions_update_interval_in_ms.set(1);
+    auto call_count = seastar::make_lw_shared<int>(0);
+    co_await do_with_cql_env_thread([call_count](cql_test_env& env) {
+        auto& cache = env.auth_cache().local();
+        testlog.info("Populating 50 cache entries");
+        for (int i = 0; i < 50; i++) {
+            auto r = auth::make_data_resource("system", fmt::format("t{}", i));
+            cache.get_permissions(auth::role_or_anonymous(), r).get();
+        }
+        testlog.info("Installing slow permission loader (10ms per call)");
+        cache.set_permission_loader(
+            [call_count] (const auth::role_or_anonymous&, const auth::resource&)
+                    -> seastar::future<auth::permission_set> {
+                ++(*call_count);
+                co_await seastar::sleep(std::chrono::milliseconds(10));
+                co_return auth::permission_set();
+            });
+        testlog.info("Waiting for pruner to start reloading");
+        while (*call_count == 0) {
+            seastar::sleep(std::chrono::milliseconds(1)).get();
+        }
+        testlog.info("Pruner started, letting teardown run");
+    }, cfg);
+    testlog.info("Loader called {} times", *call_count);
+}
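The C++ test above races a periodic pruner against a permission loader that teardown clears while a load is still slow. The hazard, restated as a small Python sketch with hypothetical names (this is not the ScyllaDB `auth` API): a periodic task must take a local reference to the loader and check it before calling through, so a teardown that clears the loader cannot leave the pruner calling into a dead callback.

```python
import asyncio


class PermissionsCache:
    def __init__(self, loader):
        self._loader = loader  # teardown may clear this at any time

    async def prune_once(self):
        # Snapshot the loader, then guard: teardown may already have run.
        loader = self._loader
        if loader is None:
            return None
        return await loader()

    def teardown(self):
        self._loader = None


async def demo():
    async def slow_loader():
        await asyncio.sleep(0.01)  # like the 10ms loader in the C++ test
        return "perms"

    cache = PermissionsCache(slow_loader)
    first = await cache.prune_once()   # loader still installed
    cache.teardown()
    second = await cache.prune_once()  # guard stops the call after teardown
    return first, second


print(asyncio.run(demo()))
```

In the C++ version the equivalent guard/join has to exist in the cache itself, which is exactly what the regression test checks by letting teardown run while a reload is mid-flight.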


@@ -126,6 +126,9 @@ class CppFile(pytest.File, ABC):
         return args

     def collect(self) -> Iterator[CppTestCase]:
+        if BUILD_MODE not in self.stash:
+            return
+
         custom_args = self.suite_config.get("custom_args", {}).get(self.test_name, DEFAULT_CUSTOM_ARGS)
         for test_case in self.list_test_cases():


@@ -163,6 +163,11 @@ def scylla_binary(testpy_test) -> Path:
 def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
+    items[:] = [
+        item for item in items
+        if (parent_file := item.getparent(cls=pytest.File)) is not None
+        and BUILD_MODE in parent_file.stash
+    ]
     for item in items:
         modify_pytest_item(item=item)
@@ -285,7 +290,10 @@ def pytest_configure(config: pytest.Config) -> None:
     pytest_log_dir.mkdir(parents=True, exist_ok=True)
     if not _pytest_config.getoption("--save-log-on-success"):
         for file in pytest_log_dir.glob("*"):
-            file.unlink()
+            # This helps when framework tests are executed with test.py, even though
+            # that's the wrong way to run them: test_no_bare_skip_markers_in_collection
+            # runs a collection in a subprocess, which has led to races, especially
+            # with repeat.
+            file.unlink(missing_ok=True)
     _pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log"
@@ -340,7 +348,8 @@ def pytest_collect_file(file_path: pathlib.Path,
     repeats = list(product(build_modes, parent.config.run_ids))
     if not repeats:
-        return []
+        parent.stash[REPEATING_FILES].remove(file_path)
+        return collectors
     ihook = parent.ihook
     collectors = list(chain(collectors, chain.from_iterable(


@@ -75,6 +75,7 @@ def test_no_bare_skip_markers_in_collection():
          "--collect-only",
          "--ignore=boost", "--ignore=raft",
          "--ignore=ldap", "--ignore=vector_search",
+         "--ignore=unit",
          "-p", "no:sugar"],
         capture_output=True, text=True,
         cwd=str(_TEST_ROOT),