diff --git a/test/pylib/internal_types.py b/test/pylib/internal_types.py index 724d90d7bf..8d5fe30bf3 100644 --- a/test/pylib/internal_types.py +++ b/test/pylib/internal_types.py @@ -37,6 +37,6 @@ class ServerInfo(NamedTuple): class ServerUpState(IntEnum): PROCESS_STARTED = auto() HOST_ID_QUERIED = auto() - CQL_CONNECTED = auto() - CQL_QUERIED = auto() + CQL_ALTERNATOR_CONNECTED = auto() + CQL_ALTERNATOR_QUERIED = auto() SERVING = auto() # Scylla sent sd_notify("serving") diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py index 2062712d55..883b3cb3cf 100644 --- a/test/pylib/manager_client.py +++ b/test/pylib/manager_client.py @@ -389,7 +389,7 @@ class ManagerClient: seeds: list[IPAddress] | None = None, timeout: float | None = None, connect_driver: bool = True, - expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED, + expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED, cmdline_options_override: list[str] | None = None, append_env_override: dict[str, str] | None = None, auth_provider: dict[str, str] | None = None) -> None: @@ -540,7 +540,7 @@ class ManagerClient: seeds: Optional[List[IPAddress]] = None, timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT, server_encryption: str = "none", - expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED, + expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED, connect_driver: bool = True) -> ServerInfo: """Add a new server""" if expected_error is not None: diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py index 53f24e8034..971cd2e1e8 100644 --- a/test/pylib/scylla_cluster.py +++ b/test/pylib/scylla_cluster.py @@ -41,6 +41,7 @@ import yaml import signal import glob import errno +import json import re import platform import contextlib @@ -557,7 +558,7 @@ class ScyllaServer: async def install_and_start(self, api: ScyllaRESTAPIClient, expected_error: Optional[str] = None, - expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> None: + expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> None: """Setup and start this server.""" await self.install() @@ -677,10 +678,63 @@ class ScyllaServer: return None return maintenance_socket_option - async def get_cql_up_state(self) -> ServerUpState | None: - """Get the CQL up state (a check we use at start up). + def _alternator_ports(self) -> list[tuple[str, int]]: + """Return (scheme, port) for every configured Alternator port.""" + ports = [] + if "alternator_port" in self.config: + ports.append(("http", self.config["alternator_port"])) + if "alternator_https_port" in self.config: + ports.append(("https", self.config["alternator_https_port"])) + return ports - Return None if it fails to connect. + async def check_alternator_connected(self, ports: list[tuple[str, int]]) -> bool: + """TCP connect to every configured Alternator port. + + Returns True if all ports accept connections. + """ + for _, port in ports: + try: + _, writer = await asyncio.wait_for( + asyncio.open_connection(self.ip_addr, port), timeout=2) + writer.close() + await writer.wait_closed() + except (OSError, asyncio.TimeoutError): + return False + return True + + async def check_alternator_queried(self, ports: list[tuple[str, int]]) -> bool: + """Sends a GetItem for a randomly-named nonexistent table and validates + that the response is a DynamoDB-shaped JSON error (contains __type), + confirming Alternator is processing DynamoDB API requests. + + Returns True if all ports respond correctly. + """ + table_name = f"nonexistent_table_{uuid.uuid4().hex}" + headers = { + "Content-Type": "application/x-amz-json-1.0", + "X-Amz-Target": "DynamoDB_20120810.GetItem", + } + body = json.dumps({"TableName": table_name, "Key": {"k": {"S": "k"}}}) + timeout = aiohttp.ClientTimeout(total=2) + async with aiohttp.ClientSession(timeout=timeout) as session: + for scheme, port in ports: + url = f"{scheme}://{self.ip_addr}:{port}/" + try: + # ssl=False skips certificate verification + async with session.post(url, headers=headers, data=body, ssl=False) as resp: + response_body = await resp.json(content_type=None) + if "__type" not in response_body: + return False + except Exception as exc: + self.logger.debug("Alternator query check failed for %s: %s", url, exc) + return False + return True + + async def get_cql_up_state(self) -> tuple[bool, bool]: + """Check CQL connectivity. + + Returns (connected, queried) indicating whether a CQL connection + was established and whether a query executed successfully. """ caslog = logging.getLogger('cassandra') oldlevel = caslog.getEffectiveLevel() @@ -708,6 +762,7 @@ class ScyllaServer: request_timeout=self.TOPOLOGY_TIMEOUT) contact_points=[self.rpc_address] connected = False + cql_queried = False try: # In a cluster setup, it's possible that the CQL # here is directed to a node different from the initial contact @@ -730,13 +785,32 @@ class ScyllaServer: control_connection_timeout=self.TOPOLOGY_TIMEOUT, auth_provider=self.auth_provider) self.control_connection = self.control_cluster.connect() - return ServerUpState.CQL_QUERIED + cql_queried = True except (NoHostAvailable, InvalidRequest, OperationTimedOut) as exc: self.logger.debug("Exception when checking if CQL is up: %s", exc) - return ServerUpState.CQL_CONNECTED if connected else None finally: caslog.setLevel(oldlevel) # Any other exception may indicate a problem, and is passed to the caller. + return connected, cql_queried + + async def get_alternator_up_state(self, ports: list[tuple[str, int]]) -> tuple[bool, bool]: + connected = await self.check_alternator_connected(ports) + queried = connected and await self.check_alternator_queried(ports) + return connected, queried + + async def get_cql_alternator_up_state(self) -> ServerUpState | None: + """Get the combined CQL + Alternator up state.""" + cql_connected, cql_queried = await self.get_cql_up_state() + alt_connected, alt_queried = False, False + alt_ports = self._alternator_ports() # `alt_ports` empty = no Alternator + if alt_ports: + alt_connected, alt_queried = await self.get_alternator_up_state(alt_ports) + if cql_queried and (alt_queried or not alt_ports): + return ServerUpState.CQL_ALTERNATOR_QUERIED + if not cql_connected or (alt_ports and not alt_connected): + return None + # Here both CQL and Alternator (if exists) are at least connected + return ServerUpState.CQL_ALTERNATOR_CONNECTED def _setup_notify_socket(self) -> None: """Create a Unix datagram socket for receiving sd_notify messages from Scylla.""" @@ -824,7 +898,7 @@ class ScyllaServer: async def start(self, api: ScyllaRESTAPIClient, expected_error: Optional[str] = None, - expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED, + expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED, cmdline_options_override: list[str] | None = None, append_env_override: dict[str, str] | None = None) -> None: """Start an installed server. @@ -897,9 +971,9 @@ class ScyllaServer: if await self.try_get_host_id(api): if server_up_state == ServerUpState.PROCESS_STARTED: server_up_state = ServerUpState.HOST_ID_QUERIED - server_up_state = await self.get_cql_up_state() or server_up_state + server_up_state = await self.get_cql_alternator_up_state() or server_up_state # Check for SERVING state (sd_notify "serving" message) - if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification(): + if server_up_state >= ServerUpState.CQL_ALTERNATOR_QUERIED and self.check_serving_notification(): server_up_state = ServerUpState.SERVING if server_up_state >= expected_server_up_state: if expected_error is not None: @@ -1194,7 +1268,7 @@ class ScyllaCluster: seeds: Optional[List[IPAddress]] = None, server_encryption: str = "none", expected_error: Optional[str] = None, - expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> ServerInfo: + expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> ServerInfo: """Add a new server to the cluster""" self.is_dirty = True @@ -1434,7 +1508,7 @@ class ScyllaCluster: server_id: ServerNum, expected_error: str | None = None, seeds: list[IPAddress] | None = None, - expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED, + expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED, cmdline_options_override: list[str] | None = None, append_env_override: dict[str, str] | None = None, auth_provider: dict[str, str] | None = None) -> None: @@ -1917,7 +1991,7 @@ class ScyllaClusterManager: server_id=server_id, expected_error=data.get("expected_error"), seeds=data.get("seeds"), - expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")), + expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")), cmdline_options_override=data.get("cmdline_options_override"), append_env_override=data.get("append_env_override"), auth_provider=data.get("auth_provider"), @@ -1952,7 +2026,7 @@ class ScyllaClusterManager: seeds=data.get("seeds"), server_encryption=data.get("server_encryption", "none"), expected_error=data.get("expected_error"), - expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")), + expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")), ) return s_info.as_dict()