Compare commits

...

18 Commits

Author SHA1 Message Date
Avi Kivity
c4de2b3c9d Merge 'test: fix flaky tablets test by using read barrier' from Aleksandra Martyniuk
Some tests in test_tablets.py read system_schema.keyspaces from an arbitrary node that may not have applied the latest schema change yet. Pin the read to a specific node and issue a read barrier before querying, ensuring the node has up-to-date data.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1700

Test fix; no backport

Closes scylladb/scylladb#29655

* github.com:scylladb/scylladb:
  test: fix flaky rack list conversion tests by using read barrier
  test: fix flaky test_enforce_rack_list_option by using read barrier
2026-04-28 17:15:59 +03:00
Patryk Jędrzejczak
d9dd3bfe53 Merge 'topology_coordinator: join tablet load stats refresh in stop()' from Andrzej Jackowski
Commit 2b7aa32 (topology_coordinator: Refresh load stats after
table is created or altered) registered topology_coordinator as a
schema change listener and added on_create_column_family which
fire-and-forgets _tablet_load_stats_refresh.trigger(). The
triggered task runs on the gossip scheduling group via
with_scheduling_group and accesses the topology_coordinator via
'this'.

stop() unregisters the listener but does not wait for any
in-flight refresh task. If a notification fires between
_tablet_load_stats_refresh.join() in run() and unregister_listener
in stop(), the scheduled task can outlive the topology_coordinator
and access freed memory after run_topology_coordinator's coroutine
frame is destroyed.

Wait for the refresh to complete in stop() after unregistering the
listener, ensuring no task can fire after destruction.

Fixes SCYLLADB-1728

Backport to 2026.1 and 2026.2, because the issue was introduced in 2b7aa32

Closes scylladb/scylladb#29653

* https://github.com/scylladb/scylladb:
  test: tablet_stats: reproduce shutdown refresh race
  topology_coordinator: join tablet load stats refresh in stop()
2026-04-28 12:54:28 +02:00
Marcin Maliszkiewicz
b0f988afc4 Merge 'auth: fix shutdown and startup races in LDAP cache pruner' from Andrzej Jackowski
The LDAP role manager's `_cache_pruner` background fiber periodically calls cache::reload_all_permissions(). Two races cause it to hit SCYLLA_ASSERT(_permission_loader):
- Cross-shard race: The pruner `used _cache.container().invoke_on_all()` to reload permissions on every shard. Since both `service::start()` and `sharded<service>::stop()` execute per-shard in parallel, the pruner on one shard could call reload_all_permissions() on another shard before that shard set its loader (startup) or after it cleared its loader (shutdown). Each shard runs its own pruner instance, so reloading locally is sufficient — this also removes redundant N² reload calls.
- Intra-shard race: `service::stop()` cleared the permission loader and stopped the role manager concurrently (via when_all_succeed). A mid-reload pruner could yield and then call the now-null loader. Fixed by stopping the role manager first so the pruner is fully drained before the loader is cleared.

Fixes SCYLLADB-1679
Backport to 2026.2, introduced in 7eedf50c12

Closes scylladb/scylladb#29605

* github.com:scylladb/scylladb:
  auth: make shutdown the exact reverse of startup
  test: ldap: add test for pruner crash during shutdown
  auth: start authorizer and set permission loader before role manager
  auth: stop role manager before clearing permission loader
  auth: reload LDAP permission cache on local shard only
2026-04-28 11:16:07 +02:00
Botond Dénes
a7e9c0e6d2 Merge 'test.py: fix test collection bug' from Andrei Chekun
In certain circumstances current way of collecting can be error-prone. Collection can stop when the first file is skipped in the mode leaving the rest of the files in CLI not collected.
Another issue that if the file specified twice, with directory and file explicitly, it will produce incorrect CppFile in the stash causing KeyError.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1714

No backport, test framework bug fix only.

Closes scylladb/scylladb#29634

* github.com:scylladb/scylladb:
  test.py: fix framework test
  test.py: fix test collection bug
2026-04-28 11:52:35 +03:00
Botond Dénes
3ea4af1c8c Merge 'test/cluster/test_incremental_repair: fix flaky coordinator-change scenario' from Avi Kivity
- Ensure servers[1] is not the topology coordinator before restarting it, preventing the leader death + re-election + re-repair sequence that masked the compaction-merge bug
- Add a retry loop that detects post-restart leadership transfer to servers[1] via direct coordinator query, retrying up to 5 times

