From 9622291e071c36a6bfbdbf6c416c09a75dc5d76e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 28 Apr 2026 09:05:02 +0300 Subject: [PATCH] Merge 'test/cluster/test_incremental_repair: fix flaky coordinator-change scenario' from Avi Kivity - Ensure servers[1] is not the topology coordinator before restarting it, preventing the leader death + re-election + re-repair sequence that masked the compaction-merge bug - Add a retry loop that detects post-restart leadership transfer to servers[1] via direct coordinator query, retrying up to 5 times Fixes: SCYLLADB-1743 Backporting to 2026.2, which sees the failure regularly. Closes scylladb/scylladb#29671 * github.com:scylladb/scylladb: test/cluster/test_incremental_repair: add retry for residual leadership race test/cluster/test_incremental_repair: fix flaky coordinator-change scenario (cherry picked from commit 3ea4af1c8c7a4291be4fbf8da9a01f1e532a8102) Closes scylladb/scylladb#29677 --- test/cluster/test_incremental_repair.py | 116 +++++++++++++++++------- 1 file changed, 82 insertions(+), 34 deletions(-) diff --git a/test/cluster/test_incremental_repair.py b/test/cluster/test_incremental_repair.py index 7bd277accc..446ed8ab2d 100644 --- a/test/cluster/test_incremental_repair.py +++ b/test/cluster/test_incremental_repair.py @@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair from test.pylib.tablets import get_all_tablet_replicas from test.cluster.tasks.task_manager_client import TaskManagerClient -from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown +from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown from test.pylib.util import wait_for_cql_and_get_hosts from cassandra.query import ConsistencyLevel, SimpleStatement @@ -880,41 +880,30 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage # affected replica but process the UNREPAIRED sstable on the others, so the classification # divergence is never corrected. In tombstone scenarios this enables premature tombstone GC # on the affected replica leading to data resurrection. -@pytest.mark.asyncio -@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') -async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient): - cmdline = ['--hinted-handoff-enabled', '0'] - servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \ - await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2) - # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the - # UNREPAIRED compaction view, making the race easy to trigger deterministically. - await cql.run_async( - f"ALTER TABLE {ks}.test WITH compaction = " - f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}" - ) +class _LeadershipTransferred(Exception): + """Raised when leadership transferred to servers[1] during the test, requiring a retry.""" + pass - # Disable autocompaction everywhere so we control exactly when compaction runs. 
- for s in servers: - await manager.api.disable_autocompaction(s.ip_addr, ks, 'test') - - scylla_path = await manager.server_get_exe(servers[0].server_id) - - # Repair 1: establishes sstables_repaired_at=1 on all nodes. - # Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in - # S0'(repaired_at=1) on all nodes. - await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') - - # Insert keys 10-19 and flush on all nodes → S1(repaired_at=0). - # These will be the subject of repair 2. - repair2_keys = list(range(current_key, current_key + 10)) - await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys]) - for s in servers: - await manager.api.flush_keyspace(s.ip_addr, ks) - current_key += 10 +async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key): + """Core logic for test_incremental_repair_race_window_promotes_unrepaired_data. + Returns the next current_key value. + Raises _LeadershipTransferred if servers[1] becomes coordinator after the + restart, signalling the caller to retry. + """ + # Ensure servers[1] is not the topology coordinator. If the coordinator is + # restarted, the Raft leader dies, a new election occurs, and the new + # coordinator re-initiates tablet repair -- flushing memtables on all replicas + # and marking post-repair data as repaired. That legitimate re-repair masks + # the compaction-merge bug this test detects. coord = await get_topology_coordinator(manager) coord_serv = await find_server_by_host_id(manager, servers, coord) + if coord_serv == servers[1]: + other = next(s for s in servers if s != servers[1]) + await ensure_group0_leader_on(manager, other) + coord = await get_topology_coordinator(manager) + coord_serv = await find_server_by_host_id(manager, servers, coord) coord_log = await manager.server_open_log(coord_serv.server_id) coord_mark = await coord_log.mark() @@ -978,6 +967,16 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: await manager.server_start(target.server_id) await manager.servers_see_each_other(servers) + # Check if leadership transferred to servers[1] during the restart. + # If so, the new coordinator will re-initiate repair, masking the bug. + new_coord = await get_topology_coordinator(manager) + new_coord_serv = await find_server_by_host_id(manager, servers, new_coord) + if new_coord_serv == servers[1]: + await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update") + await manager.api.wait_task(servers[0].ip_addr, task_id) + raise _LeadershipTransferred( + "servers[1] became topology coordinator after restart") + # Poll until compaction has produced F(repaired_at=2) containing post-repair keys, # confirming that the bug was triggered (S1' and E merged during the race window). deadline = time.time() + 60 @@ -1000,7 +999,7 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: if not compaction_ran: logger.warning("Compaction did not merge S1' and E after restart during the race window; " "the bug was not triggered. Skipping assertion.") - return + return current_key # Flush servers[0] and servers[2] AFTER the race window closes so their post-repair # keys land in G(repaired_at=0): correctly classified as UNREPAIRED. 
@@ -1031,8 +1030,9 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, " f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}") - # servers[0] and servers[2] flushed post-repair keys after the race window closed, - # so those keys are in G(repaired_at=0) → correctly UNREPAIRED. + # servers[0] and servers[2] were never restarted and the coordinator stayed + # alive throughout, so no re-repair could have flushed their memtables. + # Post-repair keys must NOT appear in repaired sstables on these servers. assert not (repaired_keys_0 & post_repair_key_set), \ f"servers[0] should not have post-repair keys in repaired sstables, " \ f"got: {repaired_keys_0 & post_repair_key_set}" @@ -1053,6 +1053,54 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: f"on servers[1] after restart lost the being_repaired markers during the race window. " \ f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \ f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}" + return current_key + +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient): + cmdline = ['--hinted-handoff-enabled', '0'] + servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \ + await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2) + + # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the + # UNREPAIRED compaction view, making the race easy to trigger deterministically. + await cql.run_async( + f"ALTER TABLE {ks}.test WITH compaction = " + f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}" + ) + + # Disable autocompaction everywhere so we control exactly when compaction runs. + for s in servers: + await manager.api.disable_autocompaction(s.ip_addr, ks, 'test') + + scylla_path = await manager.server_get_exe(servers[0].server_id) + + # Repair 1: establishes sstables_repaired_at=1 on all nodes. + # Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in + # S0'(repaired_at=1) on all nodes. + await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') + + # Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0). + # These will be the subject of repair 2. + repair2_keys = list(range(current_key, current_key + 10)) + await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys]) + for s in servers: + await manager.api.flush_keyspace(s.ip_addr, ks) + current_key += 10 + + # If leadership transfers to servers[1] between our coordinator check and the + # restart, the coordinator change masks the bug. Detect and retry. + max_attempts = 5 + for attempt in range(1, max_attempts + 1): + try: + current_key = await _do_race_window_promotes_unrepaired_data( + manager, servers, cql, ks, token, scylla_path, current_key) + return + except _LeadershipTransferred as e: + logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.") + + pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; " + "could not run the test without coordinator interference.") # ---------------------------------------------------------------------------- # Tombstone GC safety tests
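For reference, the coordinator-pinning and bounded-retry pattern this change applies can be reduced to the short sketch below. It is a minimal illustration, not part of the patch: it reuses the helpers the patch already imports from test.cluster.util (get_topology_coordinator, find_server_by_host_id, ensure_group0_leader_on), while run_scenario, pin_coordinator_away_from, run_with_leadership_retries, and LeadershipTransferred are hypothetical names introduced only for this example.

    # Minimal sketch, assuming the test.cluster.util helpers imported by this patch.
    # run_scenario() is a hypothetical coroutine standing in for the extracted test
    # body; it is expected to raise LeadershipTransferred when it detects that the
    # protected server became topology coordinator mid-test.
    import logging

    import pytest

    from test.cluster.util import (
        ensure_group0_leader_on,
        find_server_by_host_id,
        get_topology_coordinator,
    )

    logger = logging.getLogger(__name__)


    class LeadershipTransferred(Exception):
        """Leadership moved to the protected server; the scenario must be retried."""


    async def pin_coordinator_away_from(manager, servers, protected):
        # Move the topology coordinator (the group0 leader) off `protected`, so that
        # restarting `protected` cannot kill the leader and trigger a masking re-repair.
        coord = await get_topology_coordinator(manager)
        coord_serv = await find_server_by_host_id(manager, servers, coord)
        if coord_serv == protected:
            other = next(s for s in servers if s != protected)
            await ensure_group0_leader_on(manager, other)


    async def run_with_leadership_retries(manager, servers, protected, run_scenario,
                                          max_attempts=5):
        # Leadership can still transfer between the pinning step and the restart inside
        # run_scenario(), so the whole scenario is retried a bounded number of times.
        for attempt in range(1, max_attempts + 1):
            await pin_coordinator_away_from(manager, servers, protected)
            try:
                return await run_scenario()
            except LeadershipTransferred as exc:
                logger.warning(f"Attempt {attempt}/{max_attempts}: {exc}. Retrying.")
        pytest.fail(f"Coordinator kept moving to the protected server after {max_attempts} "
                    "attempts; could not run the scenario without coordinator interference.")

Retrying the whole scenario, rather than asserting that leadership never moves, accepts that a transfer can still race with the restart; the bounded loop keeps the test usable without taking a hard dependency on Raft leadership stability.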