mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-22 17:40:34 +00:00
cluster.dtest_alternator_tests.test_slow_query_logging performs a bootstrap with 768 token ranges. It works with `me` sstables, which have 2 open file descriptors per open sstable, but with `ms` sstables, which have 3 open file descriptors per open sstable, it fails with EMFILE. To avoid this problem, let's just decrease the number of vnodes in the test suite. It's appropriate anyway, because it avoids some unneeded work without weakening the tests. (Note: pylib-based tests have been setting `num_tokens` to 16 for a long time too.) This breaks `bypass_cache_test`, which is written in a way that expects a certain number of token ranges. We adjust the relevant parameter accordingly.
897 lines
37 KiB
Python
897 lines
37 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
from __future__ import annotations

import glob
import json
import locale
import os
import re
import subprocess
import time
import uuid

from collections import namedtuple
from functools import cached_property
from itertools import chain
from pathlib import Path
from typing import TYPE_CHECKING, Any

from test.cluster.dtest.ccmlib.common import ArgumentError, wait_for, BIN_DIR
from test.pylib.internal_types import ServerUpState
from test.pylib.manager_client import NoSuchProcess
|
|
|
|
if TYPE_CHECKING:
|
|
from test.pylib.internal_types import ServerInfo
|
|
from test.pylib.log_browsing import ScyllaLogFile
|
|
from test.cluster.dtest.ccmlib.scylla_cluster import ScyllaCluster
|
|
|
|
|
|
# stderr lines emitted by `scylla nodetool` that are expected noise and must
# not be reported as real errors (filtered out in ScyllaNode.nodetool()).
NODETOOL_STDERR_IGNORED_PATTERNS = (
    re.compile(r"WARNING: debug mode. Not for benchmarking or production"),
    re.compile(
        r"==[0-9]+==WARNING: ASan doesn't fully support makecontext/swapcontext"
        r" functions and may produce false positives in some cases!"
    ),
)

# Translation of Cassandra-style -D options (passed by dtests via jvm_args)
# to the equivalent ScyllaDB command-line options.
CASSANDRA_OPTIONS_MAPPING = {
    "-Dcassandra.replace_address_first_boot": "--replace-address-first-boot",
}

# Per-node resource defaults, used when neither the test (set_smp()) nor
# SCYLLA_EXT_OPTS overrides them.
DEFAULT_SMP = 2
DEFAULT_MEMORY_PER_CPU = 512 * 1024 * 1024  # bytes
DEFAULT_SCYLLA_LOG_LEVEL = "info"

# Cassandra log-level names -> ScyllaDB log-level names.  "OFF" maps to
# "info" (presumably because ScyllaDB's logger has no "off" level — TODO
# confirm).
KNOWN_LOG_LEVELS = {
    "TRACE": "trace",
    "DEBUG": "debug",
    "INFO": "info",
    "WARN": "warn",
    "ERROR": "error",
    "OFF": "info",
}

# Captures the aggregate metric before the "[READ ..., WRITE ...]" block.
STRESS_SUMMARY_PATTERN = re.compile(r'^\s*([\d\.\,]+\d?)\s*\[.*')

# Extracts the READ metric number inside the "[READ ..., WRITE ...]" block.
STRESS_READ_PATTERN = re.compile(r'.*READ:\s*([\d\.\,]+\d?)[^\d].*')

# Extracts the WRITE metric number inside the "[READ ..., WRITE ...]" block.
STRESS_WRITE_PATTERN = re.compile(r'.*WRITE:\s*([\d\.\,]+\d?)[^\d].*')

# Splits a "key : value" line into key and value.
STRESS_KEY_VALUE_PATTERN = re.compile(r'^\s*([^:]+)\s*:\s*(\S.*)\s*$')
|
|
|
|
|
|
class NodeError(Exception):
    """Raised for node lifecycle problems (e.g. a node that refuses to stop).

    Optionally carries the process the error relates to, for callers that
    want to inspect it.
    """

    def __init__(self, msg: str, process: int | None = None):
        self.process = process
        super().__init__(msg)
|
|
|
|
|
|
class ToolError(Exception):
    """Raised when an external tool (nodetool, cassandra-stress, ...) exits
    with a non-zero status.

    Keeps the command, exit status, and captured output for inspection; the
    exception message summarizes all of them.
    """

    def __init__(self, command: str | list[str], exit_status: int, stdout: Any = None, stderr: Any = None):
        self.command = command
        self.exit_status = exit_status
        self.stdout = stdout
        self.stderr = stderr

        parts = [f"Subprocess {command} exited with non-zero status; exit status: {exit_status}"]
        # Only mention streams that actually carried data.
        for label, stream in (("stdout", stdout), ("stderr", stderr)):
            if stream:
                parts.append(f"{label}: {self._decode(stream)}")

        super().__init__("; \n".join(parts))

    @staticmethod
    def _decode(value: str | bytes) -> str:
        """Decode bytes with the preferred locale encoding; pass str through."""
        if isinstance(value, bytes):
            return value.decode(locale.getpreferredencoding(do_setlocale=False))
        return value