Fixes: SCYLLADB-1478

Backporting to 2026.2, which sees the failure regularly.

Closes scylladb/scylladb#29671

* github.com:scylladb/scylladb:
  test/cluster/test_incremental_repair: add retry for residual leadership race
  test/cluster/test_incremental_repair: fix flaky coordinator-change scenario
2026-04-28 09:05:02 +03:00
Andrzej Jackowski
459e3970cd test: tablet_stats: reproduce shutdown refresh race
The coordinator can receive a schema-change notification after run()
finishes but before stop() unregisters listeners. The test pins that
window with error injections and verifies stop() waits for the refresh
instead of letting it outlive the coordinator.

Test time in dev: 9.51s

Refs SCYLLADB-1728
2026-04-28 08:00:54 +02:00
Andrzej Jackowski
8756f7c068 topology_coordinator: join tablet load stats refresh in stop()
Commit 2b7aa3211d made schema changes trigger tablet load stats
refreshes in the background. A notification can still arrive after
run() stops the periodic refresher and before the coordinator object
is destroyed.

Move lifecycle subscription cleanup to stop() and join the serialized
refresh there after unregistering refresh trigger sources. This keeps
the coordinator alive until notification-triggered refresh work has
completed.

Fixes SCYLLADB-1728
2026-04-28 07:37:28 +02:00
Avi Kivity
2615d0e8d8 test/cluster/test_incremental_repair: add retry for residual leadership race
There is a small race window where Raft leadership could transfer back
to servers[1] between the ensure_group0_leader_on() check and the
actual restart.  If this happens, the new coordinator re-initiates
repair and masks the compaction-merge bug.

Extract the core test logic into _do_race_window_promotes_unrepaired_data()
which directly checks get_topology_coordinator() after restart and raises
_LeadershipTransferred if servers[1] became coordinator.  The test
function calls this helper in a retry loop (up to 5 attempts).

Refs: SCYLLADB-1478
2026-04-27 21:11:06 +03:00
Avi Kivity
914b70c75b test/cluster/test_incremental_repair: fix flaky coordinator-change scenario
The test_incremental_repair_race_window_promotes_unrepaired_data test
was flaky because it hardcodes servers[1] as the restart target but did
not ensure servers[1] was NOT the topology coordinator.

When servers[1] happened to be the Raft group0 leader (topology
coordinator), restarting it killed the leader, forced a new election,
and the new coordinator re-initiated tablet repair.  This re-repair
flushes memtables on all replicas via take_storage_snapshot() and marks
the resulting sstables as repaired -- causing post-repair keys to appear
in repaired sstables on servers[0] and servers[2].  The test then hit
the wrong assertion (servers[0]/[2] contaminated).

Fix: before starting the repair, check whether servers[1] is the
topology coordinator.  If so, move leadership to another server via
ensure_group0_leader_on() so that restarting servers[1] only kills a
follower -- which does not trigger an election or coordinator change.

Reproducibility was confirmed by forcing leadership to servers[1] via
ensure_group0_leader_on() and observing deterministic failure with all
three servers showing post-repair keys in repaired sstables (confirming
the re-repair scenario), then verifying the fix passes reliably.

Fixes: SCYLLADB-1478
2026-04-27 21:08:12 +03:00
Aleksandra Martyniuk
6b7ce5e244 test: fix flaky rack list conversion tests by using read barrier
test_numeric_rf_to_rack_list_conversion and
test_numeric_rf_to_rack_list_conversion_abort were reading
system_schema.keyspaces from an arbitrary node that may not have
applied the latest schema change yet. Pin the read to a specific node
and issue a read barrier before querying, ensuring the node has
up-to-date data.
2026-04-27 15:19:09 +02:00
Aleksandra Martyniuk
9d3d424d58 test: fix flaky test_enforce_rack_list_option by using read barrier
The test was reading system_schema.keyspaces from an arbitrary node
that may not have applied the latest schema change yet. Pin the read
to a specific node and issue a read barrier before querying, ensuring
the node has up-to-date data.
2026-04-27 14:44:38 +02:00
Andrei Chekun
f2f4915e09 test.py: fix framework test
Framework test was not skipping unit directory where C++ tests are
located. With bug fixing this started to fail. Add ignoring this
directory as well.
2026-04-25 18:04:55 +02:00
Andrei Chekun
92c09d106d test.py: fix test collection bug
In certain circumstances current way of collecting can be error prone.
Collection can stop when the first file is skipped in the mode leaving
the rest of the files in CLI not collected.
Another issue that if the file specified twice, with directory and file
explicitly, it will produce incorrect CppFile in the stash causing
KeyError.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1714
2026-04-24 17:57:11 +02:00
Andrzej Jackowski
8855e77465 auth: make shutdown the exact reverse of startup
The previous parallel stop of the authenticator and authorizer
was a micro-optimization that obscured the lifecycle invariant
that shutdown should reverse startup.

