It is observed that: repair - repair[667d4a59-63fb-4ca6-8feb-98da49946d8b]: Failed to update system.repair_history table of node d27de212-6f32-4649ad76-a9ef1165fdcb: seastar::rpc::remote_verb_error (repair[667d4a59-63fb-4ca6-8feb-98da49946d8b]: range (minimum token,maximum token) is not in the format of (start, end]). This happens because repair checks that the end of the range to be repaired is inclusive. When small_table_optimization is enabled for regular repair, a (minimum token, maximum token) range is used. To fix, we relax the (start, end] check for the min/max range. Fixes #27220 Closes scylladb/scylladb#27357
702 lines
34 KiB
Python
702 lines
34 KiB
Python
#
|
|
# Copyright (C) 2022-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
"""Asynchronous helper for Scylla REST API operations.
|
|
"""
|
|
from __future__ import annotations # Type hints as strings
|
|
|
|
import logging
|
|
import os.path
|
|
from urllib.parse import quote
|
|
from abc import ABCMeta
|
|
from collections.abc import Mapping
|
|
from contextlib import asynccontextmanager
|
|
from typing import Any, Optional, AsyncIterator
|
|
|
|
import pytest
|
|
from aiohttp import request, BaseConnector, UnixConnector, ClientTimeout
|
|
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
|
|
|
|
from test.pylib.internal_types import IPAddress, HostID
|
|
from test.pylib.util import universalasync_typed_wrap
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HTTPError(Exception):
    """Raised when a REST call fails (non-200 status); carries the full request context."""

    def __init__(self, uri, code, params, json, message):
        super().__init__(message)
        # Keep everything about the failed request for diagnostics.
        self.uri = uri
        self.code = code
        self.params = params
        self.json = json
        self.message = message

    def __str__(self):
        # Compose the diagnostic message from its parts, comma-separated.
        details = (
            f"HTTP error {self.code}",
            f"uri: {self.uri}",
            f"params: {self.params}",
            f"json: {self.json}",
            f"body:\n{self.message}",
        )
        return ", ".join(details)
