From 43cab2a33214e21cb14e774d5d3ac5bfc76ce540 Mon Sep 17 00:00:00 2001
From: Calle Wilund
Date: Tue, 19 May 2026 15:10:05 +0200
Subject: [PATCH] test_...data_resurrection: Add test case for repair CL truncation

Tests that we can remove tombstones even while live commitlog segments
hold data for a table, iff repair added to CL truncation log.
---
 ...est_commitlog_segment_data_resurrection.py | 177 ++++++++++++++++++
 1 file changed, 177 insertions(+)

diff --git a/test/cluster/test_commitlog_segment_data_resurrection.py b/test/cluster/test_commitlog_segment_data_resurrection.py
index 8c988beea7..c0d06ecafc 100644
--- a/test/cluster/test_commitlog_segment_data_resurrection.py
+++ b/test/cluster/test_commitlog_segment_data_resurrection.py
@@ -11,6 +11,12 @@ import os
 import logging
 import glob
 import json
+import asyncio
+import time
+from functools import partial
+from datetime import datetime, timezone
+from test.cluster.util import new_test_keyspace
+from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
 
 logger = logging.getLogger(__name__)
 
@@ -129,3 +135,174 @@ async def test_pinned_cl_segment_doesnt_resurrect_data(manager: ManagerClient):
     cql = manager.cql
 
     assert len(list(cql.execute(f"SELECT * FROM {tbl2} WHERE pk = {pk1}"))) == 0
