diff --git a/test/cluster/test_crash_coordinator_before_streaming.py b/test/cluster/test_crash_coordinator_before_streaming.py
index b22b8481ba..d6c04b1153 100644
--- a/test/cluster/test_crash_coordinator_before_streaming.py
+++ b/test/cluster/test_crash_coordinator_before_streaming.py
@@ -11,7 +11,8 @@ import pytest
 from test.pylib.manager_client import ManagerClient
 from test.pylib.scylla_cluster import ReplaceConfig
 from test.cluster.util import (check_token_ring_and_group0_consistency, wait_for_token_ring_and_group0_consistency,
-                               get_coordinator_host, get_coordinator_host_ids, wait_new_coordinator_elected)
+                               get_coordinator_host, get_coordinator_host_ids, wait_new_coordinator_elected,
+                               wait_for_no_pending_topology_transition)
 
 logger = logging.getLogger(__name__)
 
@@ -19,7 +20,7 @@ logger = logging.getLogger(__name__)
 
 @pytest.mark.asyncio
 @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
-async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
+async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout: int, scale_timeout: callable) -> None:
     """
     Kill coordinator with error injection while topology operation is running for cluster:
     decommission, bootstrap, removenode, replace.
@@ -57,9 +58,11 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
     logger.debug("Kill coordinator during decommission")
     coordinator_host = await get_coordinator_host(manager)
     other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
+    num_elections = len(await get_coordinator_host_ids(manager))
     await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
     await manager.decommission_node(server_id=other_nodes[-1].server_id, expected_error="Decommission failed. See earlier errors")
-    await wait_new_coordinator_elected(manager, 2, time.time() + 60)
+    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
+    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
     await manager.server_restart(coordinator_host.server_id, wait_others=1)
     await manager.servers_see_each_other(await manager.running_servers())
     await check_token_ring_and_group0_consistency(manager)
@@ -73,33 +76,40 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
     node_to_remove_srv_id = other_nodes[-1].server_id
     logger.debug("Stop node with srv_id %s", node_to_remove_srv_id)
     await manager.server_stop_gracefully(node_to_remove_srv_id)
+    num_elections = len(await get_coordinator_host_ids(manager))
     await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
     logger.debug("Start removenode with srv_id %s from node with srv_id %s", node_to_remove_srv_id, working_srv_id)
     await manager.remove_node(working_srv_id, node_to_remove_srv_id, expected_error="Removenode failed. See earlier errors")
-    await wait_new_coordinator_elected(manager, 3, time.time() + 60)
+    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
+    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
+    await manager.others_not_see_server(server_ip=coordinator_host.ip_addr)
     logger.debug("Start old coordinator node with srv_id %s", coordinator_host.server_id)
     await manager.server_restart(coordinator_host.server_id, wait_others=1)
     await manager.servers_see_each_other(await manager.running_servers())
     logger.debug("Remove node with srv_id %s from node with srv_id %s because it was banned in a previous attempt", node_to_remove_srv_id, working_srv_id)
     await manager.remove_node(working_srv_id, node_to_remove_srv_id)
+    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
+    await manager.servers_see_each_other(await manager.running_servers())
     await check_token_ring_and_group0_consistency(manager)
     logger.debug("Restore number of nodes in cluster")
-    await manager.server_add(cmdline=cmdline)
+    await manager.server_add(config=config, cmdline=cmdline)
 
     # kill coordinator during bootstrap
     logger.debug("Kill coordinator during bootstrap")
     nodes = await manager.running_servers()
     coordinator_host = await get_coordinator_host(manager)
     other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
-    new_node = await manager.server_add(start=False, cmdline=cmdline)
+    new_node = await manager.server_add(start=False, config=config, cmdline=cmdline)
+    num_elections = len(await get_coordinator_host_ids(manager))
     await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
     await manager.server_start(new_node.server_id, expected_error="Startup failed: std::runtime_error")
-    await wait_new_coordinator_elected(manager, 4, time.time() + 60)
+    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
+    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
     await manager.server_restart(coordinator_host.server_id, wait_others=1)
     await manager.servers_see_each_other(await manager.running_servers())
     await check_token_ring_and_group0_consistency(manager)
@@ -111,11 +121,13 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
     other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
     node_to_replace_srv_id = other_nodes[-1].server_id
     await manager.server_stop_gracefully(node_to_replace_srv_id)
+    num_elections = len(await get_coordinator_host_ids(manager))
     await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
     replace_cfg = ReplaceConfig(replaced_id = node_to_replace_srv_id, reuse_ip_addr = False, use_host_id = True)
-    new_node = await manager.server_add(start=False, replace_cfg=replace_cfg, cmdline=cmdline)
+    new_node = await manager.server_add(start=False, config=config, replace_cfg=replace_cfg, cmdline=cmdline)
     await manager.server_start(new_node.server_id, expected_error="Replace failed. See earlier errors")
-    await wait_new_coordinator_elected(manager, 5, time.time() + 60)
+    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
+    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
     logger.debug("Start old coordinator node")
     await manager.others_not_see_server(server_ip=coordinator_host.ip_addr)
     await manager.server_restart(coordinator_host.server_id, wait_others=1)
@@ -123,5 +135,5 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
     logger.debug("Replaced node is already non-voter and will be banned after restart. Remove it")
     coordinator_host = await get_coordinator_host(manager)
     await manager.remove_node(coordinator_host.server_id, node_to_replace_srv_id)
-    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60)
+    await wait_for_token_ring_and_group0_consistency(manager, time.time() + scale_timeout(60))
     await check_token_ring_and_group0_consistency(manager)
diff --git a/test/cluster/util.py b/test/cluster/util.py
index 008c879041..6b7eef4a50 100644
--- a/test/cluster/util.py
+++ b/test/cluster/util.py
@@ -18,7 +18,7 @@ from cassandra.cluster import ConnectionException, ConsistencyLevel, NoHostAvail
 from cassandra.pool import Host  # type: ignore # pylint: disable=no-name-in-module
 from test.pylib.internal_types import ServerInfo, HostID
 from test.pylib.manager_client import ManagerClient
-from test.pylib.rest_client import get_host_api_address, read_barrier
+from test.pylib.rest_client import HTTPError, get_host_api_address, read_barrier
 from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, get_available_host, unique_name
 from typing import Optional, List, Union
 
@@ -119,6 +119,42 @@ async def check_token_ring_and_group0_consistency(manager: ManagerClient) -> Non
     assert token_ring_ids == group0_ids
 
 
+async def wait_for_no_pending_topology_transition(manager: ManagerClient, deadline: float) -> None:
+    """Wait until there is no pending topology transition.
+
+    Polls system.topology until the transition_state column is null,
+    indicating that the topology coordinator has finished processing the
+    current operation (whether it completed successfully or was rolled back).
+    """
+    cql = manager.get_cql()
+
+    async def no_transition():
+        try:
+            host = await get_available_host(cql, deadline)
+            await read_barrier(manager.api, get_host_api_address(host))
+            rs = await cql.run_async(
+                "select transition_state from system.topology where key = 'topology'",
+                host=host)
+        except (NoHostAvailable, ConnectionException) as e:
+            logger.info(f"Topology transition check failed, retrying: {e}")
+            return None
+        except HTTPError as e:
+            logger.info(f"Read barrier failed, retrying: {e}")
+            return None
+
+        if not rs:
+            logger.warning("Topology transition not visible: system.topology row not found, retrying")
+            return None
+        if rs[0].transition_state is not None:
+            logger.warning(f"Topology transition still in progress: {rs[0].transition_state}")
+            return None
+        return True
+
+    await wait_for(no_transition, deadline, period=0.5)
+
+
 async def wait_for_token_ring_and_group0_consistency(manager: ManagerClient, deadline: float) -> None:
     """ Weaker version of the above check.
@@ -398,13 +434,14 @@ def get_uuid_from_str(string: str) -> str:
 
 async def wait_new_coordinator_elected(manager: ManagerClient, expected_num_of_elections: int, deadline: float) -> None:
     """Wait new coordinator to be elected
-    Wait while the table 'system.group0_history' will have a number of lines
-    with the 'new topology coordinator' equal to the expected_num_of_elections number,
+    Wait until the table 'system.group0_history' has at least
+    expected_num_of_elections rows mentioning 'new topology coordinator',
     and the latest host_id coordinator differs from the previous one.
     """
     async def new_coordinator_elected():
         coordinators_ids = await get_coordinator_host_ids(manager)
-        if len(coordinators_ids) == expected_num_of_elections \
+        logger.debug(f"Coordinator ids in history: {coordinators_ids}")
+        if len(coordinators_ids) >= expected_num_of_elections \
                 and coordinators_ids[0] != coordinators_ids[1]:
             return True
         logger.warning("New coordinator was not elected %s", coordinators_ids)