diff --git a/test/cluster/test_cdc_generation_clearing.py b/test/cluster/test_cdc_generation_clearing.py index bb12d17a75..10a6923675 100644 --- a/test/cluster/test_cdc_generation_clearing.py +++ b/test/cluster/test_cdc_generation_clearing.py @@ -3,7 +3,7 @@ # # SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 # -from test.pylib.rest_client import inject_error +from test.pylib.rest_client import inject_error, read_barrier from test.pylib.manager_client import ManagerClient from test.pylib.util import wait_for, wait_for_cql_and_get_hosts from test.cluster.util import wait_for_cdc_generations_publishing, \ @@ -14,6 +14,7 @@ from cassandra.cluster import ConsistencyLevel # type: ignore # pylint: disable= from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module from cassandra.query import SimpleStatement # type: ignore # pylint: disable=no-name-in-module +import asyncio import pytest import logging import time @@ -64,6 +65,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient): mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60) logger.info(f"Generations after second clearing attempt: {gen_ids}") assert len(gen_ids) == 2 and first_gen_id in gen_ids + await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers]) await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts) second_gen_id = max(gen_ids) @@ -75,6 +77,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient): mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60) logger.info(f"Generations after third clearing attempt: {gen_ids}") assert len(gen_ids) == 1 and first_gen_id not in gen_ids and second_gen_id not in gen_ids + await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers]) await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts) third_gen_id = max(gen_ids) @@ -85,6 +88,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient): mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60) logger.info(f"Generations after fourth clearing attempt: {gen_ids}") assert len(gen_ids) == 1 and third_gen_id not in gen_ids + await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers]) await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts) @@ -140,6 +144,7 @@ async def test_unpublished_cdc_generations_arent_cleared(manager: ManagerClient) mark = await log_file1.mark() gen_ids = await get_gen_ids() assert len(gen_ids) == 2 and first_gen_id not in gen_ids + await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers]) await check_system_topology_and_cdc_generations_v3_consistency(manager, [host1, host2, host3]) # Allow the CDC generation publisher to finish its job. One generation should remain. @@ -147,4 +152,5 @@ async def test_unpublished_cdc_generations_arent_cleared(manager: ManagerClient) await log_file1.wait_for(f"CDC generation publisher fiber has nothing to do. Sleeping.", from_mark=mark) gen_ids = await get_gen_ids() assert len(gen_ids) == 1 + await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers]) await check_system_topology_and_cdc_generations_v3_consistency(manager, [host1, host2, host3])