From cdf775d3cc71ea908ed05f399187aed2becde6b4 Mon Sep 17 00:00:00 2001 From: Ferenc Szili Date: Wed, 2 Oct 2024 16:37:57 +0200 Subject: [PATCH] test: test tombstone GC disabled on pending replica This tests if tombstone GC is disabled on pending replicas --- .../test_tablets.py | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py index 3b56499e4a..bc0ce5ce2f 100644 --- a/test/topology_experimental_raft/test_tablets.py +++ b/test/topology_experimental_raft/test_tablets.py @@ -1297,6 +1297,73 @@ async def test_tablet_storage_freeing(manager: ManagerClient): size_after = await manager.server_get_sstables_disk_usage(servers[0].server_id, "test", "test") assert size_before * 0.33 < size_after < size_before * 0.66 +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_tombstone_gc_disabled_on_pending_replica(manager: ManagerClient): + logger.info("Bootstrapping cluster") + servers = [await manager.server_add()] + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int) WITH gc_grace_seconds = 0;") + + servers.append(await manager.server_add()) + + key = 7 # Whatever + tablet_token = 0 # Doesn't matter since there is one tablet + await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({key}, 1) USING timestamp 9") + rows = await cql.run_async("SELECT pk from test.test") + assert len(rows) == 1 + + replica = await get_tablet_replica(manager, servers[0], 'test', 'test', tablet_token) + + s0_host_id = await manager.get_host_id(servers[0].server_id) + s1_host_id = await manager.get_host_id(servers[1].server_id) + dst_shard = 0 + + await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments", one_shot=True) + s1_log = await manager.server_open_log(servers[1].server_id) + s1_mark = await s1_log.mark() + + migration_task = asyncio.create_task( + manager.api.move_tablet(servers[0].ip_addr, "test", "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token)) + + await s1_log.wait_for('stream_mutation_fragments: waiting', from_mark=s1_mark) + s1_mark = await s1_log.mark() + + # write a tombstone with timestamp X to DB + await cql.run_async(f'DELETE FROM test.test USING timestamp 10 WHERE pk = {key}') + + # flush both servers + for s in servers: + await manager.api.flush_keyspace(s.ip_addr, "test") + + await asyncio.sleep(1) + + # major compact both servers + for s in servers: + await manager.api.keyspace_compaction(s.ip_addr, "test") + + # write backdated data to test.test with timestamp X-1 with the same key as the tombstone + await cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({key}, 0) USING timestamp 9') + + # release streaming + await manager.api.message_injection(servers[1].ip_addr, "stream_mutation_fragments") + await s1_log.wait_for('stream_mutation_fragments: done', from_mark=s1_mark) + + logger.info("Waiting for migration to finish") + await migration_task + logger.info("Migration done") + + for s in servers: + await manager.api.flush_keyspace(s.ip_addr, "test") + + # verify result + rows = await cql.run_async(f'SELECT pk, c FROM test.test WHERE pk = {key};') + assert len(rows) == 0 + @pytest.mark.asyncio @skip_mode('release', 'error injections are not supported in release mode') async def test_schema_change_during_cleanup(manager: ManagerClient):