# Historical alias: nodetool failures used to be a dedicated exception class.
NodetoolError = ToolError
|
|
|
|
|
|
class ScyllaNode:
|
|
    def __init__(self, cluster: ScyllaCluster, server: ServerInfo, name: str):
        """Wrap a manager-controlled server in a ccm-like node interface.

        :param cluster: owning ScyllaCluster wrapper
        :param server: server description obtained from the manager client
        :param name: human-readable node name used as a log-message prefix
        """
        self.cluster = cluster
        self.server_id = server.server_id
        self.name = name
        # Legacy ccm attributes; process handling is delegated to the manager,
        # so these are never populated here.
        self.pid = None
        self.all_pids = []
        # (address, port) pairs: inter-node traffic and the CQL native port.
        self.network_interfaces = {
            "storage": (str(server.rpc_address), 7000),
            "binary": (str(server.rpc_address), 9042),
        }
        self.data_center = server.datacenter
        self.rack = server.rack

        # SMP/memory resolution order (see smp()/memory()): test override,
        # then SCYLLA_EXT_OPTS-derived values, then module defaults.
        self._smp_set_during_test = None
        self._smp = None
        self._memory = None

        # Log levels applied on the next start() (see set_log_level()).
        self.__global_log_level = "info"
        self.__classes_log_level = {}

        # When False, start() passes the node itself as its only seed.
        self.bootstrap = True
|
|
|
|
def set_configuration_options(self,
|
|
values: dict | None = None,
|
|
batch_commitlog: bool | None = None) -> None:
|
|
"""Set DB node configuration options.
|
|
|
|
Example:
|
|
node.set_configuration_options(values={
|
|
'hinted_handoff_enabled' : True,
|
|
'concurrent_writes' : 64,
|
|
})
|
|
|
|
The batch_commitlog option gives an easier way to switch to batch
|
|
commitlog (since it requires setting 2 options and unsetting one).
|
|
"""
|
|
self.cluster.set_configuration_options(values=values, batch_commitlog=batch_commitlog, nodes=self)
|
|
|
|
def set_log_level(self, new_level: str, class_name: str | None = None) -> ScyllaNode:
|
|
if new_log_level := KNOWN_LOG_LEVELS.get(new_level):
|
|
if class_name is None:
|
|
self.__global_log_level = new_log_level
|
|
else:
|
|
self.__classes_log_level[class_name] = new_log_level
|
|
return self
|
|
raise ArgumentError(f"Unknown log level {new_level} (use one of {' '.join(KNOWN_LOG_LEVELS)})")
|
|
|
|
    def scylla_mode(self) -> str:
        """Return the ScyllaDB build mode of the cluster (e.g. "debug")."""
        return self.cluster.scylla_mode

    def set_smp(self, smp: int) -> None:
        # Test-requested shard-count override; takes precedence over the value
        # derived from SCYLLA_EXT_OPTS (see smp()).
        self._smp_set_during_test = smp

    def smp(self) -> int:
        """Shard count: test override, then SCYLLA_EXT_OPTS value, then default."""
        return self._smp_set_during_test or self._smp or DEFAULT_SMP

    def memory(self) -> int:
        """Total node memory in bytes; defaults to a fixed amount per shard."""
        return self._memory or self.smp() * DEFAULT_MEMORY_PER_CPU
|
|
|
|
    def _adjust_smp_and_memory(self, smp: int | None = None, memory: int | None = None) -> None:
        """Record SMP/memory overrides, keeping memory proportional to shards.

        The two branches are order-sensitive: the memory branch rounds the new
        total down to a multiple of the *current* shard count, and the smp
        branch then re-scales the total so that memory-per-CPU is preserved
        under the new shard count.
        """
        if memory:
            # Round down to a whole number of bytes per shard, using the new
            # shard count for the division when one was supplied.
            self._memory = memory // (smp or self.smp()) * self.smp()
        if smp:
            # Preserve the per-CPU memory amount across the shard-count change.
            memory_per_cpu = self.memory() // self.smp()
            self._smp = smp
            self._memory = memory_per_cpu * self.smp()

    def set_mem_mb_per_cpu(self, mem: int) -> None:  # not used in scylla-dtest
        raise NotImplementedError("setting memory per CPU during a test is not supported")
|
|
|
|
    def address(self) -> str:
        """Return the IP use by this node for internal communication."""

        return self.network_interfaces["storage"][0]

    def is_running(self) -> bool:
        """True while the server process exists and has not exited yet."""
        try:
            # A None return code means the process is still alive.
            return self.cluster.manager.server_get_returncode(server_id=self.server_id) is None
        except NoSuchProcess:
            return False

    # ccm API compatibility: "live" and "running" are the same notion here.
    is_live = is_running
|
|
|
|
    @cached_property
    def scylla_log_file(self) -> ScyllaLogFile:
        # Opened lazily and cached for the node's lifetime.
        return self.cluster.manager.server_open_log(server_id=self.server_id)

    def grep_log(self,
                 expr: str,
                 filter_expr: str | None = None,
                 filename: str | None = None,  # not used in scylla-dtest
                 from_mark: int | None = None) -> list[tuple[str, re.Match[str]]]:
        """Return (line, match) pairs for log lines matching `expr`.

        Lines matching `filter_expr` are excluded; the search starts at
        `from_mark` (a position from mark_log()) or at the beginning.
        """
        assert filename is None, "only ScyllaDB's log is supported"

        return self.scylla_log_file.grep(expr=expr, filter_expr=filter_expr, from_mark=from_mark)
|
|
|
|
    def grep_log_for_errors(self,
                            filename: str | None = None,  # not used in scylla-dtest
                            distinct_errors: bool = False,
                            search_str: str | None = None,  # not used in scylla-dtest
                            case_sensitive: bool = True,  # not used in scylla-dtest
                            from_mark: int | None = None) -> list[str] | list[list[str]]:
        """Return error entries from the log.

        When `from_mark` is not given, fall back to the position recorded by
        mark_log_for_errors() (if any); otherwise search the whole log.
        """
        assert filename is None, "only ScyllaDB's log is supported"
        assert search_str is None, "argument `search_str` is not supported"
        assert case_sensitive, "only case sensitive search is supported"

        from_mark = getattr(self, "error_mark", None) if from_mark is None else from_mark

        return self.scylla_log_file.grep_for_errors(distinct_errors=distinct_errors, from_mark=from_mark)

    def mark_log_for_errors(self, filename: str | None = None) -> None:
        """Remember the current log position; later error greps start there."""
        assert filename is None, "only ScyllaDB's log is supported"

        self.error_mark = self.mark_log()

    def mark_log(self, filename: str | None = None) -> int:
        """Return the current end-of-log position for later `from_mark` use."""
        assert filename is None, "only ScyllaDB's log is supported"

        return self.scylla_log_file.mark()