Refs SCYLLADB-1679
2026-04-24 13:34:09 +02:00
Andrzej Jackowski
adf1e26bab test: ldap: add test for pruner crash during shutdown
Verify that service::stop() drains the LDAP pruner before
clearing the permission loader. The test installs a slow
permission loader and confirms the pruner is actively
reloading when teardown begins.

Refs SCYLLADB-1679
2026-04-24 13:34:09 +02:00
Andrzej Jackowski
37a547604f auth: start authorizer and set permission loader before role manager
LDAP role manager starts a pruner fiber that calls
reload_all_permissions() which asserts _permission_loader is set.
The permission loader calls _authorizer->authorize(), so the
authorizer must be started before the loader is set.

Start authorizer, then set the permission loader, then start the
role manager, ensuring both dependencies are satisfied before the
pruner can fire.

Fixes SCYLLADB-1679
2026-04-24 13:34:09 +02:00
Andrzej Jackowski
c3e5285d45 auth: stop role manager before clearing permission loader
service::stop() cleared the permission loader and stopped
the role manager concurrently (via when_all_succeed). The
LDAP pruner could be mid-reload at a yield point when the
loader was set to null, causing it to call a null function.

Stop the role manager first so the pruner is fully drained
before the loader is cleared.

Fixes SCYLLADB-1679
2026-04-24 13:34:09 +02:00
Andrzej Jackowski
f75e5ac65b auth: reload LDAP permission cache on local shard only
The LDAP role manager's _cache_pruner fiber used
invoke_on_all() to reload permissions on every shard.
Since auth::service::start() runs on all shards in
parallel via invoke_on_all(), the pruner on shard X
could call reload_all_permissions() on shard Y before
shard Y finished start() and set its permission loader,
hitting SCYLLA_ASSERT(_permission_loader). The same
cross-shard race existed during shutdown.

Each shard runs its own pruner instance, so reloading
locally is sufficient — all shards are still covered.
This also removes redundant N-squared reload calls.

Refs SCYLLADB-1679
2026-04-24 13:06:58 +02:00
10 changed files with 273 additions and 80 deletions

View File

@@ -258,13 +258,11 @@ future<> ldap_role_manager::start() {
} catch (const seastar::sleep_aborted&) {
co_return; // ignore
}
co_await _cache.container().invoke_on_all([] (cache& c) -> future<> {
try {
co_await c.reload_all_permissions();
} catch (...) {
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
}
});
try {
co_await _cache.reload_all_permissions();
} catch (...) {
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
}
}
});
return _std_mgr.start();

View File

