diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py
index cab70b78ad..4204fe8adc 100644
--- a/test/pylib/manager_client.py
+++ b/test/pylib/manager_client.py
@@ -48,7 +48,7 @@ class ManagerClient():
         """Close driver"""
         self.driver_close()
 
-    async def driver_connect(self, server=None) -> None:
+    async def driver_connect(self, server: Optional[ServerInfo] = None) -> None:
         """Connect to cluster"""
         if self.con_gen is not None:
             targets = [server] if server else await self.running_servers()
@@ -139,17 +139,21 @@ class ManagerClient():
         logger.debug("ManagerClient stopping gracefully %s", server_id)
         await self.client.get_text(f"/cluster/server/{server_id}/stop_gracefully")
 
-    async def server_start(self, server_id: ServerNum, expected_error: Optional[str] = None) -> None:
-        """Start specified server"""
+    async def server_start(self, server_id: ServerNum, expected_error: Optional[str] = None,
+                           wait_others: int = 0, wait_interval: float = 45) -> None:
+        """Start specified server and optionally wait for it to learn of other servers"""
         logger.debug("ManagerClient starting %s", server_id)
         params = {'expected_error': expected_error} if expected_error is not None else None
         await self.client.get_text(f"/cluster/server/{server_id}/start", params=params)
+        await self.server_sees_others(server_id, wait_others, interval=wait_interval)
         self._driver_update()
 
-    async def server_restart(self, server_id: ServerNum) -> None:
-        """Restart specified server"""
+    async def server_restart(self, server_id: ServerNum, wait_others: int = 0,
+                             wait_interval: float = 45) -> None:
+        """Restart specified server and optionally wait for it to learn of other servers"""
         logger.debug("ManagerClient restarting %s", server_id)
         await self.client.get_text(f"/cluster/server/{server_id}/restart")
+        await self.server_sees_others(server_id, wait_others, interval=wait_interval)
         self._driver_update()
 
     async def server_add(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline: Optional[List[str]] = None, config: Optional[dict[str, str]] = None, start: bool = True) -> ServerInfo:
@@ -239,3 +243,32 @@ class ManagerClient():
         except Exception as exc:
             raise Exception(f"Failed to get local host id address for server {server_id}") from exc
         return HostID(host_id)
+
+    async def server_sees_others(self, server_id: ServerNum, count: int, interval: float = 45.) -> None:
+        """Wait until a server reports more than `count` alive endpoints (no-op if count < 1)"""
+        if count < 1:
+            return  # callers pass 0 (the default) to skip waiting entirely
+        server_ip = await self.get_host_ip(server_id)
+        async def _sees_min_others():
+            alive_nodes = await self.api.get_alive_endpoints(server_ip)
+            if len(alive_nodes) > count:
+                return True
+        await wait_for(_sees_min_others, time() + interval, period=.1)
+
+    async def server_sees_other_server(self, server_ip: IPAddress, other_ip: IPAddress,
+                                       interval: float = 45.) -> None:
+        """Wait until server_ip reports other_ip among its alive endpoints"""
+        async def _sees_another_server():
+            alive_nodes = await self.api.get_alive_endpoints(server_ip)
+            if other_ip in alive_nodes:
+                return True
+        await wait_for(_sees_another_server, time() + interval, period=.1)
+
+    async def server_not_sees_other_server(self, server_ip: IPAddress, other_ip: IPAddress,
+                                           interval: float = 45.) -> None:
+        """Wait until server_ip no longer reports other_ip among its alive endpoints"""
+        async def _not_sees_another_server():
+            alive_nodes = await self.api.get_alive_endpoints(server_ip)
+            if other_ip not in alive_nodes:
+                return True
+        await wait_for(_not_sees_another_server, time() + interval, period=.1)
diff --git a/test/topology/test_mutation_schema_change.py b/test/topology/test_mutation_schema_change.py
index 1d30926b13..a751840f38 100644
--- a/test/topology/test_mutation_schema_change.py
+++ b/test/topology/test_mutation_schema_change.py
@@ -9,24 +9,16 @@ Reproducer for a failure during lwt operation due to missing of a column mapping
 import asyncio
 import logging
 import time
-from functools import partial
 from test.pylib.rest_client import inject_error_one_shot, inject_error
-from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
-from test.pylib.manager_client import ManagerClient
-from test.pylib.internal_types import IPAddress, ServerInfo
+from test.pylib.util import wait_for_cql_and_get_hosts
 import pytest
-from cassandra.cluster import Cluster, ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module
+from cassandra.cluster import ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module
 from cassandra.query import SimpleStatement # type: ignore # pylint: disable=no-name-in-module
 
 
 logger = logging.getLogger(__name__)
 
 
-async def server_sees_another_server(server: ServerInfo, manager: ManagerClient):
-    alive_nodes = await manager.api.get_alive_endpoints(server.ip_addr)
-    if len(alive_nodes) > 1:
-        return True
-
 @pytest.mark.asyncio
 async def test_mutation_schema_change(manager, random_tables):
     """
@@ -68,9 +60,7 @@ async def test_mutation_schema_change(manager, random_tables):
     logger.info("Stopping B %s", server_b)
     await manager.server_stop_gracefully(server_b.server_id)
     logger.info("Starting C %s", server_c)
-    await manager.server_start(server_c.server_id)
-
-    await wait_for(partial(server_sees_another_server, server_c, manager), time.time() + 45, period=.1)
+    await manager.server_start(server_c.server_id, wait_others=1)
 
     logger.info("Driver connecting to C %s", server_c)
     await manager.driver_connect(server=server_c)
@@ -136,11 +126,10 @@ async def test_mutation_schema_change_restart(manager, random_tables):
     logger.info("Restarting A %s", server_a)
     await manager.server_restart(server_a.server_id)
     logger.info("Starting C %s", server_c)
-    await manager.server_start(server_c.server_id)
+    await manager.server_start(server_c.server_id, wait_others=1)  # Wait for C to see another server (A)
 
-    # Wait for C and A to see each other
-    await wait_for(partial(server_sees_another_server, server_c, manager), time.time() + 45, period=.1)
-    await wait_for(partial(server_sees_another_server, server_a, manager), time.time() + 45, period=.1)
+    # Wait for A to see C
+    await manager.server_sees_other_server(server_a.ip_addr, server_c.ip_addr)
 
     logger.info("Driver connecting to A %s", server_a)
     await manager.driver_connect(server=server_a)
diff --git a/test/topology/test_topology_remove_garbage_group0.py b/test/topology/test_topology_remove_garbage_group0.py
index 90558b58d3..69975c8535 100644
--- a/test/topology/test_topology_remove_garbage_group0.py
+++ b/test/topology/test_topology_remove_garbage_group0.py
@@ -81,6 +81,8 @@ async def test_remove_garbage_group0_members(manager: ManagerClient):
 
     logging.info(f'stop {servers[1]}')
     await manager.server_stop_gracefully(servers[1].server_id)
+    logging.debug(f'waiting for {servers[2]} to see {servers[1]} is down')
+    await manager.server_not_sees_other_server(servers[2].ip_addr, servers[1].ip_addr)
 
     logging.info(f'removenode {servers[1]} using {servers[2]}')
     await manager.remove_node(servers[2].server_id, servers[1].server_id)