|
|
|
|
def watch_log_for(self,
|
|
exprs: str | list[str],
|
|
from_mark: int | None = None,
|
|
timeout: float = 600,
|
|
process: subprocess.Popen | None = None, # don't use it here
|
|
verbose: bool | None = None, # not used in scylla-dtest
|
|
filename: str | None = None, # not used in scylla-dtest
|
|
polling_interval: float | None = None) -> tuple[str, re.Match[str]] | list[tuple[str, re.Match[str]]]: # not used in scylla-dtest
|
|
assert process is None, "argument `process` is not supported"
|
|
assert verbose is None, "argument `verbose` is not supported"
|
|
assert filename is None, "only ScyllaDB's log is supported"
|
|
assert polling_interval is None, "argument `polling_interval` is not supported"
|
|
|
|
if isinstance(exprs, str):
|
|
exprs = [exprs]
|
|
|
|
_, matches = self.scylla_log_file.wait_for(*exprs, from_mark=from_mark, timeout=timeout)
|
|
|
|
return matches[0] if len(matches) == 1 else matches
|
|
|
|
    def watch_log_for_death(self,
                            nodes: ScyllaNode | list[ScyllaNode],
                            from_mark: int | None = None,
                            timeout: float = 600,
                            filename: str | None = None) -> None:
        """Watch the log of this node until it detects that the provided other nodes are marked dead.

        This method returns nothing but throw a TimeoutError if all the requested node have not been found
        to be marked dead before timeout sec.

        A mark as returned by mark_log() can be used as the `from_mark` parameter to start watching the log
        from a given position. Otherwise, the log is watched from the beginning.
        """
        assert filename is None, "only ScyllaDB's log is supported"

        if not isinstance(nodes, list):
            nodes = [nodes]

        # Match either the IP or the host ID, followed by "now dead"/"now DOWN"
        # (gossip vs. raft wording).
        self.watch_log_for(
            [f"({node.address()}|{node.hostid()}).* now (dead|DOWN)" for node in nodes],
            from_mark=from_mark,
            timeout=timeout,
        )
|
|
|
|
    def watch_log_for_alive(self,
                            nodes: ScyllaNode | list[ScyllaNode],
                            from_mark: int | None = None,
                            timeout: float = 120,
                            filename: str | None = None) -> None:
        """Watch the log of this node until it detects that the provided other nodes are marked UP.

        This method works similarly to watch_log_for_death().
        """
        assert filename is None, "only ScyllaDB's log is supported"

        if not isinstance(nodes, list):
            nodes = [nodes]

        # Match either the IP or the host ID of each awaited node.
        self.watch_log_for(
            [f"({node.address()}|{node.hostid()}).* now UP" for node in nodes],
            from_mark=from_mark,
            timeout=timeout,
        )
|
|
|
|
    def watch_rest_for_alive(self,
                             nodes: ScyllaNode | list[ScyllaNode],
                             timeout: float = 120,
                             wait_normal_token_owner: bool = True) -> None:
        """Poll this node's REST API until it sees all given nodes as alive.

        With `wait_normal_token_owner=True` (default) additionally wait until
        each node owns tokens and appears in this node's host-ID map with the
        host ID the manager client knows about.

        :raises TimeoutError: if the condition is not met within `timeout` seconds
        """
        nodes = nodes if isinstance(nodes, list) else [nodes]
        tofind_host_id_map = {node.address(): node.hostid() for node in nodes}
        tofind = {node.address() for node in nodes}
        node_ip = self.address()

        found = set()
        found_host_id_map = {}

        api = self.cluster.manager.api

        deadline = time.perf_counter() + timeout
        while time.perf_counter() < deadline:
            # All awaited addresses must be alive and no longer joining.
            if tofind <= set(api.get_alive_endpoints(node_ip=node_ip)) - set(api.get_joining_nodes(node_ip=node_ip)):
                # ... and every awaited node must already own tokens.
                if not any(node for node in tofind if not api.get_tokens(node_ip=node_ip, endpoint=node)):
                    if not wait_normal_token_owner:
                        return

                    # Verify other nodes are considered normal token owners on this node and their host_ids
                    # match the host_ids the client knows about.
                    host_id_map = {x["key"]: x["value"] for x in api.get_host_id_map(dst_server_ip=node_ip)}
                    found_host_id_map.update(host_id_map)
                    for addr, host_id in host_id_map.items():
                        if addr not in tofind_host_id_map:
                            continue
                        # An empty/None expected host ID means "accept any".
                        if host_id == tofind_host_id_map[addr] or not tofind_host_id_map[addr]:
                            tofind.discard(addr)
                            found.add(addr)

                    if not tofind:
                        return
            time.sleep(0.1)

        self.debug(f"watch_rest_for_alive: {tofind=} {found=}: {tofind_host_id_map=} {found_host_id_map=}")
        raise TimeoutError(f"watch_rest_for_alive() timeout after {timeout} seconds")
