mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-26 03:20:37 +00:00
test/cluster: wait for Alternator readiness in server startup
server_add() only waits for CQL readiness before returning. The Alternator HTTP port may not be listening yet, causing ConnectionRefused with Alternator tests. Extend the ServerUpState enum and startup loop to also check Alternator port readiness when configured. Whenever Alternator port(s) is/are configured, each is verified if connectable and queryable, similar to how CQL ports are probed. Fixes SCYLLADB-1701 Closes scylladb/scylladb#29625
This commit is contained in:
committed by
Avi Kivity
parent
d14d07a079
commit
d5efd1f676
@@ -37,6 +37,6 @@ class ServerInfo(NamedTuple):
|
||||
class ServerUpState(IntEnum):
|
||||
PROCESS_STARTED = auto()
|
||||
HOST_ID_QUERIED = auto()
|
||||
CQL_CONNECTED = auto()
|
||||
CQL_QUERIED = auto()
|
||||
CQL_ALTERNATOR_CONNECTED = auto()
|
||||
CQL_ALTERNATOR_QUERIED = auto()
|
||||
SERVING = auto() # Scylla sent sd_notify("serving")
|
||||
|
||||
@@ -389,7 +389,7 @@ class ManagerClient:
|
||||
seeds: list[IPAddress] | None = None,
|
||||
timeout: float | None = None,
|
||||
connect_driver: bool = True,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
|
||||
cmdline_options_override: list[str] | None = None,
|
||||
append_env_override: dict[str, str] | None = None,
|
||||
auth_provider: dict[str, str] | None = None) -> None:
|
||||
@@ -540,7 +540,7 @@ class ManagerClient:
|
||||
seeds: Optional[List[IPAddress]] = None,
|
||||
timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT,
|
||||
server_encryption: str = "none",
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
|
||||
connect_driver: bool = True) -> ServerInfo:
|
||||
"""Add a new server"""
|
||||
if expected_error is not None:
|
||||
|
||||
@@ -41,6 +41,7 @@ import yaml
|
||||
import signal
|
||||
import glob
|
||||
import errno
|
||||
import json
|
||||
import re
|
||||
import platform
|
||||
import contextlib
|
||||
@@ -557,7 +558,7 @@ class ScyllaServer:
|
||||
async def install_and_start(self,
|
||||
api: ScyllaRESTAPIClient,
|
||||
expected_error: Optional[str] = None,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> None:
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> None:
|
||||
"""Setup and start this server."""
|
||||
|
||||
await self.install()
|
||||
@@ -677,10 +678,63 @@ class ScyllaServer:
|
||||
return None
|
||||
return maintenance_socket_option
|
||||
|
||||
async def get_cql_up_state(self) -> ServerUpState | None:
|
||||
"""Get the CQL up state (a check we use at start up).
|
||||
def _alternator_ports(self) -> list[tuple[str, int]]:
|
||||
"""Return (scheme, port) for every configured Alternator port."""
|
||||
ports = []
|
||||
if "alternator_port" in self.config:
|
||||
ports.append(("http", self.config["alternator_port"]))
|
||||
if "alternator_https_port" in self.config:
|
||||
ports.append(("https", self.config["alternator_https_port"]))
|
||||
return ports
|
||||
|
||||
Return None if it fails to connect.
|
||||
async def check_alternator_connected(self, ports: list[tuple[str, int]]) -> bool:
|
||||
"""TCP connect to every configured Alternator port.
|
||||
|
||||
Returns True if all ports accept connections.
|
||||
"""
|
||||
for _, port in ports:
|
||||
try:
|
||||
_, writer = await asyncio.wait_for(
|
||||
asyncio.open_connection(self.ip_addr, port), timeout=2)
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
except (OSError, asyncio.TimeoutError):
|
||||
return False
|
||||
return True
|
||||
|
||||
async def check_alternator_queried(self, ports: list[tuple[str, int]]) -> bool:
|
||||
"""Sends a GetItem for a randomly-named nonexistent table and validates
|
||||
that the response is a DynamoDB-shaped JSON error (contains __type),
|
||||
confirming Alternator is processing DynamoDB API requests.
|
||||
|
||||
Returns True if all ports respond correctly.
|
||||
"""
|
||||
table_name = f"nonexistent_table_{uuid.uuid4().hex}"
|
||||
headers = {
|
||||
"Content-Type": "application/x-amz-json-1.0",
|
||||
"X-Amz-Target": "DynamoDB_20120810.GetItem",
|
||||
}
|
||||
body = json.dumps({"TableName": table_name, "Key": {"k": {"S": "k"}}})
|
||||
timeout = aiohttp.ClientTimeout(total=2)
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
for scheme, port in ports:
|
||||
url = f"{scheme}://{self.ip_addr}:{port}/"
|
||||
try:
|
||||
# ssl=False skips certificate verification
|
||||
async with session.post(url, headers=headers, data=body, ssl=False) as resp:
|
||||
response_body = await resp.json(content_type=None)
|
||||
if "__type" not in response_body:
|
||||
return False
|
||||
except Exception as exc:
|
||||
self.logger.debug("Alternator query check failed for %s: %s", url, exc)
|
||||
return False
|
||||
return True
|
||||
|
||||
async def get_cql_up_state(self) -> tuple[bool, bool]:
|
||||
"""Check CQL connectivity.
|
||||
|
||||
Returns (connected, queried) indicating whether a CQL connection
|
||||
was established and whether a query executed successfully.
|
||||
"""
|
||||
caslog = logging.getLogger('cassandra')
|
||||
oldlevel = caslog.getEffectiveLevel()
|
||||
@@ -708,6 +762,7 @@ class ScyllaServer:
|
||||
request_timeout=self.TOPOLOGY_TIMEOUT)
|
||||
contact_points=[self.rpc_address]
|
||||
connected = False
|
||||
cql_queried = False
|
||||
try:
|
||||
# In a cluster setup, it's possible that the CQL
|
||||
# here is directed to a node different from the initial contact
|
||||
@@ -730,13 +785,32 @@ class ScyllaServer:
|
||||
control_connection_timeout=self.TOPOLOGY_TIMEOUT,
|
||||
auth_provider=self.auth_provider)
|
||||
self.control_connection = self.control_cluster.connect()
|
||||
return ServerUpState.CQL_QUERIED
|
||||
cql_queried = True
|
||||
except (NoHostAvailable, InvalidRequest, OperationTimedOut) as exc:
|
||||
self.logger.debug("Exception when checking if CQL is up: %s", exc)
|
||||
return ServerUpState.CQL_CONNECTED if connected else None
|
||||
finally:
|
||||
caslog.setLevel(oldlevel)
|
||||
# Any other exception may indicate a problem, and is passed to the caller.
|
||||
return connected, cql_queried
|
||||
|
||||
async def get_alternator_up_state(self, ports: list[tuple[str, int]]) -> tuple[bool, bool]:
|
||||
connected = await self.check_alternator_connected(ports)
|
||||
queried = connected and await self.check_alternator_queried(ports)
|
||||
return connected, queried
|
||||
|
||||
async def get_cql_alternator_up_state(self) -> ServerUpState | None:
|
||||
"""Get the combined CQL + Alternator up state."""
|
||||
cql_connected, cql_queried = await self.get_cql_up_state()
|
||||
alt_connected, alt_queried = False, False
|
||||
alt_ports = self._alternator_ports() # `alt_ports` empty = no Alternator
|
||||
if alt_ports:
|
||||
alt_connected, alt_queried = await self.get_alternator_up_state(alt_ports)
|
||||
if cql_queried and (alt_queried or not alt_ports):
|
||||
return ServerUpState.CQL_ALTERNATOR_QUERIED
|
||||
if not cql_connected or (alt_ports and not alt_connected):
|
||||
return None
|
||||
# Here both CQL and Alternator (if exists) are at least connected
|
||||
return ServerUpState.CQL_ALTERNATOR_CONNECTED
|
||||
|
||||
def _setup_notify_socket(self) -> None:
|
||||
"""Create a Unix datagram socket for receiving sd_notify messages from Scylla."""
|
||||
@@ -824,7 +898,7 @@ class ScyllaServer:
|
||||
async def start(self,
|
||||
api: ScyllaRESTAPIClient,
|
||||
expected_error: Optional[str] = None,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
|
||||
cmdline_options_override: list[str] | None = None,
|
||||
append_env_override: dict[str, str] | None = None) -> None:
|
||||
"""Start an installed server.
|
||||
@@ -897,9 +971,9 @@ class ScyllaServer:
|
||||
if await self.try_get_host_id(api):
|
||||
if server_up_state == ServerUpState.PROCESS_STARTED:
|
||||
server_up_state = ServerUpState.HOST_ID_QUERIED
|
||||
server_up_state = await self.get_cql_up_state() or server_up_state
|
||||
server_up_state = await self.get_cql_alternator_up_state() or server_up_state
|
||||
# Check for SERVING state (sd_notify "serving" message)
|
||||
if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification():
|
||||
if server_up_state >= ServerUpState.CQL_ALTERNATOR_QUERIED and self.check_serving_notification():
|
||||
server_up_state = ServerUpState.SERVING
|
||||
if server_up_state >= expected_server_up_state:
|
||||
if expected_error is not None:
|
||||
@@ -1194,7 +1268,7 @@ class ScyllaCluster:
|
||||
seeds: Optional[List[IPAddress]] = None,
|
||||
server_encryption: str = "none",
|
||||
expected_error: Optional[str] = None,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> ServerInfo:
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> ServerInfo:
|
||||
"""Add a new server to the cluster"""
|
||||
self.is_dirty = True
|
||||
|
||||
@@ -1434,7 +1508,7 @@ class ScyllaCluster:
|
||||
server_id: ServerNum,
|
||||
expected_error: str | None = None,
|
||||
seeds: list[IPAddress] | None = None,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
|
||||
cmdline_options_override: list[str] | None = None,
|
||||
append_env_override: dict[str, str] | None = None,
|
||||
auth_provider: dict[str, str] | None = None) -> None:
|
||||
@@ -1917,7 +1991,7 @@ class ScyllaClusterManager:
|
||||
server_id=server_id,
|
||||
expected_error=data.get("expected_error"),
|
||||
seeds=data.get("seeds"),
|
||||
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
|
||||
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
|
||||
cmdline_options_override=data.get("cmdline_options_override"),
|
||||
append_env_override=data.get("append_env_override"),
|
||||
auth_provider=data.get("auth_provider"),
|
||||
@@ -1952,7 +2026,7 @@ class ScyllaClusterManager:
|
||||
seeds=data.get("seeds"),
|
||||
server_encryption=data.get("server_encryption", "none"),
|
||||
expected_error=data.get("expected_error"),
|
||||
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
|
||||
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
|
||||
)
|
||||
return s_info.as_dict()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user