test/cluster/test_incremental_repair: fix flaky do_tablet_incremental_repair_and_ops

The log grep in get_sst_status searched from the beginning of the log
(no from_mark), so the second-repair assertions were checking cumulative
counts across both repairs rather than counts for the second repair alone.

The expected values (sst_add==2, sst_mark==2) relied on this cumulative
behaviour: 1 from the first repair + 1 from the second = 2. This works
when the second repair encounters exactly one unrepaired sstable, but
fails whenever the second repair sees two.

The second repair can see two unrepaired sstables when the 100 keys
inserted before it (via asyncio.gather) trigger a background auto-flush
before take_storage_snapshot runs. take_storage_snapshot always flushes
the memtable itself, so if an auto-flush already split the batch into two
sstables on disk, the second repair's snapshot contains both and logs
"Added sst" twice, making the cumulative count 3 instead of 2.

Fix: take a log mark per-server before each repair call and pass it to
get_sst_status so each check counts only the entries produced by that
repair. The expected values become 1/0/1 and 1/1/1 respectively,
independent of how many sstables happened to exist beforehand.

get_sst_status gains an optional from_mark parameter (default None)
which preserves existing call sites that intentionally grep from the
start of the log.
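The mark/from_mark idiom can be illustrated with a minimal, self-contained sketch. `LogFile` here is a hypothetical stand-in for the test harness's log object, not the real class; it only mimics the two operations the fix relies on: `mark()` records the current end of the log, and `grep(..., from_mark=...)` restricts matching to entries appended after that point.

```python
import re

class LogFile:
    """Hypothetical stand-in for the test harness log object (sketch only)."""

    def __init__(self):
        self.lines = []

    def mark(self):
        # A mark is just the position of the current end of the log.
        return len(self.lines)

    def append(self, line):
        self.lines.append(line)

    def grep(self, pattern, from_mark=None):
        # from_mark=None preserves the old behaviour: grep the whole log.
        start = 0 if from_mark is None else from_mark
        return [l for l in self.lines[start:] if re.search(pattern, l)]

log = LogFile()
log.append('Added sst 1 for incremental repair')   # produced by the first repair
mark = log.mark()                                   # taken before the second repair
log.append('Added sst 2 for incremental repair')   # produced by the second repair

# Without a mark the count is cumulative across both repairs;
# with from_mark it counts only the second repair's entries.
assert len(log.grep(r'Added sst.*for incremental repair')) == 2
assert len(log.grep(r'Added sst.*for incremental repair', from_mark=mark)) == 1
```

This is why the per-repair expected values become independent of how many sstables existed before the mark was taken.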

Fixes: SCYLLADB-1086

Closes scylladb/scylladb#29484
Author: Botond Dénes
Date: 2026-04-15 13:49:33 +03:00
Committer: Patryk Jędrzejczak
Parent: 7634d3f7d4
Commit: d280517e27


@@ -44,10 +44,10 @@ def get_sstables(workdir, ks, table):
     sstables = glob.glob(base_pattern)
     return sstables
 
-async def get_sst_status(run, log):
-    sst_add = await log.grep(rf'.*Added sst.*for incremental repair')
-    sst_skip = await log.grep(rf'.*Skipped adding sst.*for incremental repair')
-    sst_mark = await log.grep(rf'.*Marking.*for incremental repair')
+async def get_sst_status(run, log, from_mark=None):
+    sst_add = await log.grep(rf'.*Added sst.*for incremental repair', from_mark=from_mark)
+    sst_skip = await log.grep(rf'.*Skipped adding sst.*for incremental repair', from_mark=from_mark)
+    sst_mark = await log.grep(rf'.*Marking.*for incremental repair', from_mark=from_mark)
     logging.info(f'{run=}: {sst_add=} {sst_skip=} {sst_mark=}');
     logging.info(f'{run=}: {len(sst_add)=} {len(sst_skip)=} {len(sst_mark)=}');
     return sst_add, sst_skip, sst_mark
@@ -292,10 +292,11 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str):
     servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline=['--logger-log-level', 'compaction=debug'])
     token = -1
+    marks = [await log.mark() for log in logs]
     await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
     # 1 add 0 skip 1 mark
-    for log in logs:
-        sst_add, sst_skip, sst_mark = await get_sst_status("First", log)
+    for log, mark in zip(logs, marks):
+        sst_add, sst_skip, sst_mark = await get_sst_status("First", log, from_mark=mark)
         assert len(sst_add) == 1
         assert len(sst_skip) == 0
         assert len(sst_mark) == 1
@@ -318,13 +319,14 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str):
     else:
         assert False # Wrong ops
+    marks = [await log.mark() for log in logs]
     await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
     # 1 add 1 skip 1 mark
-    for log in logs:
-        sst_add, sst_skip, sst_mark = await get_sst_status("Second", log)
-        assert len(sst_add) == 2
-        assert len(sst_mark) == 2
+    for log, mark in zip(logs, marks):
+        sst_add, sst_skip, sst_mark = await get_sst_status("Second", log, from_mark=mark)
+        assert len(sst_add) == 1
+        assert len(sst_mark) == 1
         assert len(sst_skip) == 1
 
 @pytest.mark.asyncio