test/pylib: ManagerClient helpers to wait for...
server to see other servers after start/restart. When starting/restarting a server, provide a way to wait for the server to see at least n other servers. Also leave the implementation methods available for manual use and update previous tests: one helper waits for a specific server to be seen, and one waits for a specific server to not be seen (down). Fixes #13147. Signed-off-by: Alejo Sanchez <alejo.sanchez@scylladb.com>. Closes #13438.
This commit is contained in:
committed by
Kamil Braun
parent
342cdb2a63
commit
11561a73cb
@@ -48,7 +48,7 @@ class ManagerClient():
|
||||
"""Close driver"""
|
||||
self.driver_close()
|
||||
|
||||
async def driver_connect(self, server=None) -> None:
|
||||
async def driver_connect(self, server: Optional[ServerInfo] = None) -> None:
|
||||
"""Connect to cluster"""
|
||||
if self.con_gen is not None:
|
||||
targets = [server] if server else await self.running_servers()
|
||||
@@ -139,17 +139,21 @@ class ManagerClient():
|
||||
logger.debug("ManagerClient stopping gracefully %s", server_id)
|
||||
await self.client.get_text(f"/cluster/server/{server_id}/stop_gracefully")
|
||||
|
||||
async def server_start(self, server_id: ServerNum, expected_error: Optional[str] = None) -> None:
|
||||
"""Start specified server"""
|
||||
async def server_start(self, server_id: ServerNum, expected_error: Optional[str] = None,
                       wait_others: int = 0, wait_interval: float = 45) -> None:
    """Start the given server, then optionally wait until it sees other servers.

    Args:
        server_id: id of the server to start; interpolated into the request path.
        expected_error: when not None, sent as the ``expected_error`` query
            parameter of the start request; otherwise no parameters are sent.
        wait_others: count handed to ``server_sees_others`` once the start
            request has returned.
        wait_interval: forwarded to ``server_sees_others`` as ``interval``.
    """
    logger.debug("ManagerClient starting %s", server_id)
    if expected_error is None:
        query = None
    else:
        query = {'expected_error': expected_error}
    await self.client.get_text(f"/cluster/server/{server_id}/start", params=query)
    await self.server_sees_others(server_id, wait_others, interval=wait_interval)
    # Refresh the driver only after the server was started and the wait finished.
    self._driver_update()
|
||||
|
||||
async def server_restart(self, server_id: ServerNum) -> None:
|
||||
"""Restart specified server"""
|
||||
async def server_restart(self, server_id: ServerNum, wait_others: int = 0,
                         wait_interval: float = 45) -> None:
    """Restart the given server, then optionally wait until it sees other servers.

    Args:
        server_id: id of the server to restart; interpolated into the request path.
        wait_others: count handed to ``server_sees_others`` once the restart
            request has returned.
        wait_interval: forwarded to ``server_sees_others`` as ``interval``.
    """
    logger.debug("ManagerClient restarting %s", server_id)
    restart_path = f"/cluster/server/{server_id}/restart"
    await self.client.get_text(restart_path)
    await self.server_sees_others(server_id, wait_others, interval=wait_interval)
    # Refresh the driver only after the restart request and the wait finished.
    self._driver_update()
|
||||
|
||||
async def server_add(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline: Optional[List[str]] = None, config: Optional[dict[str, str]] = None, start: bool = True) -> ServerInfo:
|
||||
@@ -239,3 +243,32 @@ class ManagerClient():
|
||||
except Exception as exc:
|
||||
raise Exception(f"Failed to get local host id address for server {server_id}") from exc
|
||||
return HostID(host_id)
|
||||
|
||||
async def server_sees_others(self, server_id: ServerNum, count: int, interval: float = 45.):
    """Block until the server reports more than ``count`` alive endpoints.

    Returns immediately, without any request, when ``count`` is below 1.
    Otherwise the server's IP is resolved through ``get_host_ip`` and its
    alive-endpoint list is polled through ``wait_for`` with a 0.1 period and
    a deadline of ``interval`` seconds from the moment polling starts.

    NOTE(review): the strict ``>`` presumably accounts for the server itself
    being part of the alive list -- confirm against the REST API.
    """
    if count < 1:
        return

    host_ip = await self.get_host_ip(server_id)

    async def _enough_peers_alive():
        alive = await self.api.get_alive_endpoints(host_ip)
        # Yield None (not False) while unsatisfied, exactly like the original
        # implicit fall-through; wait_for's retry contract is defined elsewhere.
        return True if len(alive) > count else None

    deadline = time() + interval
    await wait_for(_enough_peers_alive, deadline, period=.1)