|
|
|
|
|
|
# TODO: support ssl and verify_ssl
|
|
class RESTClient(metaclass=ABCMeta):
    """Base class for session-free REST client.

    Subclasses configure the transport by setting `uri_scheme`, `connector`
    and either `default_host` or `default_port`; each call then opens a fresh
    aiohttp request (no persistent session).
    """
    connector: Optional[BaseConnector]   # None means aiohttp's default TCP connector
    uri_scheme: str   # e.g. http, http+unix
    default_host: str
    default_port: Optional[int]
    # pylint: disable=too-many-arguments

    async def _fetch(self, method: str, resource: str, response_type: Optional[str] = None,
                     host: Optional[str] = None, port: Optional[int] = None,
                     params: Optional[Mapping[str, str]] = None,
                     json: Optional[Mapping] = None, timeout: Optional[float] = None, allow_failed: bool = False) -> Any:
        """Perform a single HTTP request and optionally decode the response.

        When `allow_failed` is true, the JSON body is returned regardless of the
        HTTP status; otherwise any non-200 status raises HTTPError with the body
        text. `response_type` selects decoding: "text", "json", or None (discard).
        """
        # Can raise exception. See https://docs.aiohttp.org/en/latest/web_exceptions.html
        assert method in ["GET", "POST", "PUT", "DELETE"], f"Invalid HTTP request method {method}"
        assert response_type is None or response_type in ["text", "json"], \
                f"Invalid response type requested {response_type} (expected 'text' or 'json')"
        # Build the URI: explicit host/port arguments win over the subclass defaults.
        port = port if port else self.default_port if hasattr(self, "default_port") else None
        port_str = f":{port}" if port else ""
        assert host is not None or hasattr(self, "default_host"), "_fetch: missing host for " \
                "{method} {resource}"
        host_str = host if host is not None else self.default_host
        uri = self.uri_scheme + "://" + host_str + port_str + resource
        logging.debug(f"RESTClient fetching {method} {uri}")

        # Default to a generous 5-minute total timeout when the caller gave none.
        client_timeout = ClientTimeout(total = timeout if timeout is not None else 300)
        async with request(method, uri,
                           connector = self.connector if hasattr(self, "connector") else None,
                           params = params, json = json, timeout = client_timeout) as resp:
            if allow_failed:
                # Caller wants the body even on error status (e.g. to inspect the failure).
                return await resp.json()
            if resp.status != 200:
                text = await resp.text()
                raise HTTPError(uri, resp.status, params, json, text)
            if response_type is not None:
                # Return response.text() or response.json()
                return await getattr(resp, response_type)()
        return None

    async def get(self, resource_uri: str, host: Optional[str] = None, port: Optional[int] = None,
                  params: Optional[Mapping[str, str]] = None, allow_failed: bool = False) -> Any:
        """GET the resource; body is discarded unless `allow_failed` returns the JSON."""
        return await self._fetch("GET", resource_uri, host = host, port = port, params = params, allow_failed=allow_failed)

    async def get_text(self, resource_uri: str, host: Optional[str] = None,
                       port: Optional[int] = None, params: Optional[Mapping[str, str]] = None,
                       timeout: Optional[float] = None) -> str:
        """GET the resource and return the response body as text."""
        ret = await self._fetch("GET", resource_uri, response_type = "text", host = host,
                                port = port, params = params, timeout = timeout)
        assert isinstance(ret, str), f"get_text: expected str but got {type(ret)} {ret}"
        return ret

    async def get_json(self, resource_uri: str, host: Optional[str] = None,
                       port: Optional[int] = None, params: Optional[Mapping[str, str]] = None,
                       allow_failed: bool = False) -> Any:
        """Fetch URL and get JSON. Caller must check JSON content types."""
        ret = await self._fetch("GET", resource_uri, response_type = "json", host = host,
                                port = port, params = params, allow_failed = allow_failed)
        return ret

    async def post(self, resource_uri: str, host: Optional[str] = None,
                   port: Optional[int] = None, params: Optional[Mapping[str, str]] = None,
                   json: Optional[Mapping] = None, timeout: Optional[float] = None) -> None:
        """POST to the resource; the response body is discarded."""
        await self._fetch("POST", resource_uri, host = host, port = port, params = params,
                          json = json, timeout = timeout)

    async def post_json(self, resource_uri: str, host: Optional[str] = None,
                        port: Optional[int] = None, params: Optional[Mapping[str, str]] = None,
                        json: Optional[Mapping] = None, timeout: Optional[float] = None) -> Any:
        """POST to the resource and return the JSON-decoded response body."""
        ret = await self._fetch("POST", resource_uri, response_type = "json", host = host, port = port, params = params,
                                json = json, timeout = timeout)
        return ret

    async def put_json(self, resource_uri: str, data: Optional[Mapping] = None, host: Optional[str] = None,
                       port: Optional[int] = None, params: Optional[dict[str, str]] = None,
                       response_type: Optional[str] = None, timeout: Optional[float] = None) -> Any:
        """PUT `data` as the JSON request body; decode the response per `response_type`."""
        ret = await self._fetch("PUT", resource_uri, response_type = response_type, host = host,
                                port = port, params = params, json = data, timeout = timeout)
        return ret

    async def delete(self, resource_uri: str, host: Optional[str] = None,
                     port: Optional[int] = None, params: Optional[dict[str, str]] = None,
                     json: Optional[Mapping] = None) -> None:
        """DELETE the resource; the response body is discarded."""
        await self._fetch("DELETE", resource_uri, host = host, port = port, params = params,
                          json = json)
|
|
|
|
|
|
class UnixRESTClient(RESTClient):
    """An async helper for REST API operations using AF_UNIX socket"""

    def __init__(self, sock_path: str):
        # NOTE: using Python requests style URI for Unix domain sockets to avoid using "localhost"
        # host parameter is ignored but set to socket name as convention
        self.uri_scheme: str = "http"
        self.default_host: str = f"{os.path.basename(sock_path)}"
        # All requests are routed through the unix-domain socket via this connector.
        self.connector = UnixConnector(path=sock_path)

    async def shutdown(self):
        """Close the unix-socket connector and release its pooled connections."""
        await self.connector.close()
|
|
|
|
|
|
class TCPRESTClient(RESTClient):
    """An async helper for REST API operations"""

    def __init__(self, port: int):
        self.uri_scheme = "http"
        # None: let aiohttp use its default TCP connector per request.
        self.connector = None
        # Every request targets this port; the host is supplied per call.
        self.default_port: int = port