|
|
|
|
def wait_for_binary_interface(self, from_mark: int | None = None, timeout: float | None = None) -> None:
|
|
"""Waits for the binary CQL interface to be listening."""
|
|
|
|
if timeout is None:
|
|
timeout = self.cluster.default_wait_for_binary_proto
|
|
self.watch_log_for(exprs="Starting listening for CQL clients", from_mark=from_mark, timeout=timeout)
|
|
|
|
    def wait_until_stopped(self,
                           wait_seconds: int | None = None,
                           marks: list[tuple[ScyllaNode, int]] | None = None,
                           dump_core: bool = True) -> None:  # not implemented
        """Block until this node's process has exited.

        :param wait_seconds: timeout; default depends on the build mode, since
            debug builds shut down far more slowly
        :param marks: (node, log-mark) pairs; each other node is additionally
            watched until it logs this node's death
        :param dump_core: accepted for ccm API compatibility only
        :raises NodeError: if the process is still running after the timeout
        """
        if wait_seconds is None:
            wait_seconds = 127 if self.scylla_mode() != "debug" else 600
        if not wait_for(func=lambda: not self.is_running(), timeout=wait_seconds):
            raise NodeError(f"Problem stopping node {self.name}")

        for node, mark in marks or []:
            # Never watch a node for its own death.
            if node.server_id != self.server_id:
                node.watch_log_for_death(nodes=self, from_mark=mark)
|
|
|
|
    def _process_scylla_args(self, *args: str) -> list[str]:
        """Merge SCYLLA_EXT_OPTS, explicit per-call args, and built-in defaults
        into a flat command line for starting ScyllaDB.

        Precedence, lowest to highest: defaults below, SCYLLA_EXT_OPTS,
        explicit `args`.  `--smp`/`--memory` are stripped from both sources and
        folded into this node's resource accounting rather than passed through
        directly (the accounted values are re-emitted via the defaults).
        """
        # Parse default overrides in SCYLLA_EXT_OPTS
        scylla_args = _parse_scylla_args(os.environ.get("SCYLLA_EXT_OPTS", "").split())

        if smp := scylla_args.pop("--smp", None):
            smp = int(smp[0])
        if memory := scylla_args.pop("--memory", None):
            memory = _parse_size(memory[0])
        self._adjust_smp_and_memory(smp=smp, memory=memory)

        if args:
            parsed_args = []
            for arg in args:
                option, *value = arg.split("=")
                if not value or option not in CASSANDRA_OPTIONS_MAPPING:
                    # Not a known Cassandra-style option; parse it as a plain
                    # ScyllaDB option below.
                    parsed_args.append(arg)
                elif len(value) == 1:
                    # Translate "-Dcassandra.foo=bar" to its ScyllaDB spelling.
                    scylla_args[CASSANDRA_OPTIONS_MAPPING[option]] = value
                else:
                    raise RuntimeError(f"Option {arg} not in form '-Dcassandra.foo=bar'. Please check your test")
            scylla_args.update(_parse_scylla_args(parsed_args))
            # Explicit arguments override any environment-derived resources.
            if smp := scylla_args.pop("--smp", None):
                self._adjust_smp_and_memory(smp=int(smp[0]))
            if memory := scylla_args.pop("--memory", None):
                self._memory = _parse_size(memory[0])

        default_scylla_args = {
            "--smp": [str(self.smp())],
            "--memory": [f"{self.memory() // 1024 ** 2}M"],
            "--developer-mode": ["true"],
            "--default-log-level": [self.__global_log_level],
            "--kernel-page-cache": ["1"],
            "--commitlog-use-o-dsync": ["0"],
            "--max-networking-io-control-blocks": ["1000"],
            "--unsafe-bypass-fsync": ["1"],
            # Keep the vnode count small: large counts can exhaust file
            # descriptors during bootstrap and just add unneeded work.
            "--num-tokens": ["16"],
        }

        if self.scylla_mode() == "debug":
            # Debug builds stall the reactor much longer before it is a problem.
            default_scylla_args["--blocked-reactor-notify-ms"] = ["5000"]

        # dict union: keys on the right (env/explicit args) win over defaults.
        scylla_args = default_scylla_args | scylla_args

        if "--cpuset" not in scylla_args:
            # Without CPU pinning, tell Seastar it shares the machine.
            scylla_args["--overprovisioned"] = [""]

        # Flatten {option: [values]} into ["--opt", "value", ...]; options with
        # only empty values (e.g. --overprovisioned) become bare flags.
        return list(chain.from_iterable(
            (arg, value) if values and all(values) else (arg, )
            for arg, values in scylla_args.items()
            for value in values
        ))
