mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-01 13:45:53 +00:00
Merge 'test/cluster/test_incremental_repair: fix flaky coordinator-change scenario' from Avi Kivity
- Ensure servers[1] is not the topology coordinator before restarting it, preventing the leader death + re-election + re-repair sequence that masked the compaction-merge bug
- Add a retry loop that detects post-restart leadership transfer to servers[1] via direct coordinator query, retrying up to 5 times
Fixes: SCYLLADB-1743
Backporting to 2026.2, which sees the failure regularly.
Closes scylladb/scylladb#29671
* github.com:scylladb/scylladb:
test/cluster/test_incremental_repair: add retry for residual leadership race
test/cluster/test_incremental_repair: fix flaky coordinator-change scenario
(cherry picked from commit 3ea4af1c8c)
Closes scylladb/scylladb#29677
This commit is contained in:
@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
|
||||
from test.pylib.tablets import get_all_tablet_replicas
|
||||
from test.cluster.tasks.task_manager_client import TaskManagerClient
|
||||
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
|
||||
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
|
||||
from cassandra.query import ConsistencyLevel, SimpleStatement
|
||||
@@ -880,41 +880,30 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
|
||||
# affected replica but process the UNREPAIRED sstable on the others, so the classification
|
||||
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
|
||||
# on the affected replica leading to data resurrection.
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
|
||||
cmdline = ['--hinted-handoff-enabled', '0']
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
|
||||
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
|
||||
|
||||
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
|
||||
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
|
||||
await cql.run_async(
|
||||
f"ALTER TABLE {ks}.test WITH compaction = "
|
||||
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
|
||||
)
|
||||
class _LeadershipTransferred(Exception):
|
||||
"""Raised when leadership transferred to servers[1] during the test, requiring a retry."""
|
||||
pass
|
||||
|
||||
# Disable autocompaction everywhere so we control exactly when compaction runs.
|
||||
for s in servers:
|
||||
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
|
||||
|
||||
scylla_path = await manager.server_get_exe(servers[0].server_id)
|
||||
|
||||
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
|
||||
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
|
||||
# S0'(repaired_at=1) on all nodes.
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
|
||||
# Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
|
||||
# These will be the subject of repair 2.
|
||||
repair2_keys = list(range(current_key, current_key + 10))
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
|
||||
for s in servers:
|
||||
await manager.api.flush_keyspace(s.ip_addr, ks)
|
||||
current_key += 10
|
||||
async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
|
||||
"""Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.
|
||||
|
||||
Returns the next current_key value.
|
||||
Raises _LeadershipTransferred if servers[1] becomes coordinator after the
|
||||
restart, signalling the caller to retry.
|
||||
"""
|
||||
# Ensure servers[1] is not the topology coordinator. If the coordinator is
|
||||
# restarted, the Raft leader dies, a new election occurs, and the new
|
||||
# coordinator re-initiates tablet repair -- flushing memtables on all replicas
|
||||
# and marking post-repair data as repaired. That legitimate re-repair masks
|
||||
# the compaction-merge bug this test detects.
|
||||
coord = await get_topology_coordinator(manager)
|
||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||
if coord_serv == servers[1]:
|
||||
other = next(s for s in servers if s != servers[1])
|
||||
await ensure_group0_leader_on(manager, other)
|
||||
coord = await get_topology_coordinator(manager)
|
||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||
coord_log = await manager.server_open_log(coord_serv.server_id)
|
||||
coord_mark = await coord_log.mark()
|
||||
|
||||
@@ -978,6 +967,16 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
|
||||
await manager.server_start(target.server_id)
|
||||
await manager.servers_see_each_other(servers)
|
||||
|
||||
# Check if leadership transferred to servers[1] during the restart.
|
||||
# If so, the new coordinator will re-initiate repair, masking the bug.
|
||||
new_coord = await get_topology_coordinator(manager)
|
||||
new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
|
||||
if new_coord_serv == servers[1]:
|
||||
await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
|
||||
await manager.api.wait_task(servers[0].ip_addr, task_id)
|
||||
raise _LeadershipTransferred(
|
||||
"servers[1] became topology coordinator after restart")
|
||||
|
||||
# Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
|
||||
# confirming that the bug was triggered (S1' and E merged during the race window).
|
||||
deadline = time.time() + 60
|
||||
@@ -1000,7 +999,7 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
|
||||
if not compaction_ran:
|
||||
logger.warning("Compaction did not merge S1' and E after restart during the race window; "
|
||||
"the bug was not triggered. Skipping assertion.")
|
||||
return
|
||||
return current_key
|
||||
|
||||
# Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
|
||||
# keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
|
||||
@@ -1031,8 +1030,9 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
|
||||
f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
|
||||
f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
|
||||
|
||||
# servers[0] and servers[2] flushed post-repair keys after the race window closed,
|
||||
# so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
|
||||
# servers[0] and servers[2] were never restarted and the coordinator stayed
|
||||
# alive throughout, so no re-repair could have flushed their memtables.
|
||||
# Post-repair keys must NOT appear in repaired sstables on these servers.
|
||||
assert not (repaired_keys_0 & post_repair_key_set), \
|
||||
f"servers[0] should not have post-repair keys in repaired sstables, " \
|
||||
f"got: {repaired_keys_0 & post_repair_key_set}"
|
||||
@@ -1053,6 +1053,54 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
|
||||
f"on servers[1] after restart lost the being_repaired markers during the race window. " \
|
||||
f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
|
||||
f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
|
||||
return current_key
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
|
||||
cmdline = ['--hinted-handoff-enabled', '0']
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
|
||||
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
|
||||
|
||||
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
|
||||
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
|
||||
await cql.run_async(
|
||||
f"ALTER TABLE {ks}.test WITH compaction = "
|
||||
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
|
||||
)
|
||||
|
||||
# Disable autocompaction everywhere so we control exactly when compaction runs.
|
||||
for s in servers:
|
||||
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
|
||||
|
||||
scylla_path = await manager.server_get_exe(servers[0].server_id)
|
||||
|
||||
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
|
||||
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
|
||||
# S0'(repaired_at=1) on all nodes.
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
|
||||
# Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
|
||||
# These will be the subject of repair 2.
|
||||
repair2_keys = list(range(current_key, current_key + 10))
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
|
||||
for s in servers:
|
||||
await manager.api.flush_keyspace(s.ip_addr, ks)
|
||||
current_key += 10
|
||||
|
||||
# If leadership transfers to servers[1] between our coordinator check and the
|
||||
# restart, the coordinator change masks the bug. Detect and retry.
|
||||
max_attempts = 5
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
current_key = await _do_race_window_promotes_unrepaired_data(
|
||||
manager, servers, cql, ks, token, scylla_path, current_key)
|
||||
return
|
||||
except _LeadershipTransferred as e:
|
||||
logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
|
||||
|
||||
pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
|
||||
"could not run the test without coordinator interference.")
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
# Tombstone GC safety tests
|
||||
|
||||
Reference in New Issue
Block a user