|
|
|
|
|
|
@universalasync_typed_wrap
class ScyllaRESTAPIClient:
    """Async Scylla REST API client"""

    def __init__(self, port: int = 10000):
        # A single TCP REST client; the target node is selected per call via `host`.
        self.client = TCPRESTClient(port)

    async def get_host_id(self, server_ip: IPAddress) -> HostID:
        """Get server id (UUID)"""
        host_uuid = await self.client.get_text("/storage_service/hostid/local", host=server_ip)
        assert isinstance(host_uuid, str) and len(host_uuid) > 10, \
                f"get_host_id: invalid {host_uuid}"
        # The endpoint returns the UUID wrapped in double quotes; strip them.
        host_uuid = host_uuid.lstrip('"').rstrip('"')
        return HostID(host_uuid)

    async def get_host_id_map(self, dst_server_ip: IPAddress) -> list[HostID]:
        """Retrieve the mapping of endpoint to host ID"""
        data = await self.client.get_json("/storage_service/host_id/", dst_server_ip)
        assert isinstance(data, list)
        return data

    async def get_ownership(self, dst_server_ip: IPAddress, keyspace: Optional[str] = None, table: Optional[str] = None) -> list:
        """Retrieve the ownership, optionally narrowed to a keyspace or a table."""
        if keyspace is None and table is None:
            api_path = f"/storage_service/ownership/"
        elif table is None:
            api_path = f"/storage_service/ownership/{keyspace}"
        else:
            api_path = f"/storage_service/ownership/{keyspace}?cf={table}"
        data = await self.client.get_json(api_path, dst_server_ip)
        return data

    async def get_down_endpoints(self, node_ip: IPAddress) -> list[IPAddress]:
        """Retrieve down endpoints from gossiper's point of view """
        data = await self.client.get_json("/gossiper/endpoint/down/", node_ip)
        assert isinstance(data, list)
        return data

    async def remove_node(self, initiator_ip: IPAddress, host_id: HostID,
                          ignore_dead: list[IPAddress], timeout: float) -> None:
        """Initiate remove node of host_id in initiator initiator_ip"""
        logger.info("remove_node for %s on %s", host_id, initiator_ip)
        await self.client.post("/storage_service/remove_node",
                               params = {"host_id": host_id,
                                         "ignore_nodes": ",".join(ignore_dead)},
                               host = initiator_ip, timeout = timeout)
        logger.debug("remove_node for %s finished", host_id)

    async def exclude_node(self, initiator_ip: IPAddress, hosts: list[HostID], timeout: float = 60) -> None:
        """Initiate exclude node of hosts in initiator initiator_ip"""
        logger.info("exclude_node for %s on %s", hosts, initiator_ip)
        await self.client.post("/storage_service/exclude_node",
                               params = {"hosts": ",".join(hosts)},
                               host = initiator_ip, timeout = timeout)
        logger.debug("exclude_node for %s finished", hosts)

    async def decommission_node(self, host_ip: str, timeout: float) -> None:
        """Initiate decommission node of host_ip"""
        logger.debug("decommission_node %s", host_ip)
        await self.client.post("/storage_service/decommission", host = host_ip,
                               timeout = timeout)
        logger.debug("decommission_node %s finished", host_ip)

    async def rebuild_node(self, host_ip: str, timeout: float) -> None:
        """Initiate rebuild of a node with host_ip"""
        logger.debug("rebuild_node %s", host_ip)
        await self.client.post("/storage_service/rebuild", host = host_ip,
                               timeout = timeout)
        logger.debug("rebuild_node %s finished", host_ip)

    async def get_gossip_generation_number(self, node_ip: str, target_ip: str) -> int:
        """Get the current generation number of `target_ip` observed by `node_ip`."""
        data = await self.client.get_json(f"/gossiper/generation_number/{target_ip}",
                                          host = node_ip)
        assert isinstance(data, int)
        return data

    async def get_joining_nodes(self, node_ip: str) -> list:
        """Get the list of joining nodes according to `node_ip`."""
        data = await self.client.get_json(f"/storage_service/nodes/joining", host=node_ip)
        assert isinstance(data, list)
        return data

    async def get_alive_endpoints(self, node_ip: str) -> list:
        """Get the list of alive nodes according to `node_ip`."""
        data = await self.client.get_json(f"/gossiper/endpoint/live", host=node_ip)
        assert isinstance(data, list)
        return data

    async def get_tokens(self, node_ip: str, endpoint: str | None = None) -> list:
        """Get a list of the tokens for the specified node."""

        data = await self.client.get_json(
            resource_uri="/storage_service/tokens" if endpoint is None else f"/storage_service/tokens/{endpoint}",
            host=node_ip,
        )
        assert isinstance(data, list)
        return data

    async def enable_injection(self, node_ip: str, injection: str, one_shot: bool, parameters: dict[str, Any] = {}) -> None:
        """Enable error injection named `injection` on `node_ip`. Depending on `one_shot`,
        the injection will be executed only once or every time the process passes the injection point.
        Note: this only has an effect in specific build modes: debug,dev,sanitize.
        """
        # Injection names can contain characters that must be escaped in a URL path.
        await self.client.post(f"/v2/error_injection/injection/{quote(injection, safe='')}",
                               host=node_ip, params={"one_shot": str(one_shot)}, json={ key: str(value) for key, value in parameters.items() })

    async def get_injection(self, node_ip: str, injection: str) -> list[dict[str, Any]]:
        """Read the state of the error injection named `injection` on `node_ip`.
        The returned information includes whether the error injections is
        active, as well as any parameters it might have.
        Note: this only has an effect in specific build modes: debug,dev,sanitize.
        """
        return await self.client.get_json(f"/v2/error_injection/injection/{quote(injection, safe='')}", host=node_ip)

    async def move_tablet(self, node_ip: str, ks: str, table: str, src_host: HostID, src_shard: int, dst_host: HostID, dst_shard: int, token: int, timeout: Optional[float] = None) -> None:
        """Move the tablet replica owning `token` from (src_host, src_shard) to (dst_host, dst_shard)."""
        await self.client.post(f"/storage_service/tablets/move", host=node_ip, timeout=timeout, params={
            "ks": ks,
            "table": table,
            "src_host": str(src_host),
            "src_shard": str(src_shard),
            "dst_host": str(dst_host),
            "dst_shard": str(dst_shard),
            "token": str(token)
        })

    async def quiesce_topology(self, node_ip: str) -> None:
        """Request topology quiescence on the given node."""
        await self.client.post(f"/storage_service/quiesce_topology", host=node_ip)

    async def add_tablet_replica(self, node_ip: str, ks: str, table: str, dst_host: HostID, dst_shard: int, token: int) -> None:
        """Add a replica of the tablet owning `token` on (dst_host, dst_shard)."""
        await self.client.post(f"/storage_service/tablets/add_replica", host=node_ip, params={
            "ks": ks,
            "table": table,
            "dst_host": str(dst_host),
            "dst_shard": str(dst_shard),
            "token": str(token)
        })

    async def del_tablet_replica(self, node_ip: str, ks: str, table: str, host: HostID, shard: int, token: int) -> None:
        """Remove the replica of the tablet owning `token` from (host, shard)."""
        await self.client.post(f"/storage_service/tablets/del_replica", host=node_ip, params={
            "ks": ks,
            "table": table,
            "host": str(host),
            "shard": str(shard),
            "token": str(token)
        })

    async def tablet_repair(self, node_ip: str, ks: str, table: str, token : int | str, hosts_filter: Optional[str] = None, dcs_filter: Optional[str] = None, timeout: Optional[float] = None, await_completion: bool = True, incremental_mode: Optional[str] = None) -> Any:
        """Repair the tablet(s) of ks.table owning `token` (or "all"); returns the API response."""
        params={
            "ks": ks,
            "table": table,
            "tokens": str(token),
            "await_completion": str(await_completion).lower()
        }
        if incremental_mode is not None:
            params["incremental_mode"] = str(incremental_mode).lower()
        if hosts_filter:
            params["hosts_filter"] = hosts_filter
        if dcs_filter:
            params["dcs_filter"] = dcs_filter
        res = await self.client.post_json(f"/storage_service/tablets/repair", host=node_ip, timeout=timeout, params=params)
        return res

    async def enable_tablet_balancing(self, node_ip: str) -> None:
        """Turn automatic tablet balancing on."""
        await self.client.post(f"/storage_service/tablets/balancing", host=node_ip, params={"enabled": "true"})

    async def disable_tablet_balancing(self, node_ip: str) -> None:
        """Turn automatic tablet balancing off."""
        await self.client.post(f"/storage_service/tablets/balancing", host=node_ip, params={"enabled": "false"})

    async def keyspace_upgrade_sstables(self, node_ip: str, ks: str) -> None:
        """Upgrade the sstables of the given keyspace."""
        await self.client.get(f"/storage_service/keyspace_upgrade_sstables/{ks}", host=node_ip)

    async def keyspace_scrub_sstables(self, node_ip: str, ks: str, scrub_mode: str) -> None:
        """Scrub the sstables of the given keyspace with `scrub_mode`."""
        await self.client.get(f"/storage_service/keyspace_scrub/{ks}", host=node_ip, params={"scrub_mode": scrub_mode})

    async def disable_injection(self, node_ip: str, injection: str) -> None:
        """Disable the error injection named `injection` on `node_ip`."""
        await self.client.delete(f"/v2/error_injection/injection/{quote(injection, safe='')}", host=node_ip)

    async def get_enabled_injections(self, node_ip: str) -> list[str]:
        """Return the names of all currently enabled error injections on `node_ip`."""
        data = await self.client.get_json("/v2/error_injection/injection", host=node_ip)
        assert isinstance(data, list)
        assert all(isinstance(e, str) for e in data)
        return data

    async def message_injection(self, node_ip: str, injection: str) -> None:
        """Post a message event to the error injection named `injection` on `node_ip`."""
        await self.client.post(f"/v2/error_injection/injection/{quote(injection, safe='')}/message", host=node_ip)

    async def inject_disconnect(self, node_ip: str, ip_to_disconnect_from: str) -> None:
        """Inject a disconnect from `ip_to_disconnect_from` on `node_ip`."""
        await self.client.post(f"/v2/error_injection/disconnect/{ip_to_disconnect_from}", host=node_ip)

    async def get_logger_level(self, node_ip: str, logger: str) -> str:
        """Get logger level"""
        return await self.client.get_text(f"/system/logger/{logger}", host=node_ip)

    async def set_logger_level(self, node_ip: str, logger: str, level: str) -> None:
        """Set logger level"""
        assert level in ["debug", "info", "warning", "trace"]
        await self.client.post(f"/system/logger/{logger}?level={level}", host=node_ip)

    async def flush_keyspace(self, node_ip: str, ks: str) -> None:
        """Flush keyspace"""
        await self.client.post(f"/storage_service/keyspace_flush/{ks}", host=node_ip)

    async def flush_all_keyspaces(self, node_ip: str) -> None:
        """Flush all keyspaces"""
        await self.client.post(f"/storage_service/flush", host=node_ip)

    async def backup(self, node_ip: str, ks: str, table: str, tag: str, dest: str, bucket: str, prefix: str, **kwargs) -> str:
        """Backup keyspace's snapshot"""
        params = {"keyspace": ks,
                  "table": table,
                  "endpoint": dest,
                  "bucket": bucket,
                  "prefix": prefix,
                  "snapshot": tag}
        # add optional args. for instance, "move_files".
        for key, value in kwargs.items():
            if isinstance(value, bool):
                # The API expects lowercase boolean strings, not Python's True/False.
                params[key] = 'true' if value else 'false'
            else:
                assert any(isinstance(value, t) for t in (str, int, float))
                params[key] = value
        return await self.client.post_json(f"/storage_service/backup", host=node_ip, params=params)

    async def restore(self, node_ip: str, ks: str, cf: str, dest: str, bucket: str, prefix: str, sstables: list[str], scope: Optional[str] = None, primary_replica_only=False) -> str:
        """Restore keyspace:table from backup"""
        params = {"keyspace": ks,
                  "table": cf,
                  "endpoint": dest,
                  "bucket": bucket,
                  "prefix": prefix,
                  "primary_replica_only": "true" if primary_replica_only else "false"}
        if scope is not None:
            params['scope'] = scope
        # The sstable list travels in the JSON request body, not as query params.
        return await self.client.post_json(f"/storage_service/restore", host=node_ip, params=params, json=sstables)

    async def take_snapshot(self, node_ip: str, ks: str, tag: str) -> None:
        """Take keyspace snapshot"""
        params = { 'kn': ks, 'tag': tag }
        await self.client.post(f"/storage_service/snapshots", host=node_ip, params=params)

    async def cleanup_keyspace(self, node_ip: str, ks: str) -> None:
        """Cleanup keyspace"""
        await self.client.post(f"/storage_service/keyspace_cleanup/{ks}", host=node_ip)

    async def cleanup_all(self, node_ip: str):
        """Run cleanup on all keyspaces of the node."""
        await self.client.post("/storage_service/cleanup_all/", node_ip)

    async def load_new_sstables(self, node_ip: str, keyspace: str, table: str, primary_replica : bool = False, scope: str = "all", load_and_stream : bool = False) -> None:
        """Load sstables from upload directory"""
        params = {"cf": table,
                  "primary_replica_only": "true" if primary_replica else "false",
                  "scope": scope,
                  "load_and_stream": "true" if load_and_stream else "false"}
        await self.client.post(f"/storage_service/sstables/{keyspace}", host=node_ip, params=params)

    async def drop_sstable_caches(self, node_ip: str) -> None:
        """Drop sstable caches"""
        await self.client.post(f"/system/drop_sstable_caches", host=node_ip)

    async def keyspace_flush(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> None:
        """Flush the specified or all tables in the keyspace"""
        url = f"/storage_service/keyspace_flush/{keyspace}"
        if table is not None:
            url += f"?cf={table}"
        await self.client.post(url, host=node_ip)

    async def keyspace_compaction(self, node_ip: str, keyspace: str, table: Optional[str] = None, consider_only_existing_data: bool = False) -> None:
        """Compact the specified or all tables in the keyspace"""
        url = f"/storage_service/keyspace_compaction/{keyspace}"
        params = {
            "consider_only_existing_data": str(consider_only_existing_data),
        }
        if table is not None:
            params["cf"] = table
        await self.client.post(url, host=node_ip, params=params)

    async def stop_compaction(self, node_ip: str, type: str) -> None:
        """Stop compaction of a given type"""
        url = f"/compaction_manager/stop_compaction?type={type}"
        await self.client.post(url, host=node_ip)

    async def dump_llvm_profile(self, node_ip : str):
        """Dump llvm profile to disk that can later be used for PGO or coverage reporting.
        no-op if the scylla binary is not instrumented."""
        url = "/system/dump_llvm_profile"
        await self.client.post(url, host=node_ip)

    async def upgrade_to_raft_topology(self, node_ip: str) -> None:
        """Start the upgrade to raft topology"""
        await self.client.post("/storage_service/raft_topology/upgrade", host=node_ip)

    async def raft_topology_upgrade_status(self, node_ip: str) -> str:
        """Returns the current state of upgrade to raft topology"""
        data = await self.client.get_json("/storage_service/raft_topology/upgrade", host=node_ip)
        assert isinstance(data, str)
        return data

    async def get_raft_leader(self, node_ip: str, group_id: Optional[str] = None) -> HostID:
        """Returns host ID of the current leader of the given raft group as seen by the registry on the contact node.
        When group_id is not specified, group0 is used."""
        params = {}
        if group_id:
            params["group_id"] = group_id
        data = await self.client.get_json("/raft/leader_host", host=node_ip, params=params)
        return HostID(data)

    async def repair(self, node_ip: str, keyspace: str, table: str, ranges: str = '', small_table_optimization: bool = False) -> None:
        """Repair the given table and wait for it to complete"""
        # vnode-replicated keyspaces use the repair_async API; everything else
        # is assumed to be tablet-based and goes through /tablets/repair.
        vnode_keyspaces = await self.client.get_json(f"/storage_service/keyspaces", host=node_ip, params={"replication": "vnodes"})
        if keyspace in vnode_keyspaces:
            if ranges:
                params = {"columnFamilies": table, "ranges": ranges}
            else:
                params = {"columnFamilies": table}
            if small_table_optimization:
                params["small_table_optimization"] = "true"
            sequence_number = await self.client.post_json(f"/storage_service/repair_async/{keyspace}", host=node_ip, params=params)
            # repair_async only starts the repair; poll repair_status for the outcome.
            status = await self.client.get_json(f"/storage_service/repair_status", host=node_ip, params={"id": str(sequence_number)})
            if status != 'SUCCESSFUL':
                raise Exception(f"Repair id {sequence_number} on node {node_ip} for table {keyspace}.{table} failed: status={status}")
        else:
            if ranges:
                raise ValueError(f"Ranges parameter is not supported for tablet keyspaces")
            params={
                "ks": keyspace,
                "table": table,
                "tokens": "all",
                "await_completion": "true",
            }
            await self.client.post_json(f"/storage_service/tablets/repair", host=node_ip, params=params)

    def __get_autocompaction_url(self, keyspace: str, table: Optional[str] = None) -> str:
        """Return autocompaction url for the given keyspace/table"""
        return f"/storage_service/auto_compaction/{keyspace}" if not table else \
            f"/column_family/autocompaction/{keyspace}:{table}"

    async def enable_autocompaction(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> None:
        """Enable autocompaction for the given keyspace/table"""
        await self.client.post(self.__get_autocompaction_url(keyspace, table), host=node_ip)

    async def disable_autocompaction(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> None:
        """Disable autocompaction for the given keyspace/table"""
        await self.client.delete(self.__get_autocompaction_url(keyspace, table), host=node_ip)

    async def retrain_dict(self, node_ip: str, keyspace: str, table: str):
        """Trigger a retrain of the compression dictionary for the given table."""
        url = f"/storage_service/retrain_dict?keyspace={keyspace}&cf={table}"
        await self.client.post_json(url, host=node_ip)

    async def estimate_compression_ratios(self, node_ip: str, keyspace: str, table: str):
        """Return estimated compression ratios for the given table."""
        url = f"/storage_service/estimate_compression_ratios?keyspace={keyspace}&cf={table}"
        return await self.client.get_json(url, host=node_ip)

    async def get_sstable_info(self, node_ip: str, keyspace: Optional[str] = None, table: Optional[str] = None):
        """Return sstable information, optionally narrowed to a keyspace/table."""
        url = "/storage_service/sstable_info"
        params = []
        if keyspace:
            params.append(f"keyspace={keyspace}")
        if table:
            params.append(f"cf={table}")
        if params:
            url += f"?{'&'.join(params)}"

        data = await self.client.get_json(url, host=node_ip)
        assert isinstance(data, list)
        return data

    async def get_task_status(self, node_ip: str, task_id: str):
        """Return the status of the given task manager task."""
        return await self.client.get_json(f'/task_manager/task_status/{task_id}', host=node_ip)

    async def wait_task(self, node_ip: str, task_id: str):
        """Wait for the given task manager task and return its final status."""
        return await self.client.get_json(f'/task_manager/wait_task/{task_id}', host=node_ip)

    async def abort_task(self, node_ip: str, task_id: str):
        """Abort the given task manager task."""
        await self.client.post(f'/task_manager/abort_task/{task_id}', host=node_ip)

    async def get_config(self, node_ip: str, id: str):
        """Return the live value of the config option `id` on the node."""
        return await self.client.get_json(f'/v2/config/{id}', host=node_ip)

    async def set_trace_probability(self, node_ip: str, probability: float) -> None:
        """Set the probabilistic tracing probability on the node."""
        await self.client.post(
            resource_uri="/storage_service/trace_probability",
            host=node_ip,
            params={"probability": probability},
        )

    async def describe_ring(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> Any:
        """Describe the token ring of the keyspace (optionally for one table)."""
        params = None
        if (table):
            params = {"table": table}
        return await self.client.get_json(f'/storage_service/describe_ring/{keyspace}', host=node_ip, params=params)

    async def range_to_endpoint_map(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> Any:
        """Return the mapping of token ranges to endpoints for the keyspace."""
        params = None
        if (table):
            params = {"cf": table}
        return await self.client.get_json(f'/storage_service/range_to_endpoint_map/{keyspace}', host=node_ip, params=params)

    async def natural_endpoints(self, node_ip: str, keyspace: str, table: str, key: str) -> Any:
        """Return the natural endpoints (replicas) for the given partition key."""
        params = {"cf": table, "key": key}
        return await self.client.get_json(f'/storage_service/natural_endpoints/{keyspace}', host=node_ip, params=params)

    async def reload_raft_topology_state(self, node_ip: str):
        """Ask the node to reload its raft topology state."""
        await self.client.post("/storage_service/raft_topology/reload", node_ip)

    async def tokens_endpoint(self, node_ip: str, keyspace: Optional[str] = None, table: Optional[str] = None) -> Any:
        """Return the token-to-endpoint mapping, optionally filtered by keyspace/table."""
        params = {}
        if keyspace:
            params['keyspace'] = keyspace
        if table:
            params['cf'] = table
        return await self.client.get_json('/storage_service/tokens_endpoint', host=node_ip, params=params)
|
|
|
|
|
|
class ScyllaMetricsLine:
    """One parsed Prometheus-text-format metrics line: ``name{labels} value``."""

    def __init__(self, name: str, labels: dict, value: float):
        self.name = name
        self.labels = labels
        self.value = value

    @staticmethod
    def from_string(line: str):
        """Parse a single metrics line.

        Returns a ScyllaMetricsLine, or None when the line has no ``{...}``
        labels section (comments, HELP/TYPE lines, label-less metrics).
        """
        labels_start = line.find('{')
        labels_finish = line.find('}')
        if labels_start == -1 or labels_finish == -1:
            return None
        name = line[:labels_start].strip()
        label_str = line[labels_start + 1:labels_finish]
        if not label_str.strip():
            labels = {}
        else:
            # split('=', 1) keeps '=' characters inside label values intact.
            items = [kv.split('=', 1) for kv in label_str.split(',') if kv]
            labels = {k.strip(): v.strip().strip('"') for k, v in items}
        # Slice right after '}': float() tolerates surrounding whitespace, so
        # this works for any amount of separating space (the old '+ 2' slice
        # dropped the first digit when the separator was not exactly one char).
        value = float(line[labels_finish + 1:])
        return ScyllaMetricsLine(name, labels, value)
|
|
|
|
class ScyllaMetrics:
    """A snapshot of a node's /metrics output, kept as raw text lines."""

    def __init__(self, lines: list[str]):
        self.lines: list[str] = lines

    def lines_by_prefix(self, prefix: str):
        """Returns all metrics whose name starts with a prefix, e.g.
        metrics.lines_by_prefix('scylla_hints_manager_')
        """
        matching = []
        for line in self.lines:
            if line.startswith(prefix):
                matching.append(line)
        return matching

    def _labels_match(self, metric_labels: dict, filter_labels: dict):
        # Every requested label must be present with the (stringified) value.
        for key, expected in filter_labels.items():
            if metric_labels.get(key) != str(expected):
                return False
        return True

    def get(self, name: str, labels = {}):
        """Get the metric value by name, optionally filtering by labels.

        The name parameter is used to filter metrics by prefix - all metrics whose
        names start with the given string will be considered. The labels parameter
        is a dictionary of key-value pairs used to further filter the metrics.

        Example:
            metrics.get('scylla_transport_cql_errors_total',
                        {'type': 'protocol_error', 'shard': '0'})

        Returns the sum of all matching metric values, or None if no matches found.
        """
        total = None
        for raw_line in self.lines_by_prefix(name):
            parsed = ScyllaMetricsLine.from_string(raw_line)
            if parsed is None:
                continue
            if not self._labels_match(parsed.labels, labels):
                continue
            total = parsed.value if total is None else total + parsed.value
        return total
|
|
|
|
class ScyllaMetricsClient:
    """Async Scylla Metrics API client"""

    def __init__(self, port: int = 9180):
        # The metrics endpoint is served on its own port, separate from the REST API.
        self.client = TCPRESTClient(port)

    async def query(self, server_ip: IPAddress) -> ScyllaMetrics:
        """Fetch /metrics from the given server and return the parsed snapshot."""
        data = await self.client.get_text('/metrics', host=server_ip)
        return ScyllaMetrics(data.split('\n'))
|
|
|
|
|
|
class InjectionHandler():
    """An async client for communicating with injected code by REST API"""

    def __init__(self, api: ScyllaRESTAPIClient, injection: str, node_ip: str):
        # Bind the handler to one injection point on one node.
        self.node_ip = node_ip
        self.injection = injection
        self.api = api

    async def message(self) -> None:
        """Post a message event to this handler's injection point."""
        await self.api.message_injection(self.node_ip, self.injection)
|
|
|
|
@asynccontextmanager
async def inject_error(api: ScyllaRESTAPIClient, node_ip: IPAddress, injection: str,
                       parameters: dict[str, Any] = {}) -> AsyncIterator[InjectionHandler]:
    """Attempts to inject an error. Works only in specific build modes: debug,dev,sanitize.
    It will trigger a test to be skipped if attempting to enable an injection has no effect.
    This is a context manager for enabling and disabling when done, therefore it can't be
    used for one shot.
    """
    await api.enable_injection(node_ip, injection, False, parameters)
    # Verify the injection actually took effect; build modes without injection
    # support leave the enabled list empty.
    enabled = await api.get_enabled_injections(node_ip)
    logging.info(f"Error injections enabled on {node_ip}: {enabled}")
    if not enabled:
        pytest.skip("Error injection not enabled in Scylla - try compiling in dev/debug/sanitize mode")
    try:
        yield InjectionHandler(api, injection, node_ip)
    finally:
        # Always disable on exit so the injection does not leak into later tests.
        logger.info(f"Disabling error injection {injection}")
        await api.disable_injection(node_ip, injection)
|
|
|
|
|
|
async def inject_error_one_shot(api: ScyllaRESTAPIClient, node_ip: IPAddress, injection: str, parameters: dict[str, Any] = {}) -> InjectionHandler:
    """Attempts to inject an error. Works only in specific build modes: debug,dev,sanitize.
    It will trigger a test to be skipped if attempting to enable an injection has no effect.
    This is a one-shot injection enable.
    """
    await api.enable_injection(node_ip, injection, True, parameters)
    # Verify the injection actually took effect; build modes without injection
    # support leave the enabled list empty.
    enabled = await api.get_enabled_injections(node_ip)
    logging.info(f"Error injections enabled on {node_ip}: {enabled}")
    if not enabled:
        pytest.skip("Error injection not enabled in Scylla - try compiling in dev/debug/sanitize mode")
    # One-shot injections disable themselves after firing, so no cleanup is needed.
    return InjectionHandler(api, injection, node_ip)
|
|
|
|
|
|
async def read_barrier(api: ScyllaRESTAPIClient, node_ip: IPAddress, group_id: Optional[str] = None,
                       timeout: Optional[int] = None) -> None:
    """ Issue a read barrier on the specific host for the group_id.

    :param api: the REST API client
    :param node_ip: the node IP address for which the read barrier will be posted
    :param group_id: the optional group id (default=group0)
    :param timeout: the optional timeout in seconds (for the Raft operation on the node)
    """
    # Only pass parameters the caller actually supplied.
    request_params = {}
    if group_id:
        request_params["group_id"] = group_id
    if timeout:
        request_params["timeout"] = str(timeout)
    await api.client.post("/raft/read_barrier", host=node_ip, params=request_params)
|
|
|
|
|
|
def get_host_api_address(host: Host) -> IPAddress:
    """ Returns the API address of the host.

    Falls back to the node address when no distinct listen address is set;
    the two can differ, in particular when the RPC address has been modified.
    """
    return host.listen_address or host.address
|