|
|
|
|
@staticmethod
|
|
def _process_scylla_env() -> dict[str, str]:
|
|
scylla_env = {}
|
|
|
|
for var in os.environ.get("SCYLLA_EXT_ENV", "").replace(";" , " ").split():
|
|
k, v = var.split(sep="=", maxsplit=1)
|
|
if not v:
|
|
raise RuntimeError(f"SCYLLA_EXT_ENV: unable to parse {var!r} as an env variable")
|
|
scylla_env[k] = v
|
|
|
|
return scylla_env
|
|
|
|
    def start(self,
              join_ring: bool | None = None,  # not used in scylla-dtest
              no_wait: bool = False,
              verbose: bool | None = None,  # not used in scylla-dtest
              update_pid: bool = True,  # not used here
              wait_other_notice: bool | None = None,
              wait_normal_token_owner: bool | None = None,
              replace_token: str | None = None,  # not used in scylla-dtest
              replace_address: str | None = None,
              replace_node_host_id: str | None = None,
              jvm_args: list[str] | None = None,
              wait_for_binary_proto: bool | None = None,
              profile_options: dict[str, str] | None = None,  # not used in scylla-dtest
              use_jna: bool | None = None,  # not used in scylla-dtest
              quiet_start: bool | None = None) -> None:  # not used in scylla-dtest
        """Start this node via the cluster manager.

        Only the subset of ccm's start() parameters actually used by
        scylla-dtest is supported; the rest are asserted unset.

        :param no_wait: skip the post-start waits even if the cluster default
            would enable them
        :param wait_other_notice: wait until other live nodes see this node UP
        :param wait_normal_token_owner: also wait until this node is a normal
            token owner from the other nodes' point of view
        :param replace_address: address of a dead node to replace (mutually
            exclusive with replace_node_host_id)
        :param replace_node_host_id: host ID of a dead node to replace
        :param jvm_args: extra command-line options (ccm legacy name)
        :param wait_for_binary_proto: wait for the CQL port to start listening
        :raises NodeError: if the node is already running
        """
        assert join_ring is None, "argument `join_ring` is not supported"
        assert verbose is None, "argument `verbose` is not supported"
        assert replace_token is None, "argument `replace_token` is not supported"
        assert profile_options is None, "argument `profile_options` is not supported"
        assert use_jna is None, "argument `use_jna` is not supported"
        assert quiet_start is None, "argument `quiet_start` is not supported"

        assert replace_address is None or replace_node_host_id is None, \
            "replace_address and replace_node_host_id cannot be specified together"

        if self.is_running():
            raise NodeError(f"{self.name} is already running")

        scylla_args = self._process_scylla_args(
            *(jvm_args or []),
            *(["--replace-address", replace_address] if replace_address else []),
            *(["--replace-node-first-boot", replace_node_host_id] if replace_node_host_id else []),
        )
        scylla_env = self._process_scylla_env()

        # Snapshot other nodes' log positions before starting so the
        # wait_other_notice checks below only consider new log lines.
        # NOTE(review): if wait_other_notice arrives as None and defaults to
        # True further down, `marks` stays empty and the per-node checks are
        # skipped — confirm this is intended.
        marks = []
        if wait_other_notice:
            marks = [(node, node.mark_log()) for node in self.cluster.nodelist() if node.is_live()]

        self.mark = self.mark_log()

        self.cluster.manager.server_start(
            server_id=self.server_id,
            # A non-bootstrapping node seeds from itself.
            seeds=None if self.bootstrap else [self.address()],
            expected_server_up_state=ServerUpState.PROCESS_STARTED,
            cmdline_options_override=scylla_args,
            append_env_override=scylla_env,
            connect_driver=False,
        )

        # Resolve the tri-state wait flags; defaults depend on cluster-wide
        # configuration and on `no_wait`.
        if wait_for_binary_proto is None:
            wait_for_binary_proto = self.cluster.force_wait_for_cluster_start and not no_wait
        if wait_other_notice is None:
            wait_other_notice = self.cluster.force_wait_for_cluster_start and not no_wait
        if wait_normal_token_owner is None and wait_other_notice:
            wait_normal_token_owner = True

        if wait_for_binary_proto:
            self.wait_for_binary_interface(from_mark=self.mark)

        if wait_other_notice:
            timeout = self.cluster.default_wait_other_notice_timeout
            for node, mark in marks:
                # Check both directions: they see us, and we see them.
                node.watch_log_for_alive(nodes=self, from_mark=mark, timeout=timeout)
                node.watch_rest_for_alive(nodes=self, timeout=timeout, wait_normal_token_owner=wait_normal_token_owner)
                self.watch_rest_for_alive(nodes=node, timeout=timeout, wait_normal_token_owner=wait_normal_token_owner)
|
|
|
|
    def stop(self,
             wait: bool = True,
             wait_other_notice: bool = False,
             other_nodes: list[ScyllaNode] | None = None,
             gently: bool = True,
             wait_seconds: int = 127,
             marks: list[tuple[ScyllaNode, int]] | None = None) -> bool:
        """Stop this node.

        :param wait: block until the process has exited
        :param wait_other_notice: also have other live nodes watch their logs
            for this node's death
        :param other_nodes: nodes to watch (defaults to the whole cluster)
        :param gently: graceful shutdown instead of a hard kill
        :param wait_seconds: timeout for the process to exit
        :param marks: pre-recorded (node, log-mark) pairs; computed here when
            not supplied (annotation fixed: these are tuples, not ints — cf.
            wait_until_stopped())
        :return: True if the node was running and is now stopped, False if it
            was not running at all
        """
        if not self.is_running():
            return False

        if marks is None:
            marks = [
                (node, node.mark_log())
                for node in other_nodes or self.cluster.nodelist()
                if node.server_id != self.server_id and node.is_live()
            ] if wait_other_notice else []

        if gently:
            self.cluster.manager.server_stop_gracefully(server_id=self.server_id)
        else:
            self.cluster.manager.server_stop(server_id=self.server_id)

        if wait or wait_other_notice:
            self.wait_until_stopped(wait_seconds=wait_seconds, marks=marks, dump_core=gently)

        return True
