test: fix flaky test_kill_coordinator_during_op

The test hardcoded the expected number of coordinator elections
(2, 3, 4, 5) for each phase. If a prior phase triggered an extra
election, subsequent phases would wait for a count that was already
reached or would never match.

Fix by reading the current election count before each operation and
expecting exactly one more, making each phase independent of prior
history.

Also add wait_for_no_pending_topology_transition() calls after each
coordinator election to ensure the topology state machine has fully
settled before proceeding with restarts and further operations.

Decrease the failure detector timeout (failure_detector_timeout_in_ms)
to 2000 ms on all test nodes so that coordinator crashes are detected
faster, reducing test wallclock time and timeout-related flakiness.

Enable raft_topology=trace logging on all test nodes to aid
post-failure diagnosis. Add diagnostic logging in
wait_new_coordinator_elected().

Fixes: SCYLLADB-1089

Closes scylladb/scylladb#29284
This commit is contained in:
Aleksandr Bykov
2026-03-31 14:43:25 +07:00
committed by Avi Kivity
parent 1febfbd9b5
commit 8afdae24d2
2 changed files with 63 additions and 14 deletions

View File

@@ -11,7 +11,8 @@ import pytest
from test.pylib.manager_client import ManagerClient
from test.pylib.scylla_cluster import ReplaceConfig
from test.cluster.util import (check_token_ring_and_group0_consistency, wait_for_token_ring_and_group0_consistency,
get_coordinator_host, get_coordinator_host_ids, wait_new_coordinator_elected)
get_coordinator_host, get_coordinator_host_ids, wait_new_coordinator_elected,
wait_for_no_pending_topology_transition)
logger = logging.getLogger(__name__)
@@ -19,7 +20,7 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout: int, scale_timeout: callable) -> None:
""" Kill coordinator with error injection while topology operation is running for cluster: decommission,
bootstrap, removenode, replace.
@@ -57,9 +58,11 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
logger.debug("Kill coordinator during decommission")
coordinator_host = await get_coordinator_host(manager)
other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
num_elections = len(await get_coordinator_host_ids(manager))
await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
await manager.decommission_node(server_id=other_nodes[-1].server_id, expected_error="Decommission failed. See earlier errors")
await wait_new_coordinator_elected(manager, 2, time.time() + 60)
await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
await manager.server_restart(coordinator_host.server_id, wait_others=1)
await manager.servers_see_each_other(await manager.running_servers())
await check_token_ring_and_group0_consistency(manager)
@@ -73,33 +76,40 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
node_to_remove_srv_id = other_nodes[-1].server_id
logger.debug("Stop node with srv_id %s", node_to_remove_srv_id)
await manager.server_stop_gracefully(node_to_remove_srv_id)
num_elections = len(await get_coordinator_host_ids(manager))
await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
logger.debug("Start removenode with srv_id %s from node with srv_id %s", node_to_remove_srv_id, working_srv_id)
await manager.remove_node(working_srv_id,
node_to_remove_srv_id,
expected_error="Removenode failed. See earlier errors")
await wait_new_coordinator_elected(manager, 3, time.time() + 60)
await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
await manager.others_not_see_server(server_ip=coordinator_host.ip_addr)
logger.debug("Start old coordinator node with srv_id %s", coordinator_host.server_id)
await manager.server_restart(coordinator_host.server_id, wait_others=1)
await manager.servers_see_each_other(await manager.running_servers())
logger.debug("Remove node with srv_id %s from node with srv_id %s because it was banned in a previous attempt", node_to_remove_srv_id, working_srv_id)
await manager.remove_node(working_srv_id, node_to_remove_srv_id)
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
await manager.servers_see_each_other(await manager.running_servers())
await check_token_ring_and_group0_consistency(manager)
logger.debug("Restore number of nodes in cluster")
await manager.server_add(cmdline=cmdline)
await manager.server_add(config=config, cmdline=cmdline)
# kill coordinator during bootstrap
logger.debug("Kill coordinator during bootstrap")
nodes = await manager.running_servers()
coordinator_host = await get_coordinator_host(manager)
other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
new_node = await manager.server_add(start=False, cmdline=cmdline)
new_node = await manager.server_add(start=False, config=config, cmdline=cmdline)
num_elections = len(await get_coordinator_host_ids(manager))
await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
await manager.server_start(new_node.server_id,
expected_error="Startup failed: std::runtime_error")
await wait_new_coordinator_elected(manager, 4, time.time() + 60)
await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
await manager.server_restart(coordinator_host.server_id, wait_others=1)
await manager.servers_see_each_other(await manager.running_servers())
await check_token_ring_and_group0_consistency(manager)
@@ -111,11 +121,13 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
node_to_replace_srv_id = other_nodes[-1].server_id
await manager.server_stop_gracefully(node_to_replace_srv_id)
num_elections = len(await get_coordinator_host_ids(manager))
await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
replace_cfg = ReplaceConfig(replaced_id = node_to_replace_srv_id, reuse_ip_addr = False, use_host_id = True)
new_node = await manager.server_add(start=False, replace_cfg=replace_cfg, cmdline=cmdline)
new_node = await manager.server_add(start=False, config=config, replace_cfg=replace_cfg, cmdline=cmdline)
await manager.server_start(new_node.server_id, expected_error="Replace failed. See earlier errors")
await wait_new_coordinator_elected(manager, 5, time.time() + 60)
await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
logger.debug("Start old coordinator node")
await manager.others_not_see_server(server_ip=coordinator_host.ip_addr)
await manager.server_restart(coordinator_host.server_id, wait_others=1)
@@ -123,5 +135,5 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
logger.debug("Replaced node is already non-voter and will be banned after restart. Remove it")
coordinator_host = await get_coordinator_host(manager)
await manager.remove_node(coordinator_host.server_id, node_to_replace_srv_id)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + scale_timeout(60))
await check_token_ring_and_group0_consistency(manager)

View File

@@ -18,7 +18,7 @@ from cassandra.cluster import ConnectionException, ConsistencyLevel, NoHostAvail
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
from test.pylib.internal_types import ServerInfo, HostID
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import get_host_api_address, read_barrier
from test.pylib.rest_client import HTTPError, get_host_api_address, read_barrier
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, get_available_host, unique_name
from typing import Optional, List, Union
@@ -119,6 +119,42 @@ async def check_token_ring_and_group0_consistency(manager: ManagerClient) -> Non
assert token_ring_ids == group0_ids
async def wait_for_no_pending_topology_transition(manager: ManagerClient, deadline: float) -> None:
    """Wait until there is no pending topology transition.

    Polls system.topology until the transition_state column is null,
    indicating that the topology coordinator has finished processing the
    current operation (whether it completed successfully or was rolled back).

    Args:
        manager: client driving the test cluster (CQL session + REST API).
        deadline: absolute time (as returned by time.time()) after which
            the wait fails.
    """
    cql = manager.get_cql()

    async def no_transition():
        try:
            host = await get_available_host(cql, deadline)
            # Read barrier so the queried node has applied the latest
            # group0 state before we inspect system.topology.
            await read_barrier(manager.api, get_host_api_address(host))
            rs = await cql.run_async(
                "select transition_state from system.topology where key = 'topology'",
                host=host)
        # Both driver-level failures are transient here; retry on the next poll.
        except (NoHostAvailable, ConnectionException) as e:
            logger.info("Topology transition check failed, retrying: %s", e)
            return None
        except HTTPError as e:
            logger.info("Read barrier failed, retrying: %s", e)
            return None
        if not rs:
            # The topology row may briefly be absent on a freshly restarted node.
            logger.warning("Topology transition not visible: system.topology row not found, retrying")
            return None
        if rs[0].transition_state is not None:
            logger.warning("Topology transition still in progress: %s", rs[0].transition_state)
            return None
        return True

    await wait_for(no_transition, deadline, period=0.5)
async def wait_for_token_ring_and_group0_consistency(manager: ManagerClient, deadline: float) -> None:
"""
Weaker version of the above check.
@@ -398,13 +434,14 @@ def get_uuid_from_str(string: str) -> str:
async def wait_new_coordinator_elected(manager: ManagerClient, expected_num_of_elections: int, deadline: float) -> None:
"""Wait new coordinator to be elected
Wait while the table 'system.group0_history' will have a number of lines
with the 'new topology coordinator' equal to the expected_num_of_elections number,
Wait while the table 'system.group0_history' will have at least
expected_num_of_elections lines with 'new topology coordinator',
and the latest host_id coordinator differs from the previous one.
"""
async def new_coordinator_elected():
coordinators_ids = await get_coordinator_host_ids(manager)
if len(coordinators_ids) == expected_num_of_elections \
logger.debug(f"Coordinators ids in history: {coordinators_ids}")
if len(coordinators_ids) >= expected_num_of_elections \
and coordinators_ids[0] != coordinators_ids[1]:
return True
logger.warning("New coordinator was not elected %s", coordinators_ids)