mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-28 20:27:03 +00:00
test_cdc_generation_clearing: wait for generations to propagate
In test_cdc_generation_clearing we trigger events that update CDC generations, verify the generations are updated as expected, and verify the system topology and CDC generations are consistent on all nodes. Before checking that all nodes are consistent and have the same CDC generations, we need to consider that the changes are propagated through raft and take some time to propagate to all nodes. Currently, we wait for the change to be applied only on the first server which runs the CDC generation publisher fiber and read the CDC generations from this single node. The consistency check that follows could fail if the change was not propagated to some other node yet. To fix that, before checking consistency with all nodes, we execute a read barrier on all nodes so they all see the same state as the leader. Fixes scylladb/scylladb#24407 Closes scylladb/scylladb#24433
This commit is contained in:
committed by
Patryk Jędrzejczak
parent
93a7146250
commit
8aeb404893
@@ -3,7 +3,7 @@
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
from test.pylib.rest_client import inject_error
|
||||
from test.pylib.rest_client import inject_error, read_barrier
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
|
||||
from test.cluster.util import wait_for_cdc_generations_publishing, \
|
||||
@@ -14,6 +14,7 @@ from cassandra.cluster import ConsistencyLevel # type: ignore # pylint: disable=
|
||||
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
|
||||
from cassandra.query import SimpleStatement # type: ignore # pylint: disable=no-name-in-module
|
||||
|
||||
import asyncio
|
||||
import pytest
|
||||
import logging
|
||||
import time
|
||||
@@ -64,6 +65,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient):
|
||||
mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60)
|
||||
logger.info(f"Generations after second clearing attempt: {gen_ids}")
|
||||
assert len(gen_ids) == 2 and first_gen_id in gen_ids
|
||||
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
|
||||
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)
|
||||
second_gen_id = max(gen_ids)
|
||||
|
||||
@@ -75,6 +77,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient):
|
||||
mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60)
|
||||
logger.info(f"Generations after third clearing attempt: {gen_ids}")
|
||||
assert len(gen_ids) == 1 and first_gen_id not in gen_ids and second_gen_id not in gen_ids
|
||||
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
|
||||
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)
|
||||
third_gen_id = max(gen_ids)
|
||||
|
||||
@@ -85,6 +88,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient):
|
||||
mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60)
|
||||
logger.info(f"Generations after fourth clearing attempt: {gen_ids}")
|
||||
assert len(gen_ids) == 1 and third_gen_id not in gen_ids
|
||||
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
|
||||
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)
|
||||
|
||||
|
||||
@@ -140,6 +144,7 @@ async def test_unpublished_cdc_generations_arent_cleared(manager: ManagerClient)
|
||||
mark = await log_file1.mark()
|
||||
gen_ids = await get_gen_ids()
|
||||
assert len(gen_ids) == 2 and first_gen_id not in gen_ids
|
||||
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
|
||||
await check_system_topology_and_cdc_generations_v3_consistency(manager, [host1, host2, host3])
|
||||
|
||||
# Allow the CDC generation publisher to finish its job. One generation should remain.
|
||||
@@ -147,4 +152,5 @@ async def test_unpublished_cdc_generations_arent_cleared(manager: ManagerClient)
|
||||
await log_file1.wait_for(f"CDC generation publisher fiber has nothing to do. Sleeping.", from_mark=mark)
|
||||
gen_ids = await get_gen_ids()
|
||||
assert len(gen_ids) == 1
|
||||
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
|
||||
await check_system_topology_and_cdc_generations_v3_consistency(manager, [host1, host2, host3])
|
||||
|
||||
Reference in New Issue
Block a user