+
+
+@pytest.mark.asyncio
+async def test_pinned_cl_segment_doesnt_resurrect_data_but_repair_ensures_tombstone_gc(manager: ManagerClient):
+    """
+    Check that repair lets tombstones be garbage collected even while live
+    commitlog segments still hold data for the table, and that the deleted
+    data is not resurrected when the nodes are restarted afterwards.
+
+    Scenario:
+    1. Fill a commitlog segment with writes to both tbl1 and tbl2. Only tbl2
+       is ever flushed explicitly by this test.
+    2. Write more rows to tbl2, delete partition pk1 from tbl2, then fill
+       another segment with tbl2 rows for pk2.
+    3. Flush tbl2 and check that pk1 reads as empty while its tombstones are
+       still visible through MUTATION_FRAGMENTS().
+    4. Repair tbl2 (tombstone_gc mode 'repair'), then flush + compact until
+       no tombstone older than the recorded mark is left on any node.
+    5. Stop and restart all nodes and check that pk1 is still empty.
+    """
+    cfg = {
+        "commitlog_sync": "batch",
+        "commitlog_segment_size_in_mb": 1,
+        "enable_cache": False,
+        "hinted_handoff_enabled": False,
+        "repair_hints_batchlog_flush_cache_time_in_ms": 0,
+    }
+    servers = await manager.servers_add(3, config=cfg, property_file=[
+        {"dc": "dc1", "rack": "r1"},
+        {"dc": "dc1", "rack": "r2"},
+        {"dc": "dc1", "rack": "r3"}]
+    )
+
+    cql = manager.cql
+
+    hosts = [(await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0]
+             for s in servers]
+
+    async def get_segments_num(server):
+        metrics_res = await manager.metrics.query(server.ip_addr)
+        return int(metrics_res.get("scylla_commitlog_segments"))
+
+    async def get_segments_nums():
+        return [await get_segments_num(s) for s in servers]
+
+    def less_than_by(before, after, off=0):
+        # True while, on every node, the segment count in `after` is still
+        # below the matching count in `before` plus `off`.
+        return all(a < (b + off) for b, a in zip(before, after))
+
+    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks1, \
+               new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks2:
+        tbl1 = f"{ks1}.tbl1"
+        tbl2 = f"{ks2}.tbl2"
+        await cql.run_async(f"create table {tbl1} (pk int, ck int, primary key(pk, ck)) WITH tombstone_gc = {{'mode': 'repair' }}")
+        await cql.run_async(f"create table {tbl2} (pk int, ck int, v text, primary key(pk, ck)) WITH tombstone_gc = {{'mode': 'repair', 'propagation_delay_in_seconds': '0'}}")
+
+        insert_id_tbl1 = cql.prepare(f"INSERT INTO {tbl1} (pk, ck) VALUES (?, ?)")
+        insert_id_tbl2 = cql.prepare(f"INSERT INTO {tbl2} (pk, ck, v) VALUES (?, ?, ?)")
+        pk1 = 0
+        pk2 = 1
+        ck = 0
+        value = "v" * 1024
+
+        segments_before_writes = await get_segments_nums()
+        segments_after_writes = segments_before_writes
+
+        logger.debug("Have %s segments before writing data", segments_after_writes)
+
+        logger.debug("Filling segment with mixed data from %s and %s", tbl1, tbl2)
+
+        # Ensure at least one segment with writes from both tables
+        while less_than_by(segments_before_writes, segments_after_writes, 1):
+            cql.execute(insert_id_tbl1, (pk1, ck))
+            cql.execute(insert_id_tbl2, (pk1, ck, value))
+            ck = ck + 1
+            segments_after_writes = await get_segments_nums()
+
+        logger.debug("Filling segment(s) with %s only", tbl2)
+
+        while less_than_by(segments_before_writes, segments_after_writes, 3):
+            cql.execute(insert_id_tbl2, (pk1, ck, value))
+            ck = ck + 1
+            segments_after_writes = await get_segments_nums()
+
+        cql.execute(f"DELETE FROM {tbl2} WHERE pk = {pk1}")
+
+        # We need to make sure the segment in which the above delete landed
+        # is full, otherwise the memtable flush will not be able to destroy it.
+        logger.debug("Filling another segment with %s (pk=%s)", tbl2, pk2)
+
+        while less_than_by(segments_before_writes, segments_after_writes, 4):
+            cql.execute(insert_id_tbl2, (pk2, ck, value))
+            ck = ck + 1
+            segments_after_writes = await get_segments_nums()
+
+        logger.debug("Wrote %s rows, now have %s segments", ck, segments_after_writes)
+        logger.debug("Flush %s", tbl2)
+
+        async def flush_ks(server):
+            await manager.api.keyspace_flush(node_ip=server.ip_addr, keyspace=ks2, table="tbl2")
+
+        async def compact_ks(server):
+            await manager.api.keyspace_compaction(node_ip=server.ip_addr, keyspace=ks2, table="tbl2")
+
+        await asyncio.gather(*[flush_ks(s) for s in servers])
+
+        segments_after = await get_segments_nums()
+        logger.debug("After flush, now have %s segments", segments_after)
+
+        assert len(list(cql.execute(f"SELECT * FROM {tbl1} WHERE pk = {pk1}"))) > 0
+        assert len(list(cql.execute(f"SELECT * FROM {tbl2} WHERE pk = {pk1}"))) == 0
+
+        tombstone_mark = datetime.now(timezone.utc)
+
+        def get_tombstone(row):
+            if row.metadata is None:
+                return None
+            metadata = json.loads(row.metadata)
+            return metadata.get("tombstone")
+
+        async def list_tombstones(tombstone_mark, host):
+            res = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({tbl2})", host=host))
+            tombstones = []
+            for row in res:
+                tombstone = get_tombstone(row)
+                if tombstone and datetime.fromtimestamp(float(tombstone["timestamp"])/1_000_000, timezone.utc) < tombstone_mark:
+                    tombstones.append(tombstone)
+            return tombstones
+
+        async def list_all_tombstones(tombstone_mark):
+            tombstones_per_host = await asyncio.gather(*[list_tombstones(tombstone_mark, host)
+                                                         for host in hosts])
+            return [t for per_host in tombstones_per_host for t in per_host]
+
+        async def tombstone_gc_completed(tombstone_mark):
+            # flush and compact the keyspace
+            await asyncio.gather(*[flush_ks(s) for s in servers])
+            await asyncio.gather(*[compact_ks(s) for s in servers])
+
+            all_tombstones = await list_all_tombstones(tombstone_mark)
+            logger.debug(all_tombstones)
+            # None (not False) while tombstones remain; presumably wait_for's retry signal - TODO confirm
+            return None if all_tombstones else True
+
+        # should usually run much faster than 30s, but left some margin to avoid flakiness
+        async def verify_tombstone_gc(tombstone_mark, timeout=30):
+            deadline = time.time() + timeout
+            await wait_for(partial(tombstone_gc_completed, tombstone_mark), deadline)
+
+        tombstones = await list_all_tombstones(tombstone_mark)
+
+        assert len(tombstones) > 0, "there should be tombstones at this point"
+
+        # No sleep before the repair: tbl2 is created with
+        # propagation_delay_in_seconds = 0 (presumably making a wait unnecessary).
+        await manager.api.repair(servers[0].ip_addr, ks2, "tbl2")
+
+        # now we should be able to get to a state where all tombstones are gone.
+        await verify_tombstone_gc(tombstone_mark)
+
+        logger.debug("Kill + restart the nodes")
+
+        await asyncio.gather(*[manager.server_stop(s.server_id, False) for s in servers])
+        await asyncio.gather(*[manager.server_start(s.server_id) for s in servers])
+
+        manager.driver_close()
+        await manager.driver_connect()
+        cql = manager.cql
+
+        assert len(list(cql.execute(f"SELECT * FROM {tbl2} WHERE pk = {pk1}"))) == 0