@@ -157,15 +157,12 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
return create_legacy_keyspace_if_missing(mm);
});
}
co_await _role_manager->start();
if (this_shard_id() == 0) {
// Role manager and password authenticator have this odd startup
// mechanism where they asynchronously create the superuser role
// in the background. Correct password creation depends on role
// creation therefore we need to wait here.
co_await _role_manager->ensure_superuser_is_created();
}
co_await when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
// Authorizer must be started before the permission loader is set,
// because the loader calls _authorizer->authorize().
// The loader must be set before starting the role manager, because
// LDAP role manager starts a pruner fiber that calls
// reload_all_permissions() which asserts _permission_loader is set.
co_await _authorizer->start();
if (!_used_by_maintenance_socket) {
// Maintenance socket mode can't cache permissions because it has
// different authorizer. We can't mix cached permissions, they could be
@@ -174,12 +171,27 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
&service::get_uncached_permissions,
this, std::placeholders::_1, std::placeholders::_2));
}
co_await _role_manager->start();
if (this_shard_id() == 0) {
// Role manager and password authenticator have this odd startup
// mechanism where they asynchronously create the superuser role
// in the background. Correct password creation depends on role
// creation therefore we need to wait here.
co_await _role_manager->ensure_superuser_is_created();
}
// Authenticator must be started after ensure_superuser_is_created()
// because password_authenticator queries system.roles for the
// superuser entry created by the role manager.
co_await _authenticator->start();
}
future<> service::stop() {
_as.request_abort();
// Reverse of start() order.
co_await _authenticator->stop();
co_await _role_manager->stop();
_cache.set_permission_loader(nullptr);
return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
co_await _authorizer->stop();
}
future<> service::ensure_superuser_is_created() {

View File

@@ -4237,6 +4237,7 @@ public:
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
, _async_gate("topology_coordinator")
{
_lifecycle_notifier.register_subscriber(this);
_db.get_notifier().register_listener(this);
// When the delay_cdc_stream_finalization error injection is disabled
// (test releases it), wake the topology coordinator so it retries
@@ -4400,6 +4401,7 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
}
future<> topology_coordinator::refresh_tablet_load_stats() {
co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
auto tm = get_token_metadata_ptr();
locator::load_stats stats;
@@ -4723,7 +4725,6 @@ future<> topology_coordinator::run() {
co_await _async_gate.close();
co_await std::move(tablet_load_stats_refresher);
co_await _tablet_load_stats_refresh.join();
co_await std::move(cdc_generation_publisher);
co_await std::move(cdc_streams_gc);
co_await std::move(gossiper_orphan_remover);
@@ -4736,6 +4737,8 @@ future<> topology_coordinator::stop() {
co_await _db.get_notifier().unregister_listener(this);
utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
_topo_sm.on_tablet_split_ready = nullptr;
co_await _lifecycle_notifier.unregister_subscriber(this);
co_await _tablet_load_stats_refresh.join();
// if topology_coordinator::run() is aborted either because we are not a
// leader anymore, or we are shutting down as a leader, we have to handle
@@ -4797,7 +4800,6 @@ future<> run_topology_coordinator(
topology_cmd_rpc_tracker};
std::exception_ptr ex;
lifecycle_notifier.register_subscriber(&coordinator);
try {
rtlogger.info("start topology coordinator fiber");
co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
@@ -4818,7 +4820,7 @@ future<> run_topology_coordinator(
}
on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
}
co_await lifecycle_notifier.unregister_subscriber(&coordinator);
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min));
co_await coordinator.stop();
}

View File

@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
from test.pylib.tablets import get_all_tablet_replicas
from test.cluster.tasks.task_manager_client import TaskManagerClient
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown
from test.pylib.util import wait_for_cql_and_get_hosts
from cassandra.query import ConsistencyLevel, SimpleStatement
@@ -880,41 +880,30 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
# affected replica but process the UNREPAIRED sstable on the others, so the classification
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
# on the affected replica leading to data resurrection.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
cmdline = ['--hinted-handoff-enabled', '0']
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
await cql.run_async(
f"ALTER TABLE {ks}.test WITH compaction = "
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
)
class _LeadershipTransferred(Exception):
"""Raised when leadership transferred to servers[1] during the test, requiring a retry."""
pass
# Disable autocompaction everywhere so we control exactly when compaction runs.
for s in servers:
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
scylla_path = await manager.server_get_exe(servers[0].server_id)
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
# S0'(repaired_at=1) on all nodes.
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
# These will be the subject of repair 2.
repair2_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
current_key += 10
async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
"""Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.
Returns the next current_key value.
Raises _LeadershipTransferred if servers[1] becomes coordinator after the
restart, signalling the caller to retry.
"""
# Ensure servers[1] is not the topology coordinator. If the coordinator is
# restarted, the Raft leader dies, a new election occurs, and the new
# coordinator re-initiates tablet repair -- flushing memtables on all replicas
# and marking post-repair data as repaired. That legitimate re-repair masks
# the compaction-merge bug this test detects.
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)
if coord_serv == servers[1]:
other = next(s for s in servers if s != servers[1])
await ensure_group0_leader_on(manager, other)
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)
coord_log = await manager.server_open_log(coord_serv.server_id)
coord_mark = await coord_log.mark()
@@ -978,6 +967,16 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
await manager.server_start(target.server_id)
await manager.servers_see_each_other(servers)
# Check if leadership transferred to servers[1] during the restart.
# If so, the new coordinator will re-initiate repair, masking the bug.
new_coord = await get_topology_coordinator(manager)
new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
if new_coord_serv == servers[1]:
await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
await manager.api.wait_task(servers[0].ip_addr, task_id)
raise _LeadershipTransferred(
"servers[1] became topology coordinator after restart")
# Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
# confirming that the bug was triggered (S1' and E merged during the race window).
deadline = time.time() + 60
@@ -1000,7 +999,7 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
if not compaction_ran:
logger.warning("Compaction did not merge S1' and E after restart during the race window; "
"the bug was not triggered. Skipping assertion.")
return
return current_key
# Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
# keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
@@ -1031,8 +1030,9 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
# servers[0] and servers[2] flushed post-repair keys after the race window closed,
# so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
# servers[0] and servers[2] were never restarted and the coordinator stayed
# alive throughout, so no re-repair could have flushed their memtables.
# Post-repair keys must NOT appear in repaired sstables on these servers.
assert not (repaired_keys_0 & post_repair_key_set), \
f"servers[0] should not have post-repair keys in repaired sstables, " \
f"got: {repaired_keys_0 & post_repair_key_set}"
@@ -1053,6 +1053,54 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
f"on servers[1] after restart lost the being_repaired markers during the race window. " \
f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
return current_key
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
cmdline = ['--hinted-handoff-enabled', '0']
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
await cql.run_async(
f"ALTER TABLE {ks}.test WITH compaction = "
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
)
# Disable autocompaction everywhere so we control exactly when compaction runs.
for s in servers:
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
scylla_path = await manager.server_get_exe(servers[0].server_id)
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
# S0'(repaired_at=1) on all nodes.
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
# These will be the subject of repair 2.
repair2_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
current_key += 10
# If leadership transfers to servers[1] between our coordinator check and the
# restart, the coordinator change masks the bug. Detect and retry.
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
current_key = await _do_race_window_promotes_unrepaired_data(
manager, servers, cql, ks, token, scylla_path, current_key)
return
except _LeadershipTransferred as e:
logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
"could not run the test without coordinator interference.")
# ----------------------------------------------------------------------------
# Tombstone GC safety tests