|
|
|
|
def nodetool(self,
|
|
cmd: str,
|
|
capture_output: bool = True,
|
|
wait: bool = True,
|
|
timeout: int | float | None = None,
|
|
verbose: bool = True) -> tuple[str, str]:
|
|
if capture_output and not wait:
|
|
raise ArgumentError("Cannot set capture_output while wait is False.")
|
|
|
|
nodetool_cmd = [
|
|
self.cluster.manager.server_get_exe(server_id=self.server_id),
|
|
"nodetool",
|
|
"-h",
|
|
str(self.cluster.manager.get_host_ip(server_id=self.server_id)),
|
|
*cmd.split(),
|
|
]
|
|
|
|
if verbose:
|
|
self.debug(f"nodetool cmd={nodetool_cmd} wait={wait} timeout={timeout}")
|
|
|
|
if capture_output:
|
|
p = subprocess.Popen(nodetool_cmd, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
stdout, stderr = p.communicate(timeout=timeout)
|
|
else:
|
|
p = subprocess.Popen(nodetool_cmd, universal_newlines=True)
|
|
stdout, stderr = None, None
|
|
|
|
if wait and p.wait(timeout=timeout):
|
|
raise NodetoolError(" ".join(nodetool_cmd), p.returncode, stdout, stderr)
|
|
|
|
stderr = "\n".join(
|
|
line for line in stderr.splitlines()
|
|
if self.debug(f"checking {line}") or not any(p.fullmatch(line) for p in NODETOOL_STDERR_IGNORED_PATTERNS)
|
|
)
|
|
|
|
return stdout, stderr
|
|
|
|
def get_path(self) -> str:
|
|
"""Return the path to this node top level directory (where config/data is stored.)"""
|
|
|
|
return self.cluster.manager.server_get_workdir(server_id=self.server_id)
|
|
|
|
    def stress(self, stress_options: list[str], **kwargs):
        """
        Run `cassandra-stress` against this node.
        This method does not do any result parsing.

        :param stress_options: List of options to pass to `cassandra-stress`.
        :param kwargs: Additional arguments to pass to `subprocess.Popen()`.
        :return: Named tuple with `stdout`, `stderr`, and `rc` (return code).
            NOTE(review): returns None when interrupted with Ctrl-C — the
            KeyboardInterrupt is swallowed below; confirm callers tolerate it.
        :raises ToolError: if cassandra-stress exits with a non-zero status.
        """

        cmd_args = ["cassandra-stress"] + stress_options

        # Target this node unless the caller already chose a target option.
        if not any(opt in cmd_args for opt in ("-d", "-node", "-cloudconf")):
            cmd_args.extend(["-node", self.address()])

        p = subprocess.Popen(
            cmd_args,
            stdout=kwargs.pop("stdout", subprocess.PIPE),
            stderr=kwargs.pop("stderr", subprocess.PIPE),
            universal_newlines=True,
            **kwargs,
        )
        try:
            stdout, stderr = p.communicate()
            if rc := p.returncode:
                raise ToolError(cmd_args, rc, stdout, stderr)
            ret = namedtuple("Subprocess_Return", "stdout stderr rc")
            return ret(stdout=stdout, stderr=stderr, rc=rc)
        except KeyboardInterrupt:
            # Deliberate best-effort: a user abort is not a test failure.
            pass
|
|
|
|
|
|
def _set_stress_val(self, key, val, res):
|
|
"""
|
|
Normalize a stress result string and populate aggregate/read/write metrics.
|
|
|
|
Removes comma-thousands separators from numbers, converts to float,
|
|
stores the aggregate metric under `key`.
|
|
If the value contains a "[READ ..., WRITE ...]" block, also stores the
|
|
read and write metrics under `key:read` and `key:write`.
|
|
|
|
:param key: The metric name
|
|
:param val: The metric value string
|
|
:param res: The dictionary to populate
|
|
"""
|
|
|
|
def parse_num(s):
|
|
return float(s.replace(',', ''))
|
|
|
|
if "[" in val:
|
|
p = STRESS_SUMMARY_PATTERN
|
|
m = p.match(val)
|
|
if m:
|
|
res[key] = parse_num(m.group(1))
|
|
p = STRESS_READ_PATTERN
|
|
m = p.match(val)
|
|
if m:
|
|
res[key + ":read"] = parse_num(m.group(1))
|
|
p = STRESS_WRITE_PATTERN
|
|
m = p.match(val)
|
|
if m:
|
|
res[key + ":write"] = parse_num(m.group(1))
|
|
else:
|
|
try:
|
|
res[key] = parse_num(val)
|
|
except ValueError:
|
|
res[key] = val
|
|
|
|
|
|
def stress_object(self, stress_options=None, ignore_errors=None, **kwargs):
|
|
"""
|
|
Run stress test and return results as a structured metrics dictionary.
|
|
|
|
Runs `stress()`, finds the `Results:` section in `stdout`, and then
|
|
processes each `key : value` line, putting it into a dictionary.
|
|
|
|
:param stress_options: List of stress options to pass to `stress()`.
|
|
:param ignore_errors: Deprecated (no effect).
|
|
:param kwargs: Additional arguments to pass to `stress()`.
|
|
:return: Dictionary of stress test results.
|
|
"""
|
|
if ignore_errors:
|
|
self.warning("passing `ignore_errors` to stress_object() is deprecated")
|
|
ret = self.stress(stress_options, **kwargs)
|
|
p = STRESS_KEY_VALUE_PATTERN
|
|
res = {}
|
|
start = False
|
|
for line in (s.strip() for s in ret.stdout.splitlines()):
|
|
if start:
|
|
m = p.match(line)
|
|
if m:
|
|
self._set_stress_val(m.group(1).strip().lower(), m.group(2).strip(), res)
|
|
else:
|
|
if line == 'Results:':
|
|
start = True
|
|
return res
|
|
|
|
|
|
def flush(self, ks: str | None = None, table: str | None = None, **kwargs) -> None:
|
|
cmd = ["flush"]
|
|
if ks:
|
|
cmd.append(ks)
|
|
if table:
|
|
cmd.append(table)
|
|
self.nodetool(" ".join(cmd), **kwargs)
|
|
|
|
def compact(self, keyspace: str = "", tables: str | None = ()) -> None:
|
|
compact_cmd = ["compact"]
|
|
if keyspace:
|
|
compact_cmd.append(keyspace)
|
|
compact_cmd += tables
|
|
self.nodetool(" ".join(compact_cmd))
|
|
|
|
def drain(self, block_on_log: bool = False) -> None:
|
|
mark = self.mark_log()
|
|
self.nodetool("drain")
|
|
if block_on_log:
|
|
self.watch_log_for("DRAINED", from_mark=mark)
|
|
|
|
def repair(self, options: list[str] | None = None, **kwargs) -> tuple[str, str]:
|
|
cmd = ["repair"]
|
|
if options:
|
|
cmd.extend(options)
|
|
return self.nodetool(" ".join(cmd), **kwargs)
|
|
|
|
    def decommission(self) -> None:
        """Decommission this node (remove it from the ring) via the manager."""
        self.cluster.manager.decommission_node(server_id=self.server_id)
|
|
|
|
def get_sstables(self, keyspace, column_family, ignore_unsealed=True, cleanup_unsealed=False):
|
|
keyspace_dir = os.path.join(self.get_path(), 'data', keyspace)
|
|
cf_glob = '*'
|
|
if column_family:
|
|
cf_glob = column_family + '-*'
|
|
if not os.path.exists(keyspace_dir):
|
|
raise ArgumentError(f"Unknown keyspace {keyspace}")
|
|
|
|
files = glob.glob(os.path.join(keyspace_dir, cf_glob, "*big-Data.db"))
|
|
for f in files:
|
|
if os.path.exists(f.replace('Data.db', 'Compacted')):
|
|
files.remove(f)
|
|
if ignore_unsealed or cleanup_unsealed:
|
|
# sstablelevelreset tries to open all sstables under the configured
|
|
# `data_file_directories` in cassandra.yaml. if any of them does not
|
|
# have missing component, it complains in its stderr. and some tests
|
|
# using this tool checks the stderr for unexpected error message, so
|
|
# we need to remove all unsealed sstable files in this case.
|
|
for toc_tmp in glob.glob(os.path.join(keyspace_dir, cf_glob, '*TOC.txt.tmp')):
|
|
if cleanup_unsealed:
|
|
self.info(f"get_sstables: Cleaning up unsealed SSTable: {toc_tmp}")
|
|
for unsealed in glob.glob(toc_tmp.replace('TOC.txt.tmp', '*')):
|
|
os.remove(unsealed)
|
|
else:
|
|
self.info(f"get_sstables: Ignoring unsealed SSTable: {toc_tmp}")
|
|
data_sst = toc_tmp.replace('TOC.txt.tmp', 'Data.db')
|
|
try:
|
|
files.remove(data_sst)
|
|
except ValueError:
|
|
pass
|
|
return files
|
|
|
|
    def __gather_sstables(self, datafiles=None, keyspace=None, columnfamilies=None):
        """Collect sstable Data files for the requested scope.

        Without `datafiles`, delegates to get_sstables() over the requested
        keyspace/column families (or all keyspaces).  With `datafiles`,
        validates that each file belongs to the single given column family and
        is a sealed, existing sstable.
        """
        files = []
        if keyspace is None:
            # NOTE(review): list_keyspaces() is not defined in this file —
            # presumably provided elsewhere; confirm.
            for k in self.list_keyspaces():
                files = files + self.get_sstables(k, "")
        elif datafiles is None:
            if columnfamilies is None:
                files = files + self.get_sstables(keyspace, "")
            else:
                for cf in columnfamilies:
                    files = files + self.get_sstables(keyspace, cf)
        else:
            if not columnfamilies or len(columnfamilies) > 1:
                raise ArgumentError("Exactly one column family must be specified with datafiles")

            cf_dir = os.path.join(os.path.realpath(self.get_path()), 'data', keyspace, columnfamilies[0])

            sstables = set()
            for datafile in datafiles:
                if not os.path.isabs(datafile):
                    datafile = os.path.join(os.getcwd(), datafile)

                # The table directory is "<cf>-<uuid>", hence the two prefixes.
                if not datafile.startswith(cf_dir + '-') and not datafile.startswith(cf_dir + os.sep):
                    raise NodeError("File doesn't appear to belong to the specified keyspace and column family: " + datafile)

                # NOTE(review): _sstable_regexp is not defined in this file —
                # presumably a module-level pattern defined elsewhere; confirm.
                sstable = _sstable_regexp.match(os.path.basename(datafile))
                if not sstable:
                    raise NodeError("File doesn't seem to be a valid sstable filename: " + datafile)

                sstable = sstable.groupdict()
                # Skip temporary sstables and duplicates of the same identifier.
                if not sstable['tmp'] and sstable['identifier'] not in sstables:
                    if not os.path.exists(datafile):
                        raise IOError("File doesn't exist: " + datafile)
                    sstables.add(sstable['identifier'])
                    files.append(datafile)

        return files
|
|
|
|
    def run_scylla_sstable(self, command, additional_args=None, keyspace=None, datafiles=None, column_families=None, batch=False, text=True, env=None):
        """Invoke scylla-sstable, with the specified command (operation) and additional_args.

        For more information about scylla-sstable, see https://docs.scylladb.com/stable/operating-scylla/admin-tools/scylla-sstable.html.

        Params:
        * command - The scylla-sstable command (operation) to run.
        * additional_args - Additional command-line arguments to pass to scylla-sstable, this should be a list of strings.
        * keyspace - Restrict the operation to sstables of this keyspace.
        * datafiles - Restrict the operation to the specified sstables (Data components).
        * column_families - Restrict the operation to sstables of these column_families. Must contain exactly one column family when datafiles is used.
        * batch - If True, all sstables will be passed in a single batch. If False, sstables will be passed one at a time.
          Batch-mode can be only used if column_families contains a single item.
        * text - If True, output of the command is treated as text, if not as bytes

        If datafiles is provided, the caller is responsible for making sure these
        files are not removed by ScyllaDB while the tool is running.
        If datafiles = None, this function will create a snapshot and dump
        sstables from the snapshot to ensure that the sstables are not removed
        while the tools is running.
        The snapshot is removed after the dump completed, but it is left there in
        case of error, for post-mortem analysis.

        Returns: map: {sstable: (stdout, stderr)} of all invokations. When batch == True, a single entry will be present, with empty key.

        Raises: subprocess.CalledProcessError if scylla-sstable returns a non-zero exit code.
        """
        if additional_args is None:
            additional_args = []

        scylla_path = self.cluster.manager.server_get_exe(server_id=self.server_id)
        ret = {}

        if datafiles is None and keyspace is not None and self.is_running():
            # Snapshot the tables so the sstables can't be compacted away
            # underneath the tool.  Requires the `uuid` module at file level.
            tag = "sstable-dump-{}".format(uuid.uuid1())
            kts = ",".join(f"{keyspace}.{column_family}" for column_family in column_families)
            self.debug(f"run_scylla_sstable(): creating snapshot with tag {tag} to be used for sstable dumping")
            self.nodetool(f"snapshot -t {tag} {kts}")
            sstables = []
            for column_family in column_families:
                sstables.extend(glob.glob(os.path.join(self.get_path(), 'data', keyspace, f"{column_family}-*/snapshots/{tag}/*-Data.db")))
        else:
            sstables = self.__gather_sstables(datafiles, keyspace, column_families)
            tag = None

        self.debug(f"run_scylla_sstable(): preparing to dump sstables {sstables}")

        def do_invoke(sstables):
            # there are chances that the table is not replicated on this node,
            # in that case, scylla tool will fail to dump the sstables and error
            # out. let's just return an empty list for the partitions, so that
            # dump_sstables() can still parse it in the same way.
            if not sstables:
                empty_dump = {'sstables': {'anonymous': []}}
                # Requires the `json` module at file level.
                stdout, stderr = json.dumps(empty_dump), ''
                if text:
                    return stdout, stderr
                else:
                    return stdout.encode('utf-8'), stderr.encode('utf-8')
            common_args = [scylla_path, "sstable", command] + additional_args
            res = subprocess.run(common_args + sstables, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=text, check=False, env=env)
            if res.returncode:
                raise ToolError(command=' '.join(common_args + sstables), exit_status=res.returncode, stdout=res.stdout, stderr=res.stderr)
            return (res.stdout, res.stderr)

        if batch:
            if column_families is None or len(column_families) > 1:
                raise NodeError("run_scylla_sstable(): batch mode can only be used in conjunction with a single column_family")
            ret[""] = do_invoke(sstables)
        else:
            for sst in sstables:
                ret[sst] = do_invoke([sst])

        # Deliberately not putting this in a `finally` block, if the command
        # above failed, leave the snapshot with the sstables around for
        # post-mortem analysis.
        if tag is not None:
            self.nodetool(f"clearsnapshot -t {tag} {keyspace}")

        return ret
|
|
|
|
    def hostid(self, timeout: float | None = None, force_refresh: bool | None = None) -> str | None:
        """Return this node's host ID, or None if it could not be fetched."""
        assert timeout is None, "argument `timeout` is not supported"  # not used in scylla-dtest
        assert force_refresh is None, "argument `force_refresh` is not supported"  # not used in scylla-dtest

        try:
            return self.cluster.manager.get_host_id(server_id=self.server_id)
        except Exception as exc:
            # Best-effort: log and fall through to the implicit None return.
            self.error(f"Failed to get hostid: {exc}")
|
|
|
|
def rmtree(self, path: str | Path) -> None:
|
|
"""Delete a directory content without removing the directory.
|
|
|
|
Copied this code from Python's documentation for Path.walk() method.
|
|
"""
|
|
for root, dirs, files in Path(path).walk(top_down=False):
|
|
for name in files:
|
|
(root / name).unlink()
|
|
for name in dirs:
|
|
(root / name).rmdir()
|
|
|
|
    def _log_message(self, message: str) -> str:
        # Prefix with the node name so interleaved cluster logs stay attributable.
        return f"{self.name}: {message}"

    def debug(self, message: str) -> None:
        self.cluster.debug(self._log_message(message))

    def info(self, message: str) -> None:
        self.cluster.info(self._log_message(message))

    def warning(self, message: str) -> None:
        self.cluster.warning(self._log_message(message))

    def error(self, message: str) -> None:
        self.cluster.error(self._log_message(message))

    def __repr__(self) -> str:
        # NOTE(review): prints server_id under the "name=" label even though
        # self.name exists — possibly deliberate (server_id is unique); confirm.
        return f"<ScyllaNode name={self.server_id} dc={self.data_center} rack={self.rack}>"
|
|
|
|
|
|
def _parse_scylla_args(args: list[str]) -> dict[str, list[str]]:
|
|
parsed_args = {}
|
|
|
|
args = iter(args)
|
|
arg = next(args, None)
|
|
|
|
while arg is not None:
|
|
assert arg.startswith("-")
|
|
|
|
key, *value = arg.split(sep="=", maxsplit=1)
|
|
|
|
if value: # handle argument in `--foo=bar` form
|
|
arg = next(args, None)
|
|
else:
|
|
for arg in args: # handle argument in `--foo bar` form
|
|
if arg.startswith("-"):
|
|
break
|
|
value.append(arg)
|
|
else: # no more arguments
|
|
arg = None
|
|
|
|
if key.startswith("--scylla-manager"): # skip Scylla Manager arguments
|
|
continue
|
|
|
|
parsed_args.setdefault(key, []).append(" ".join(value))
|
|
|
|
return parsed_args
|
|
|
|
|
|
def _parse_size(s: str) -> int:
|
|
try:
|
|
factor = 1024 ** abs("k KMGT".index(s[-1]) - 1)
|
|
except ValueError:
|
|
return int(s)
|
|
return int(s[:-1]) * factor
|