|
||||
|
||||
async def server_sees_other_server(self, server_ip: IPAddress, other_ip: IPAddress,
                                   interval: float = 45.):
    """Block until ``other_ip`` shows up in the alive endpoints of ``server_ip``.

    The alive-endpoint list of ``server_ip`` is polled through ``wait_for``
    with a 0.1 period and a deadline of ``interval`` seconds from now.
    """
    async def _peer_is_listed():
        alive = await self.api.get_alive_endpoints(server_ip)
        # Yield None (not False) while unsatisfied, exactly like the original
        # implicit fall-through; wait_for's retry contract is defined elsewhere.
        return True if other_ip in alive else None

    deadline = time() + interval
    await wait_for(_peer_is_listed, deadline, period=.1)
|
||||
|
||||
async def server_not_sees_other_server(self, server_ip: IPAddress, other_ip: IPAddress,
                                       interval: float = 45.):
    """Block until ``other_ip`` is absent from the alive endpoints of ``server_ip``.

    The alive-endpoint list of ``server_ip`` is polled through ``wait_for``
    with a 0.1 period and a deadline of ``interval`` seconds from now.
    """
    async def _peer_is_gone():
        alive = await self.api.get_alive_endpoints(server_ip)
        # Yield None (not False) while unsatisfied, exactly like the original
        # implicit fall-through; wait_for's retry contract is defined elsewhere.
        return True if other_ip not in alive else None

    deadline = time() + interval
    await wait_for(_peer_is_gone, deadline, period=.1)
|
||||
|
||||
@@ -9,24 +9,16 @@ Reproducer for a failure during lwt operation due to missing of a column mapping
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from functools import partial
|
||||
from test.pylib.rest_client import inject_error_one_shot, inject_error
|
||||
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.internal_types import IPAddress, ServerInfo
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
import pytest
|
||||
from cassandra.cluster import Cluster, ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module
|
||||
from cassandra.cluster import ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module
|
||||
from cassandra.query import SimpleStatement # type: ignore # pylint: disable=no-name-in-module
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def server_sees_another_server(server: ServerInfo, manager: ManagerClient):
    """Return True once ``server`` reports more than one alive endpoint, else None.

    Queries the alive endpoints for ``server.ip_addr`` through ``manager.api``.
    """
    endpoints = await manager.api.get_alive_endpoints(server.ip_addr)
    # Yield None (not False) when the condition does not hold, matching the
    # original implicit fall-through.
    return True if len(endpoints) > 1 else None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mutation_schema_change(manager, random_tables):
|
||||
"""
|
||||
@@ -68,9 +60,7 @@ async def test_mutation_schema_change(manager, random_tables):
|
||||
logger.info("Stopping B %s", server_b)
|
||||
await manager.server_stop_gracefully(server_b.server_id)
|
||||
logger.info("Starting C %s", server_c)
|
||||
await manager.server_start(server_c.server_id)
|
||||
|
||||
await wait_for(partial(server_sees_another_server, server_c, manager), time.time() + 45, period=.1)
|
||||
await manager.server_start(server_c.server_id, wait_others = 1)
|
||||
|
||||
logger.info("Driver connecting to C %s", server_c)
|
||||
await manager.driver_connect(server=server_c)
|
||||
@@ -136,11 +126,10 @@ async def test_mutation_schema_change_restart(manager, random_tables):
|
||||
logger.info("Restarting A %s", server_a)
|
||||
await manager.server_restart(server_a.server_id)
|
||||
logger.info("Starting C %s", server_c)
|
||||
await manager.server_start(server_c.server_id)
|
||||
await manager.server_start(server_c.server_id, wait_others = 1) # Wait C to see another one (A)
|
||||
|
||||
# Wait for C and A to see each other
|
||||
await wait_for(partial(server_sees_another_server, server_c, manager), time.time() + 45, period=.1)
|
||||
await wait_for(partial(server_sees_another_server, server_a, manager), time.time() + 45, period=.1)
|
||||
# Wait for A to see C
|
||||
await manager.server_sees_other_server(server_a.ip_addr, server_c.ip_addr)
|
||||
|
||||
logger.info("Driver connecting to A %s", server_a)
|
||||
await manager.driver_connect(server=server_a)
|
||||
|
||||
@@ -81,6 +81,8 @@ async def test_remove_garbage_group0_members(manager: ManagerClient):
|
||||
logging.info(f'stop {servers[1]}')
|
||||
await manager.server_stop_gracefully(servers[1].server_id)
|
||||
|
||||
logging.debug(f'waiting for {servers[2]} to see {servers[1]} is down')
|
||||
await manager.server_not_sees_other_server(servers[2].ip_addr, servers[1].ip_addr)
|
||||
logging.info(f'removenode {servers[1]} using {servers[2]}')
|
||||
await manager.remove_node(servers[2].server_id, servers[1].server_id)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user