From 8afdae24d29dd39d644eb4e5af6847e22204cf28 Mon Sep 17 00:00:00 2001
From: Aleksandr Bykov
Date: Tue, 31 Mar 2026 14:43:25 +0700
Subject: [PATCH] test: fix flaky test_kill_coordinator_during_op

The test hardcoded the expected number of coordinator elections
(2, 3, 4, 5) for each phase. If a prior phase triggered an extra
election, subsequent phases would wait for a count that was already
reached or would never match. Fix by reading the current election
count before each operation and expecting exactly one more, making
each phase independent of prior history.

Also add wait_for_no_pending_topology_transition() calls after each
coordinator election to ensure the topology state machine has fully
settled before proceeding with restarts and further operations.

Decrease the failure detector timeout (failure_detector_timeout_in_ms)
to 2000 ms on all test nodes so that coordinator crashes are detected
faster, reducing test wallclock time and timeout-related flakiness.

Enable raft_topology=trace logging on all test nodes to aid
post-failure diagnosis. Add diagnostic logging in
wait_new_coordinator_elected().
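
The per-phase counting pattern, excerpted from the updated test for
quick reference (full context in the diff below):

    # Read how many elections group0 history already records.
    num_elections = len(await get_coordinator_host_ids(manager))
    # ... crash the coordinator mid-operation, then expect exactly one more
    # election and wait for the topology state machine to settle.
    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
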
Fixes: SCYLLADB-1089
Closes scylladb/scylladb#29284
---
 ...test_crash_coordinator_before_streaming.py | 32 ++++++++-----
 test/cluster/util.py                          | 45 +++++++++++++++++--
 2 files changed, 63 insertions(+), 14 deletions(-)

diff --git a/test/cluster/test_crash_coordinator_before_streaming.py b/test/cluster/test_crash_coordinator_before_streaming.py
index b22b8481ba..d6c04b1153 100644
--- a/test/cluster/test_crash_coordinator_before_streaming.py
+++ b/test/cluster/test_crash_coordinator_before_streaming.py
@@ -11,7 +11,8 @@ import pytest
 from test.pylib.manager_client import ManagerClient
 from test.pylib.scylla_cluster import ReplaceConfig
 from test.cluster.util import (check_token_ring_and_group0_consistency, wait_for_token_ring_and_group0_consistency,
-                               get_coordinator_host, get_coordinator_host_ids, wait_new_coordinator_elected)
+                               get_coordinator_host, get_coordinator_host_ids, wait_new_coordinator_elected,
+                               wait_for_no_pending_topology_transition)

 logger = logging.getLogger(__name__)

@@ -19,7 +20,7 @@

 @pytest.mark.asyncio
 @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
-async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
+async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout: int, scale_timeout: callable) -> None:
     """
     Kill coordinator with error injection while topology operation is running
     for cluster: decommission, bootstrap, removenode, replace.
@@ -57,9 +58,11 @@
     logger.debug("Kill coordinator during decommission")
     coordinator_host = await get_coordinator_host(manager)
     other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
+    num_elections = len(await get_coordinator_host_ids(manager))
     await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
     await manager.decommission_node(server_id=other_nodes[-1].server_id, expected_error="Decommission failed. See earlier errors")
-    await wait_new_coordinator_elected(manager, 2, time.time() + 60)
+    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
+    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
     await manager.server_restart(coordinator_host.server_id, wait_others=1)
     await manager.servers_see_each_other(await manager.running_servers())
     await check_token_ring_and_group0_consistency(manager)
@@ -73,33 +76,40 @@
     node_to_remove_srv_id = other_nodes[-1].server_id
     logger.debug("Stop node with srv_id %s", node_to_remove_srv_id)
     await manager.server_stop_gracefully(node_to_remove_srv_id)
+    num_elections = len(await get_coordinator_host_ids(manager))
     await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
     logger.debug("Start removenode with srv_id %s from node with srv_id %s", node_to_remove_srv_id, working_srv_id)
     await manager.remove_node(working_srv_id, node_to_remove_srv_id, expected_error="Removenode failed. See earlier errors")
-    await wait_new_coordinator_elected(manager, 3, time.time() + 60)
+    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
+    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
+    await manager.others_not_see_server(server_ip=coordinator_host.ip_addr)
     logger.debug("Start old coordinator node with srv_id %s", coordinator_host.server_id)
     await manager.server_restart(coordinator_host.server_id, wait_others=1)
     await manager.servers_see_each_other(await manager.running_servers())
     logger.debug("Remove node with srv_id %s from node with srv_id %s because it was banned in a previous attempt", node_to_remove_srv_id, working_srv_id)
     await manager.remove_node(working_srv_id, node_to_remove_srv_id)
+    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
+    await manager.servers_see_each_other(await manager.running_servers())
     await check_token_ring_and_group0_consistency(manager)
     logger.debug("Restore number of nodes in cluster")
-    await manager.server_add(cmdline=cmdline)
+    await manager.server_add(config=config, cmdline=cmdline)

     # kill coordinator during bootstrap
     logger.debug("Kill coordinator during bootstrap")
     nodes = await manager.running_servers()
     coordinator_host = await get_coordinator_host(manager)
     other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
-    new_node = await manager.server_add(start=False, cmdline=cmdline)
+    new_node = await manager.server_add(start=False, config=config, cmdline=cmdline)
+    num_elections = len(await get_coordinator_host_ids(manager))
     await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
     await manager.server_start(new_node.server_id, expected_error="Startup failed: std::runtime_error")
-    await wait_new_coordinator_elected(manager, 4, time.time() + 60)
+    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
+    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
     await manager.server_restart(coordinator_host.server_id, wait_others=1)
     await manager.servers_see_each_other(await manager.running_servers())
     await check_token_ring_and_group0_consistency(manager)
@@ -111,11 +121,13 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
     other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
     node_to_replace_srv_id = other_nodes[-1].server_id
     await manager.server_stop_gracefully(node_to_replace_srv_id)
+    num_elections = len(await get_coordinator_host_ids(manager))
     await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
     replace_cfg = ReplaceConfig(replaced_id = node_to_replace_srv_id, reuse_ip_addr = False, use_host_id = True)
-    new_node = await manager.server_add(start=False, replace_cfg=replace_cfg, cmdline=cmdline)
+    new_node = await manager.server_add(start=False, config=config, replace_cfg=replace_cfg, cmdline=cmdline)
     await manager.server_start(new_node.server_id, expected_error="Replace failed. See earlier errors")
-    await wait_new_coordinator_elected(manager, 5, time.time() + 60)
+    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
+    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
     logger.debug("Start old coordinator node")
     await manager.others_not_see_server(server_ip=coordinator_host.ip_addr)
     await manager.server_restart(coordinator_host.server_id, wait_others=1)
@@ -123,5 +135,5 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
     logger.debug("Replaced node is already non-voter and will be banned after restart. Remove it")
     coordinator_host = await get_coordinator_host(manager)
     await manager.remove_node(coordinator_host.server_id, node_to_replace_srv_id)
-    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60)
+    await wait_for_token_ring_and_group0_consistency(manager, time.time() + scale_timeout(60))
     await check_token_ring_and_group0_consistency(manager)
diff --git a/test/cluster/util.py b/test/cluster/util.py
index 008c879041..6b7eef4a50 100644
--- a/test/cluster/util.py
+++ b/test/cluster/util.py
@@ -18,7 +18,7 @@ from cassandra.cluster import ConnectionException, ConsistencyLevel, NoHostAvail
 from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
 from test.pylib.internal_types import ServerInfo, HostID
 from test.pylib.manager_client import ManagerClient
-from test.pylib.rest_client import get_host_api_address, read_barrier
+from test.pylib.rest_client import HTTPError, get_host_api_address, read_barrier
 from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, get_available_host, unique_name
 from typing import Optional, List, Union

@@ -119,6 +119,42 @@ async def check_token_ring_and_group0_consistency(manager: ManagerClient) -> Non
     assert token_ring_ids == group0_ids


+async def wait_for_no_pending_topology_transition(manager: ManagerClient, deadline: float) -> None:
+    """Wait until there is no pending topology transition.
+    Polls system.topology until the transition_state column is null,
+    indicating that the topology coordinator has finished processing the
+    current operation (whether it completed successfully or was rolled back).
+ """ + cql = manager.get_cql() + + async def no_transition(): + try: + host = await get_available_host(cql, deadline) + await read_barrier(manager.api, get_host_api_address(host)) + rs = await cql.run_async( + "select transition_state from system.topology where key = 'topology'", + host=host) + except NoHostAvailable as e: + logger.info(f"Topology transition check failed, retrying: {e}") + return None + except ConnectionException as e: + logger.info(f"Topology transition check failed, retrying: {e}") + return None + except HTTPError as e: + logger.info(f"Read barrier failed, retrying: {e}") + return None + + if not rs: + logger.warning(f"Topology transition not visible: system.topology row not found, retrying") + return None + if rs[0].transition_state is not None: + logger.warning(f"Topology transition still in progress: {rs[0].transition_state}") + return None + return True + + await wait_for(no_transition, deadline, period=.5) + + async def wait_for_token_ring_and_group0_consistency(manager: ManagerClient, deadline: float) -> None: """ Weaker version of the above check. @@ -398,13 +434,14 @@ def get_uuid_from_str(string: str) -> str: async def wait_new_coordinator_elected(manager: ManagerClient, expected_num_of_elections: int, deadline: float) -> None: """Wait new coordinator to be elected - Wait while the table 'system.group0_history' will have a number of lines - with the 'new topology coordinator' equal to the expected_num_of_elections number, + Wait while the table 'system.group0_history' will have at least + expected_num_of_elections lines with 'new topology coordinator', and the latest host_id coordinator differs from the previous one. """ async def new_coordinator_elected(): coordinators_ids = await get_coordinator_host_ids(manager) - if len(coordinators_ids) == expected_num_of_elections \ + logger.debug(f"Coordinators ids in history: {coordinators_ids}") + if len(coordinators_ids) >= expected_num_of_elections \ and coordinators_ids[0] != coordinators_ids[1]: return True logger.warning("New coordinator was not elected %s", coordinators_ids)