From d5efd1f67685b22ee5b3c8ea31faa967441d3459 Mon Sep 17 00:00:00 2001 From: Piotr Szymaniak Date: Fri, 24 Apr 2026 11:20:56 +0200 Subject: [PATCH] test/cluster: wait for Alternator readiness in server startup server_add() only waits for CQL readiness before returning. The Alternator HTTP port may not be listening yet, causing ConnectionRefused errors in Alternator tests. Extend the ServerUpState enum and startup loop to also check Alternator port readiness when configured. When one or more Alternator ports are configured, each is checked for being both connectable and queryable, similar to how CQL ports are probed. Fixes SCYLLADB-1701 Closes scylladb/scylladb#29625 --- test/pylib/internal_types.py | 4 +- test/pylib/manager_client.py | 4 +- test/pylib/scylla_cluster.py | 100 ++++++++++++++++++++++++++++++----- 3 files changed, 91 insertions(+), 17 deletions(-) diff --git a/test/pylib/internal_types.py b/test/pylib/internal_types.py index 724d90d7bf..8d5fe30bf3 100644 --- a/test/pylib/internal_types.py +++ b/test/pylib/internal_types.py @@ -37,6 +37,6 @@ class ServerInfo(NamedTuple): class ServerUpState(IntEnum): PROCESS_STARTED = auto() HOST_ID_QUERIED = auto() - CQL_CONNECTED = auto() - CQL_QUERIED = auto() + CQL_ALTERNATOR_CONNECTED = auto() + CQL_ALTERNATOR_QUERIED = auto() SERVING = auto() # Scylla sent sd_notify("serving") diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py index 2062712d55..883b3cb3cf 100644 --- a/test/pylib/manager_client.py +++ b/test/pylib/manager_client.py @@ -389,7 +389,7 @@ class ManagerClient: seeds: list[IPAddress] | None = None, timeout: float | None = None, connect_driver: bool = True, - expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED, + expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED, cmdline_options_override: list[str] | None = None, append_env_override: dict[str, str] | None = None, auth_provider: dict[str, str] | None = None) -> None: @@ -540,7 +540,7 @@ class ManagerClient: 
seeds: Optional[List[IPAddress]] = None, timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT, server_encryption: str = "none", - expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED, + expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED, connect_driver: bool = True) -> ServerInfo: """Add a new server""" if expected_error is not None: diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py index 53f24e8034..971cd2e1e8 100644 --- a/test/pylib/scylla_cluster.py +++ b/test/pylib/scylla_cluster.py @@ -41,6 +41,7 @@ import yaml import signal import glob import errno +import json import re import platform import contextlib @@ -557,7 +558,7 @@ class ScyllaServer: async def install_and_start(self, api: ScyllaRESTAPIClient, expected_error: Optional[str] = None, - expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> None: + expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> None: """Setup and start this server.""" await self.install() @@ -677,10 +678,63 @@ class ScyllaServer: return None return maintenance_socket_option - async def get_cql_up_state(self) -> ServerUpState | None: - """Get the CQL up state (a check we use at start up). + def _alternator_ports(self) -> list[tuple[str, int]]: + """Return (scheme, port) for every configured Alternator port.""" + ports = [] + if "alternator_port" in self.config: + ports.append(("http", self.config["alternator_port"])) + if "alternator_https_port" in self.config: + ports.append(("https", self.config["alternator_https_port"])) + return ports - Return None if it fails to connect. + async def check_alternator_connected(self, ports: list[tuple[str, int]]) -> bool: + """TCP connect to every configured Alternator port. + + Returns True if all ports accept connections. 
+ """ + for _, port in ports: + try: + _, writer = await asyncio.wait_for( + asyncio.open_connection(self.ip_addr, port), timeout=2) + writer.close() + await writer.wait_closed() + except (OSError, asyncio.TimeoutError): + return False + return True + + async def check_alternator_queried(self, ports: list[tuple[str, int]]) -> bool: + """Sends a GetItem for a randomly-named nonexistent table and validates + that the response is a DynamoDB-shaped JSON error (contains __type), + confirming Alternator is processing DynamoDB API requests. + + Returns True if all ports respond correctly. + """ + table_name = f"nonexistent_table_{uuid.uuid4().hex}" + headers = { + "Content-Type": "application/x-amz-json-1.0", + "X-Amz-Target": "DynamoDB_20120810.GetItem", + } + body = json.dumps({"TableName": table_name, "Key": {"k": {"S": "k"}}}) + timeout = aiohttp.ClientTimeout(total=2) + async with aiohttp.ClientSession(timeout=timeout) as session: + for scheme, port in ports: + url = f"{scheme}://{self.ip_addr}:{port}/" + try: + # ssl=False skips certificate verification + async with session.post(url, headers=headers, data=body, ssl=False) as resp: + response_body = await resp.json(content_type=None) + if "__type" not in response_body: + return False + except Exception as exc: + self.logger.debug("Alternator query check failed for %s: %s", url, exc) + return False + return True + + async def get_cql_up_state(self) -> tuple[bool, bool]: + """Check CQL connectivity. + + Returns (connected, queried) indicating whether a CQL connection + was established and whether a query executed successfully. 
""" caslog = logging.getLogger('cassandra') oldlevel = caslog.getEffectiveLevel() @@ -708,6 +762,7 @@ class ScyllaServer: request_timeout=self.TOPOLOGY_TIMEOUT) contact_points=[self.rpc_address] connected = False + cql_queried = False try: # In a cluster setup, it's possible that the CQL # here is directed to a node different from the initial contact @@ -730,13 +785,32 @@ class ScyllaServer: control_connection_timeout=self.TOPOLOGY_TIMEOUT, auth_provider=self.auth_provider) self.control_connection = self.control_cluster.connect() - return ServerUpState.CQL_QUERIED + cql_queried = True except (NoHostAvailable, InvalidRequest, OperationTimedOut) as exc: self.logger.debug("Exception when checking if CQL is up: %s", exc) - return ServerUpState.CQL_CONNECTED if connected else None finally: caslog.setLevel(oldlevel) # Any other exception may indicate a problem, and is passed to the caller. + return connected, cql_queried + + async def get_alternator_up_state(self, ports: list[tuple[str, int]]) -> tuple[bool, bool]: + connected = await self.check_alternator_connected(ports) + queried = connected and await self.check_alternator_queried(ports) + return connected, queried + + async def get_cql_alternator_up_state(self) -> ServerUpState | None: + """Get the combined CQL + Alternator up state.""" + cql_connected, cql_queried = await self.get_cql_up_state() + alt_connected, alt_queried = False, False + alt_ports = self._alternator_ports() # `alt_ports` empty = no Alternator + if alt_ports: + alt_connected, alt_queried = await self.get_alternator_up_state(alt_ports) + if cql_queried and (alt_queried or not alt_ports): + return ServerUpState.CQL_ALTERNATOR_QUERIED + if not cql_connected or (alt_ports and not alt_connected): + return None + # Here both CQL and Alternator (if exists) are at least connected + return ServerUpState.CQL_ALTERNATOR_CONNECTED def _setup_notify_socket(self) -> None: """Create a Unix datagram socket for receiving sd_notify messages from Scylla.""" @@ 
-824,7 +898,7 @@ class ScyllaServer: async def start(self, api: ScyllaRESTAPIClient, expected_error: Optional[str] = None, - expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED, + expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED, cmdline_options_override: list[str] | None = None, append_env_override: dict[str, str] | None = None) -> None: """Start an installed server. @@ -897,9 +971,9 @@ class ScyllaServer: if await self.try_get_host_id(api): if server_up_state == ServerUpState.PROCESS_STARTED: server_up_state = ServerUpState.HOST_ID_QUERIED - server_up_state = await self.get_cql_up_state() or server_up_state + server_up_state = await self.get_cql_alternator_up_state() or server_up_state # Check for SERVING state (sd_notify "serving" message) - if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification(): + if server_up_state >= ServerUpState.CQL_ALTERNATOR_QUERIED and self.check_serving_notification(): server_up_state = ServerUpState.SERVING if server_up_state >= expected_server_up_state: if expected_error is not None: @@ -1194,7 +1268,7 @@ class ScyllaCluster: seeds: Optional[List[IPAddress]] = None, server_encryption: str = "none", expected_error: Optional[str] = None, - expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> ServerInfo: + expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> ServerInfo: """Add a new server to the cluster""" self.is_dirty = True @@ -1434,7 +1508,7 @@ class ScyllaCluster: server_id: ServerNum, expected_error: str | None = None, seeds: list[IPAddress] | None = None, - expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED, + expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED, cmdline_options_override: list[str] | None = None, append_env_override: dict[str, str] | None = None, auth_provider: dict[str, str] | None = None) -> None: @@ -1917,7 +1991,7 @@ class 
ScyllaClusterManager: server_id=server_id, expected_error=data.get("expected_error"), seeds=data.get("seeds"), - expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")), + expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")), cmdline_options_override=data.get("cmdline_options_override"), append_env_override=data.get("append_env_override"), auth_provider=data.get("auth_provider"), @@ -1952,7 +2026,7 @@ class ScyllaClusterManager: seeds=data.get("seeds"), server_encryption=data.get("server_encryption", "none"), expected_error=data.get("expected_error"), - expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")), + expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")), ) return s_info.as_dict()