View File

@@ -4,7 +4,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
from test.pylib.manager_client import ManagerClient
from test.cluster.util import get_topology_coordinator, trigger_stepdown
from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table
import pytest
import logging
@@ -83,3 +83,78 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
coord3 = await get_topology_coordinator(manager)
if coord3:
break
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
"""Verify that _tablet_load_stats_refresh is properly joined during
topology coordinator shutdown, even when a schema change notification
triggers a refresh between run() completing and stop() being called.
Reproduces the scenario using two injection points:
- topology_coordinator_pause_before_stop: pauses after run() finishes
but before stop() is called
- refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
so it's still in-flight during shutdown
Without the join in stop(), the refresh task outlives the coordinator
and accesses freed memory.
"""
servers = await manager.servers_add(3)
await manager.get_ready_cql(servers)
async with new_test_keyspace(manager,
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
coord = await get_topology_coordinator(manager)
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
coord_idx = host_ids.index(coord)
coord_server = servers[coord_idx]
log = await manager.server_open_log(coord_server.server_id)
mark = await log.mark()
# Injection B: pause between run() returning and stop() being called.
await manager.api.enable_injection(
coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)
# Stepdown causes the topology coordinator to abort and shut down.
logger.info("Triggering stepdown on coordinator")
await trigger_stepdown(manager, coord_server)
# Wait for injection B to fire. The coordinator has finished run() but
# the schema change listener is still registered.
mark, _ = await log.wait_for(
"topology_coordinator_pause_before_stop: waiting", from_mark=mark)
# Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
# Enable it now so it only catches the notification-triggered call.
await manager.api.enable_injection(
coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)
# CREATE TABLE fires on_create_column_family on the old coordinator which
# fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task
# via with_scheduling_group on the gossip scheduling group.
logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
# Wait for injection A: refresh_tablet_load_stats() is now blocked before
# accessing _shared_tm. The topology_coordinator is still alive (paused at B).
await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)
# Release injection B: coordinator proceeds through stop().
# Without the fix, stop() returns quickly and run_topology_coordinator
# frees the topology_coordinator frame. With the fix, stop() blocks at
# _tablet_load_stats_refresh.join() until injection A is released.
logger.info("Releasing injection B: coordinator will stop")
await manager.api.message_injection(
coord_server.ip_addr, "topology_coordinator_pause_before_stop")
# Release injection A: refresh_tablet_load_stats() resumes and accesses
# this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
# points to freed memory and ASan detects heap-use-after-free.
logger.info("Releasing injection A: refresh resumes")
await manager.api.message_injection(
coord_server.ip_addr, "refresh_tablet_load_stats_pause")
# If the bug is present, the node crashed. read_barrier will fail.
await read_barrier(manager.api, coord_server.ip_addr)

