From 8aeb40489371c002df50b6efda62f7c0766409fa Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Sun, 8 Jun 2025 18:07:40 +0300 Subject: [PATCH] test_cdc_generation_clearing: wait for generations to propagate In test_cdc_generation_clearing we trigger events that update CDC generations, verify the generations are updated as expected, and verify the system topology and CDC generations are consistent on all nodes. Before checking that all nodes are consistent and have the same CDC generations, we need to consider that the changes are propagated through raft and take some time to propagate to all nodes. Currently, we wait for the change to be applied only on the first server which runs the CDC generation publisher fiber and read the CDC generations from this single node. The consistency check that follows could fail if the change was not propagated to some other node yet. To fix that, before checking consistency with all nodes, we execute a read barrier on all nodes so they all see the same state as the leader. Fixes scylladb/scylladb#24407 Closes scylladb/scylladb#24433 --- test/cluster/test_cdc_generation_clearing.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/test/cluster/test_cdc_generation_clearing.py b/test/cluster/test_cdc_generation_clearing.py index bb12d17a75..10a6923675 100644 --- a/test/cluster/test_cdc_generation_clearing.py +++ b/test/cluster/test_cdc_generation_clearing.py @@ -3,7 +3,7 @@ # # SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 # -from test.pylib.rest_client import inject_error +from test.pylib.rest_client import inject_error, read_barrier from test.pylib.manager_client import ManagerClient from test.pylib.util import wait_for, wait_for_cql_and_get_hosts from test.cluster.util import wait_for_cdc_generations_publishing, \ @@ -14,6 +14,7 @@ from cassandra.cluster import ConsistencyLevel # type: ignore # pylint: disable= from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module from cassandra.query import SimpleStatement # type: ignore # pylint: disable=no-name-in-module +import asyncio import pytest import logging import time @@ -64,6 +65,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient): mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60) logger.info(f"Generations after second clearing attempt: {gen_ids}") assert len(gen_ids) == 2 and first_gen_id in gen_ids + await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers]) await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts) second_gen_id = max(gen_ids) @@ -75,6 +77,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient): mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60) logger.info(f"Generations after third clearing attempt: {gen_ids}") assert len(gen_ids) == 1 and first_gen_id not in gen_ids and second_gen_id not in gen_ids + await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers]) await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts) third_gen_id = max(gen_ids) @@ -85,6 +88,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient): mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60) logger.info(f"Generations after fourth clearing attempt: {gen_ids}") assert len(gen_ids) == 1 and third_gen_id not in gen_ids + await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers]) await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts) @@ -140,6 +144,7 @@ async def test_unpublished_cdc_generations_arent_cleared(manager: ManagerClient) mark = await log_file1.mark() gen_ids = await get_gen_ids() assert len(gen_ids) == 2 and first_gen_id not in gen_ids + await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers]) await check_system_topology_and_cdc_generations_v3_consistency(manager, [host1, host2, host3]) # Allow the CDC generation publisher to finish its job. One generation should remain. @@ -147,4 +152,5 @@ async def test_unpublished_cdc_generations_arent_cleared(manager: ManagerClient) await log_file1.wait_for(f"CDC generation publisher fiber has nothing to do. Sleeping.", from_mark=mark) gen_ids = await get_gen_ids() assert len(gen_ids) == 1 + await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers]) await check_system_topology_and_cdc_generations_v3_consistency(manager, [host1, host2, host3])