diff --git a/test/cluster/test_incremental_repair.py b/test/cluster/test_incremental_repair.py
index 446ed8ab2d..a4eeaaab65 100644
--- a/test/cluster/test_incremental_repair.py
+++ b/test/cluster/test_incremental_repair.py
@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
 from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
 from test.pylib.tablets import get_all_tablet_replicas
 from test.cluster.tasks.task_manager_client import TaskManagerClient
-from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown
+from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown, create_new_test_keyspace
 from test.pylib.util import wait_for_cql_and_get_hosts
 from cassandra.query import ConsistencyLevel, SimpleStatement
 
@@ -885,12 +885,56 @@ class _LeadershipTransferred(Exception):
     """Raised when leadership transferred to servers[1] during the test, requiring a retry."""
     pass
 
+async def _setup_table_for_race_window(manager, servers, cql):
+    """Create a fresh keyspace+table with incremental repair setup for the race window test.
+
+    Creates a new keyspace (unique name each call), creates the table with
+    tombstone_gc=repair and STCS min_threshold=2, inserts keys 0-9 as baseline,
+    runs repair 1 (sstables_repaired_at=1), then inserts keys 10-19 (subject
+    of repair 2) and flushes all nodes.
+
+    Returns (ks, current_key) where current_key is 20.
+    """
+    ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', "
+                                             "'replication_factor': 3} AND tablets = {'initial': 2};")
+    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) "
+                        f"WITH tombstone_gc = {{'mode':'repair'}};")
+
+    # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
+    # UNREPAIRED compaction view, making the race easy to trigger deterministically.
+    await cql.run_async(
+        f"ALTER TABLE {ks}.test WITH compaction = "
+        f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
+    )
+
+    # Insert keys 0-9 (baseline for repair 1).
+    keys = list(range(0, 10))
+    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in keys])
+
+    # Disable autocompaction everywhere so we control exactly when compaction runs.
+    for s in servers:
+        await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
+
+    # Repair 1: establishes sstables_repaired_at=1 on all nodes.
+    # Keys 0-9 end up in S0'(repaired_at=1) on all nodes.
+    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", 'all', incremental_mode='incremental')
+
+    # Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
+    # These will be the subject of repair 2.
+    repair2_keys = list(range(10, 20))
+    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
+    for s in servers:
+        await manager.api.flush_keyspace(s.ip_addr, ks)
+
+    return ks, 20
+
 async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
     """Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.
 
     Returns the next current_key value.
-    Raises _LeadershipTransferred if servers[1] becomes coordinator after the
-    restart, signalling the caller to retry.
+    Raises _LeadershipTransferred if the topology coordinator changes or if a
+    residual re-repair is detected, signalling the caller to retry with a fresh
+    keyspace.
     """
     # Ensure servers[1] is not the topology coordinator. If the coordinator is
     # restarted, the Raft leader dies, a new election occurs, and the new
@@ -937,7 +981,7 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
     # still 1, so is_repaired(1, S1'{repaired_at=2}) == false and S1' lands in the
     # UNREPAIRED compaction view on every replica. The race window is now open.
     pos, _ = await coord_log.wait_for("Finished tablet repair host=", from_mark=coord_mark)
-    await coord_log.wait_for("Finished tablet repair host=", from_mark=pos)
+    post_marks_pos, _ = await coord_log.wait_for("Finished tablet repair host=", from_mark=pos)
 
     # --- Race window is open ---
     # Write post-repair keys 20-29. All nodes receive the writes into their memtables
@@ -967,15 +1011,28 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
     await manager.server_start(target.server_id)
     await manager.servers_see_each_other(servers)
 
-    # Check if leadership transferred to servers[1] during the restart.
-    # If so, the new coordinator will re-initiate repair, masking the bug.
+    # Check if leadership transferred during the restart. Any coordinator
+    # change (not just to servers[1]) can trigger a residual re-repair that
+    # flushes memtables on all replicas and marks post-repair data as repaired,
+    # masking the bug this test detects.
     new_coord = await get_topology_coordinator(manager)
-    new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
-    if new_coord_serv == servers[1]:
+    if new_coord != coord:
         await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
         await manager.api.wait_task(servers[0].ip_addr, task_id)
         raise _LeadershipTransferred(
-            "servers[1] became topology coordinator after restart")
+            f"topology coordinator changed from {coord} to {new_coord} after restart")
+
+    # Even without a coordinator change, check if the coordinator initiated a
+    # residual re-repair (e.g. after seeing tablets stuck in the repair stage
+    # following the topology restart). Such a re-repair flushes memtables on
+    # all replicas and contaminates the repaired set with post-repair data.
+    rerepair_matches = await coord_log.grep("Initiating tablet repair host=", from_mark=post_marks_pos)
+    if rerepair_matches:
+        logger.warning(f"Coordinator initiated residual re-repair post-restart: {rerepair_matches[0][1]}")
+        await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
+        await manager.api.wait_task(servers[0].ip_addr, task_id)
+        raise _LeadershipTransferred(
+            "coordinator initiated residual re-repair after restart")
 
     # Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
     # confirming that the bug was triggered (S1' and E merged during the race window).
@@ -990,12 +1047,28 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
             break
         if compaction_ran:
             break
+        # Check for residual re-repair during the polling window.
+        rerepair_matches = await coord_log.grep("Initiating tablet repair host=", from_mark=post_marks_pos)
+        if rerepair_matches:
+            logger.warning(f"Coordinator initiated residual re-repair during poll: {rerepair_matches[0][1]}")
+            await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
+            await manager.api.wait_task(servers[0].ip_addr, task_id)
+            raise _LeadershipTransferred(
+                "coordinator initiated residual re-repair during compaction poll")
         await asyncio.sleep(1)
 
     # --- Release the race window ---
     await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
     await manager.api.wait_task(servers[0].ip_addr, task_id)
 
+    # Final re-repair check after injection release: the coordinator may have
+    # queued a re-repair that only executes once the injection is lifted.
+    rerepair_matches = await coord_log.grep("Initiating tablet repair host=", from_mark=post_marks_pos)
+    if rerepair_matches:
+        logger.warning(f"Coordinator initiated residual re-repair after injection release: {rerepair_matches[0][1]}")
+        raise _LeadershipTransferred(
+            "coordinator initiated residual re-repair after injection release")
+
     if not compaction_ran:
         logger.warning("Compaction did not merge S1' and E after restart during the race window; "
                        "the bug was not triggered. Skipping assertion.")
@@ -1059,45 +1132,25 @@
 @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
 async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
     cmdline = ['--hinted-handoff-enabled', '0']
-    servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
+    servers, cql, hosts, _, _, _, _, _, _, _ = \
         await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
 
-    # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
-    # UNREPAIRED compaction view, making the race easy to trigger deterministically.
-    await cql.run_async(
-        f"ALTER TABLE {ks}.test WITH compaction = "
-        f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
-    )
-
-    # Disable autocompaction everywhere so we control exactly when compaction runs.
-    for s in servers:
-        await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
-
     scylla_path = await manager.server_get_exe(servers[0].server_id)
 
-    # Repair 1: establishes sstables_repaired_at=1 on all nodes.
-    # Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
-    # S0'(repaired_at=1) on all nodes.
-    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
+    ks, current_key = await _setup_table_for_race_window(manager, servers, cql)
 
-    # Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
-    # These will be the subject of repair 2.
-    repair2_keys = list(range(current_key, current_key + 10))
-    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
-    for s in servers:
-        await manager.api.flush_keyspace(s.ip_addr, ks)
-    current_key += 10
-
-    # If leadership transfers to servers[1] between our coordinator check and the
-    # restart, the coordinator change masks the bug. Detect and retry.
+    # If leadership transfers or a residual re-repair is triggered between our
+    # coordinator check and the restart, the resulting repair activity masks
+    # the bug. Detect it and retry with a fresh keyspace.
     max_attempts = 5
     for attempt in range(1, max_attempts + 1):
         try:
             current_key = await _do_race_window_promotes_unrepaired_data(
-                manager, servers, cql, ks, token, scylla_path, current_key)
+                manager, servers, cql, ks, 'all', scylla_path, current_key)
             return
         except _LeadershipTransferred as e:
             logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
+            ks, current_key = await _setup_table_for_race_window(manager, servers, cql)
-    pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
-                "could not run the test without coordinator interference.")
+    pytest.fail(f"Coordinator interference (leadership transfer or residual re-repair) persisted "
+                f"after {max_attempts} attempts; could not run the test.")
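
Note on the retry shape the patch adopts: each failed attempt now rebuilds its fixture (fresh keyspace) instead of reusing the old one. A minimal standalone sketch of that pattern, with hypothetical stand-ins (run_once, reset_fixture, Interference) for _do_race_window_promotes_unrepaired_data, _setup_table_for_race_window, and _LeadershipTransferred:

    import logging

    logger = logging.getLogger(__name__)

    class Interference(Exception):
        """The environment invalidated this attempt; the fixture must be rebuilt."""

    async def run_with_retries(run_once, reset_fixture, max_attempts=5):
        # Try the core body up to max_attempts times, rebuilding the fixture
        # after each interference so no attempt inherits contaminated state.
        for attempt in range(1, max_attempts + 1):
            try:
                return await run_once()
            except Interference as e:
                logger.warning("Attempt %d/%d: %s. Retrying.", attempt, max_attempts, e)
                await reset_fixture()
        raise AssertionError(f"interference persisted after {max_attempts} attempts")

Rebuilding matters here because a residual re-repair marks post-repair sstables as repaired; once that happens the old keyspace can no longer exhibit the race, so retrying on it would only yield false "bug not triggered" skips.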