View File

@@ -435,8 +435,9 @@ async def test_alter_tablets_rf_dc_drop(request: pytest.FixtureRequest, manager:
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str):
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
async def get_replication_options(ks: str, host, ip_addr):
await read_barrier(manager.api, ip_addr)
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl
@@ -451,43 +452,44 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
assert repl['dc1'] == '1'
await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 2} and tablets = {'initial': 4};")
await cql.run_async("create table ks2.t (pk int primary key);")
repl = await get_replication_options("ks2")
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
assert repl['dc1'] == '1'
assert repl['dc2'] == '2'
await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
await cql.run_async("create table ks3.t (pk int primary key);")
repl = await get_replication_options("ks3")
repl = await get_replication_options("ks3", host, servers[0].ip_addr)
assert repl['dc1'] == '1'
await cql.run_async("create keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
await cql.run_async("create table ks4.t (pk int primary key);")
repl = await get_replication_options("ks4")
repl = await get_replication_options("ks4", host, servers[0].ip_addr)
assert repl['dc1'] == '1'
await cql.run_async(f"create keyspace ks5 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks5.t (pk int primary key);")
repl = await get_replication_options("ks5")
repl = await get_replication_options("ks5", host, servers[0].ip_addr)
assert repl['dc1'] == '2'
assert repl['dc2'] == '2'
await cql.run_async(f"create keyspace ks6 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks6.t (pk int primary key);")
repl = await get_replication_options("ks6")
repl = await get_replication_options("ks6", host, servers[0].ip_addr)
assert repl['dc1'] == '2'
[await manager.api.disable_injection(s.ip_addr, injection) for s in servers]
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
repl = await get_replication_options("ks1")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
assert repl['dc1'] == ['rack1b']
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t")
@@ -497,7 +499,7 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
assert r.replicas[0][0] == host_ids[1]
await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};")
repl = await get_replication_options("ks2")
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
assert repl['dc1'] == ['rack1a']
assert len(repl['dc2']) == 2
assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2']
@@ -523,13 +525,13 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
pass
await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};")
repl = await get_replication_options("ks5")
repl = await get_replication_options("ks5", host, servers[0].ip_addr)
assert len(repl['dc1']) == 2
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
assert repl['dc2'] == '2'
await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};")
repl = await get_replication_options("ks6")
repl = await get_replication_options("ks6", host, servers[0].ip_addr)
assert repl['dc1'] == '2'
assert len(repl['dc2']) == 1
assert repl['dc2'][0] == 'rack2a'
@@ -537,8 +539,9 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str):
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
async def get_replication_options(ks: str, host, ip_addr):
await read_barrier(manager.api, ip_addr)
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl
@@ -551,10 +554,11 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})]
cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
assert repl['dc1'] == '1'
await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}")
@@ -574,19 +578,19 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
servers = servers[0:-1]
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
repl = await get_replication_options("ks1")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
assert repl['dc1'] == ['rack1b']
logging.info("Rolling restart")
await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"])
await cql.run_async(f"create keyspace ks2 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
repl = await get_replication_options("ks2")
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
assert len(repl['dc1']) == 2
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
await cql.run_async(f"create keyspace ks3 with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': 4}};")
repl = await get_replication_options("ks3")
repl = await get_replication_options("ks3", host, servers[0].ip_addr)
assert len(repl['dc1']) == 1
assert len(repl['dc2']) == 1
assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1']
@@ -602,7 +606,7 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
assert failed
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};")
repl = await get_replication_options("ks1")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
assert len(repl['dc1']) == 1
assert repl['dc1'][0] == 'rack1b'
assert len(repl['dc2']) == 1
@@ -1109,8 +1113,9 @@ async def test_multi_rf_increase_before_decrease_0_N(request: pytest.FixtureRequ
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str):
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
async def get_replication_options(ks: str, host, ip_addr):
await read_barrier(manager.api, ip_addr)
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl
@@ -1128,10 +1133,11 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
assert repl['dc1'] == '1'
[await manager.api.disable_injection(s.ip_addr, numeric_injection) for s in servers]
@@ -1165,7 +1171,7 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
failed = True
assert failed
repl = await get_replication_options("ks1")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
assert repl['dc1'] == '1'
@pytest.mark.asyncio

View File

@@ -18,6 +18,7 @@
#include <seastar/testing/test_case.hh>
#include "test/lib/exception_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/test_utils.hh"
#include "ldap_common.hh"
#include "service/migration_manager.hh"
@@ -681,3 +682,41 @@ SEASTAR_TEST_CASE(ldap_config) {
},
make_ldap_config());
}
// Reproduces the race between the cache pruner and the permission
// loader lifecycle during shutdown. Refs SCYLLADB-1679.
SEASTAR_TEST_CASE(ldap_pruner_no_crash_after_loader_cleared) {
auto cfg = make_ldap_config();
cfg->permissions_update_interval_in_ms.set(1);
auto call_count = seastar::make_lw_shared<int>(0);
co_await do_with_cql_env_thread([call_count](cql_test_env& env) {
auto& cache = env.auth_cache().local();
testlog.info("Populating 50 cache entries");
for (int i = 0; i < 50; i++) {
auto r = auth::make_data_resource("system", fmt::format("t{}", i));
cache.get_permissions(auth::role_or_anonymous(), r).get();
}
testlog.info("Installing slow permission loader (10ms per call)");
cache.set_permission_loader(
[call_count] (const auth::role_or_anonymous&, const auth::resource&)
-> seastar::future<auth::permission_set> {
++(*call_count);
co_await seastar::sleep(std::chrono::milliseconds(10));
co_return auth::permission_set();
});
testlog.info("Waiting for pruner to start reloading");
while (*call_count == 0) {
seastar::sleep(std::chrono::milliseconds(1)).get();
}
testlog.info("Pruner started, letting teardown run");
}, cfg);
testlog.info("Loader called {} times", *call_count);
}

View File

@@ -126,6 +126,9 @@ class CppFile(pytest.File, ABC):
return args
def collect(self) -> Iterator[CppTestCase]:
if BUILD_MODE not in self.stash:
return
custom_args = self.suite_config.get("custom_args", {}).get(self.test_name, DEFAULT_CUSTOM_ARGS)
for test_case in self.list_test_cases():

View File

@@ -163,6 +163,11 @@ def scylla_binary(testpy_test) -> Path:
def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
items[:] = [
item for item in items
if (parent_file := item.getparent(cls=pytest.File)) is not None
and BUILD_MODE in parent_file.stash
]
for item in items:
modify_pytest_item(item=item)
@@ -285,7 +290,10 @@ def pytest_configure(config: pytest.Config) -> None:
pytest_log_dir.mkdir(parents=True, exist_ok=True)
if not _pytest_config.getoption("--save-log-on-success"):
for file in pytest_log_dir.glob("*"):
file.unlink()
# This will help in case framework tests are executed with test.py event if it's the wrong way to run them.
# test_no_bare_skip_markers_in_collection uses a subprocess to run a collection that has lead to race
# condition, especially with repeat.
file.unlink(missing_ok=True)
_pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log"
@@ -340,7 +348,8 @@ def pytest_collect_file(file_path: pathlib.Path,
repeats = list(product(build_modes, parent.config.run_ids))
if not repeats:
return []
parent.stash[REPEATING_FILES].remove(file_path)
return collectors
ihook = parent.ihook
collectors = list(chain(collectors, chain.from_iterable(

View File

@@ -75,6 +75,7 @@ def test_no_bare_skip_markers_in_collection():
"--collect-only",
"--ignore=boost", "--ignore=raft",
"--ignore=ldap", "--ignore=vector_search",
"--ignore=unit",
"-p", "no:sugar"],
capture_output=True, text=True,
cwd=str(_TEST_ROOT),