Files
scylladb/test/cluster/dtest/ccmlib/scylla_node.py
Michał Chojnowski 9280a039ee test/cluster/dtest: reduce num_tokens to 16
cluster.dtest_alternator_tests.test_slow_query_logging performs
a bootstrap with 768 token ranges.

It works with `me` sstables, which have 2 open file descriptors
per open sstable, but with `ms` sstables, which have 3 open
file descriptors per open sstable, it fails with EMFILE.

To avoid this problem, let's just decrease the number of vnodes
in the test suite. It's appropriate anyway, because it avoids some
unneeded work without weakening the tests.
(Note: pylib-based tests have been setting `num_tokens` to 16 for a long time too).

This breaks `bypass_cache_test`, which is written in a way that expects
a certain number of token ranges. We adjust the relevant parameter
accordingly.
2026-03-09 17:12:09 +01:00

897 lines
37 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from __future__ import annotations

import glob
import json
import locale
import os
import re
import subprocess
import time
import uuid
from collections import namedtuple
from functools import cached_property
from itertools import chain
from pathlib import Path
from typing import TYPE_CHECKING, Any

from test.cluster.dtest.ccmlib.common import ArgumentError, wait_for, BIN_DIR
from test.pylib.internal_types import ServerUpState
from test.pylib.manager_client import NoSuchProcess
if TYPE_CHECKING:
from test.pylib.internal_types import ServerInfo
from test.pylib.log_browsing import ScyllaLogFile
from test.cluster.dtest.ccmlib.scylla_cluster import ScyllaCluster
# Patterns of nodetool stderr lines that are expected noise and are filtered
# out before stderr is returned to the caller (see ScyllaNode.nodetool()).
NODETOOL_STDERR_IGNORED_PATTERNS = (
    re.compile(r"WARNING: debug mode. Not for benchmarking or production"),
    re.compile(
        r"==[0-9]+==WARNING: ASan doesn't fully support makecontext/swapcontext"
        r" functions and may produce false positives in some cases!"
    ),
)

# Maps Cassandra-style "-Dcassandra.*" options (used by dtests) to the
# equivalent ScyllaDB command-line options.
CASSANDRA_OPTIONS_MAPPING = {
    "-Dcassandra.replace_address_first_boot": "--replace-address-first-boot",
}

# Default per-node resources, used unless overridden by SCYLLA_EXT_OPTS
# or by the test itself.
DEFAULT_SMP = 2
DEFAULT_MEMORY_PER_CPU = 512 * 1024 * 1024  # bytes
DEFAULT_SCYLLA_LOG_LEVEL = "info"

# Mapping from Cassandra/log4j level names to ScyllaDB logger levels.
# "OFF" has no ScyllaDB equivalent, so it is mapped to "info".
KNOWN_LOG_LEVELS = {
    "TRACE": "trace",
    "DEBUG": "debug",
    "INFO": "info",
    "WARN": "warn",
    "ERROR": "error",
    "OFF": "info",
}

# Captures the aggregate metric before the "[READ ..., WRITE ...]" block.
STRESS_SUMMARY_PATTERN = re.compile(r'^\s*([\d\.\,]+\d?)\s*\[.*')
# Extracts the READ metric number inside the "[READ ..., WRITE ...]" block.
STRESS_READ_PATTERN = re.compile(r'.*READ:\s*([\d\.\,]+\d?)[^\d].*')
# Extracts the WRITE metric number inside the "[READ ..., WRITE ...]" block.
STRESS_WRITE_PATTERN = re.compile(r'.*WRITE:\s*([\d\.\,]+\d?)[^\d].*')
# Splits a "key : value" line into key and value.
STRESS_KEY_VALUE_PATTERN = re.compile(r'^\s*([^:]+)\s*:\s*(\S.*)\s*$')
class NodeError(Exception):
def __init__(self, msg: str, process: int | None = None):
super().__init__(msg)
self.process = process
class ToolError(Exception):
def __init__(self, command: str | list[str], exit_status: int, stdout: Any = None, stderr: Any = None):
self.command = command
self.exit_status = exit_status
self.stdout = stdout
self.stderr = stderr
message = [f"Subprocess {command} exited with non-zero status; exit status: {exit_status}"]
if stdout:
message.append(f"stdout: {self._decode(stdout)}")
if stderr:
message.append(f"stderr: {self._decode(stderr)}")
Exception.__init__(self, "; \n".join(message))
@staticmethod
def _decode(value: str | bytes) -> str:
if isinstance(value, bytes):
return bytes.decode(value, locale.getpreferredencoding(do_setlocale=False))
return value
NodetoolError = ToolError
class ScyllaNode:
def __init__(self, cluster: ScyllaCluster, server: ServerInfo, name: str):
self.cluster = cluster
self.server_id = server.server_id
self.name = name
self.pid = None
self.all_pids = []
self.network_interfaces = {
"storage": (str(server.rpc_address), 7000),
"binary": (str(server.rpc_address), 9042),
}
self.data_center = server.datacenter
self.rack = server.rack
self._smp_set_during_test = None
self._smp = None
self._memory = None
self.__global_log_level = "info"
self.__classes_log_level = {}
self.bootstrap = True
def set_configuration_options(self,
values: dict | None = None,
batch_commitlog: bool | None = None) -> None:
"""Set DB node configuration options.
Example:
node.set_configuration_options(values={
'hinted_handoff_enabled' : True,
'concurrent_writes' : 64,
})
The batch_commitlog option gives an easier way to switch to batch
commitlog (since it requires setting 2 options and unsetting one).
"""
self.cluster.set_configuration_options(values=values, batch_commitlog=batch_commitlog, nodes=self)
def set_log_level(self, new_level: str, class_name: str | None = None) -> ScyllaNode:
if new_log_level := KNOWN_LOG_LEVELS.get(new_level):
if class_name is None:
self.__global_log_level = new_log_level
else:
self.__classes_log_level[class_name] = new_log_level
return self
raise ArgumentError(f"Unknown log level {new_level} (use one of {' '.join(KNOWN_LOG_LEVELS)})")
def scylla_mode(self) -> str:
return self.cluster.scylla_mode
def set_smp(self, smp: int) -> None:
self._smp_set_during_test = smp
def smp(self) -> int:
return self._smp_set_during_test or self._smp or DEFAULT_SMP
def memory(self) -> int:
return self._memory or self.smp() * DEFAULT_MEMORY_PER_CPU
def _adjust_smp_and_memory(self, smp: int | None = None, memory: int | None = None) -> None:
if memory:
self._memory = memory // (smp or self.smp()) * self.smp()
if smp:
memory_per_cpu = self.memory() // self.smp()
self._smp = smp
self._memory = memory_per_cpu * self.smp()
def set_mem_mb_per_cpu(self, mem: int) -> None: # not used in scylla-dtest
raise NotImplementedError("setting memory per CPU during a test is not supported")
def address(self) -> str:
"""Return the IP use by this node for internal communication."""
return self.network_interfaces["storage"][0]
    def is_running(self) -> bool:
        """True if the server process exists and has not exited yet."""
        try:
            # The manager returns None for the return code while the process
            # is still alive.
            return self.cluster.manager.server_get_returncode(server_id=self.server_id) is None
        except NoSuchProcess:
            # The manager does not know the process at all -- not running.
            return False

    # ccmlib compatibility: liveness and running state are the same thing here.
    is_live = is_running
    @cached_property
    def scylla_log_file(self) -> ScyllaLogFile:
        """Lazily-opened handle to this node's ScyllaDB log (cached per node)."""
        return self.cluster.manager.server_open_log(server_id=self.server_id)
def grep_log(self,
expr: str,
filter_expr: str | None = None,
filename: str | None = None, # not used in scylla-dtest
from_mark: int | None = None) -> list[tuple[str, re.Match[str]]]:
assert filename is None, "only ScyllaDB's log is supported"
return self.scylla_log_file.grep(expr=expr, filter_expr=filter_expr, from_mark=from_mark)
def grep_log_for_errors(self,
filename: str | None = None, # not used in scylla-dtest
distinct_errors: bool = False,
search_str: str | None = None, # not used in scylla-dtest
case_sensitive: bool = True, # not used in scylla-dtest
from_mark: int | None = None) -> list[str] | list[list[str]]:
assert filename is None, "only ScyllaDB's log is supported"
assert search_str is None, "argument `search_str` is not supported"
assert case_sensitive, "only case sensitive search is supported"
from_mark = getattr(self, "error_mark", None) if from_mark is None else from_mark
return self.scylla_log_file.grep_for_errors(distinct_errors=distinct_errors, from_mark=from_mark)
    def mark_log_for_errors(self, filename: str | None = None) -> None:
        """Remember the current log position; grep_log_for_errors() starts there by default."""
        assert filename is None, "only ScyllaDB's log is supported"
        self.error_mark = self.mark_log()
    def mark_log(self, filename: str | None = None) -> int:
        """Return an opaque marker for the current end of the ScyllaDB log."""
        assert filename is None, "only ScyllaDB's log is supported"
        return self.scylla_log_file.mark()
def watch_log_for(self,
exprs: str | list[str],
from_mark: int | None = None,
timeout: float = 600,
process: subprocess.Popen | None = None, # don't use it here
verbose: bool | None = None, # not used in scylla-dtest
filename: str | None = None, # not used in scylla-dtest
polling_interval: float | None = None) -> tuple[str, re.Match[str]] | list[tuple[str, re.Match[str]]]: # not used in scylla-dtest
assert process is None, "argument `process` is not supported"
assert verbose is None, "argument `verbose` is not supported"
assert filename is None, "only ScyllaDB's log is supported"
assert polling_interval is None, "argument `polling_interval` is not supported"
if isinstance(exprs, str):
exprs = [exprs]
_, matches = self.scylla_log_file.wait_for(*exprs, from_mark=from_mark, timeout=timeout)
return matches[0] if len(matches) == 1 else matches
def watch_log_for_death(self,
nodes: ScyllaNode | list[ScyllaNode],
from_mark: int | None = None,
timeout: float = 600,
filename: str | None = None) -> None:
"""Watch the log of this node until it detects that the provided other nodes are marked dead.
This method returns nothing but throw a TimeoutError if all the requested node have not been found
to be marked dead before timeout sec.
A mark as returned by mark_log() can be used as the `from_mark` parameter to start watching the log
from a given position. Otherwise, the log is watched from the beginning.
"""
assert filename is None, "only ScyllaDB's log is supported"
if not isinstance(nodes, list):
nodes = [nodes]
self.watch_log_for(
[f"({node.address()}|{node.hostid()}).* now (dead|DOWN)" for node in nodes],
from_mark=from_mark,
timeout=timeout,
)
def watch_log_for_alive(self,
nodes: ScyllaNode | list[ScyllaNode],
from_mark: int | None = None,
timeout: float = 120,
filename: str | None = None) -> None:
"""Watch the log of this node until it detects that the provided other nodes are marked UP.
This method works similarly to watch_log_for_death().
"""
assert filename is None, "only ScyllaDB's log is supported"
if not isinstance(nodes, list):
nodes = [nodes]
self.watch_log_for(
[f"({node.address()}|{node.hostid()}).* now UP" for node in nodes],
from_mark=from_mark,
timeout=timeout,
)
    def watch_rest_for_alive(self,
                             nodes: ScyllaNode | list[ScyllaNode],
                             timeout: float = 120,
                             wait_normal_token_owner: bool = True) -> None:
        """Poll this node's REST API until all given nodes are alive and own tokens.

        When `wait_normal_token_owner` is set, additionally require that the
        host ids reported by this node match the host ids the client knows.
        Raises TimeoutError if the conditions are not met within `timeout` seconds.
        """
        nodes = nodes if isinstance(nodes, list) else [nodes]
        tofind_host_id_map = {node.address(): node.hostid() for node in nodes}
        tofind = {node.address() for node in nodes}
        node_ip = self.address()
        # `found`/`found_host_id_map` are kept only for the debug message below.
        found = set()
        found_host_id_map = {}
        api = self.cluster.manager.api
        deadline = time.perf_counter() + timeout
        while time.perf_counter() < deadline:
            # All targets must be reported alive and must no longer be joining.
            if tofind <= set(api.get_alive_endpoints(node_ip=node_ip)) - set(api.get_joining_nodes(node_ip=node_ip)):
                # Every remaining target must already own tokens.
                if not any(node for node in tofind if not api.get_tokens(node_ip=node_ip, endpoint=node)):
                    if not wait_normal_token_owner:
                        return
                    # Verify other nodes are considered normal token owners on this node and their host_ids
                    # match the host_ids the client knows about.
                    host_id_map = {x["key"]: x["value"] for x in api.get_host_id_map(dst_server_ip=node_ip)}
                    found_host_id_map.update(host_id_map)
                    for addr, host_id in host_id_map.items():
                        if addr not in tofind_host_id_map:
                            continue
                        # Accept when ids match, or when the expected id is unknown.
                        if host_id == tofind_host_id_map[addr] or not tofind_host_id_map[addr]:
                            tofind.discard(addr)
                            found.add(addr)
                    if not tofind:
                        return
            time.sleep(0.1)
        self.debug(f"watch_rest_for_alive: {tofind=} {found=}: {tofind_host_id_map=} {found_host_id_map=}")
        raise TimeoutError(f"watch_rest_for_alive() timeout after {timeout} seconds")
def wait_for_binary_interface(self, from_mark: int | None = None, timeout: float | None = None) -> None:
"""Waits for the binary CQL interface to be listening."""
if timeout is None:
timeout = self.cluster.default_wait_for_binary_proto
self.watch_log_for(exprs="Starting listening for CQL clients", from_mark=from_mark, timeout=timeout)
    def wait_until_stopped(self,
                           wait_seconds: int | None = None,
                           marks: list[tuple[ScyllaNode, int]] | None = None,
                           dump_core: bool = True) -> None:  # not implemented
        """Block until this node's process exits; then wait for others to notice.

        :param wait_seconds: max time to wait (defaults to 127s, 600s for debug builds)
        :param marks: (node, log-mark) pairs; each node's log is watched from its
                      mark until it reports this node dead
        :param dump_core: accepted for API compatibility; not implemented here
        :raises NodeError: if the node is still running after `wait_seconds`
        """
        if wait_seconds is None:
            # Debug builds are much slower to shut down.
            wait_seconds = 127 if self.scylla_mode() != "debug" else 600
        if not wait_for(func=lambda: not self.is_running(), timeout=wait_seconds):
            raise NodeError(f"Problem stopping node {self.name}")
        for node, mark in marks or []:
            if node.server_id != self.server_id:
                node.watch_log_for_death(nodes=self, from_mark=mark)
    def _process_scylla_args(self, *args: str) -> list[str]:
        """Build the final scylla command line from env overrides, test args and defaults.

        Precedence (lowest to highest): built-in defaults, SCYLLA_EXT_OPTS,
        per-test `args`.  `--smp`/`--memory` are extracted and folded into the
        node's resource accounting instead of being passed through verbatim.

        :param args: extra command-line options; Cassandra-style
            "-Dcassandra.foo=bar" options are translated via CASSANDRA_OPTIONS_MAPPING
        :return: flat argv-style list of options and values
        """
        # Parse default overrides in SCYLLA_EXT_OPTS
        scylla_args = _parse_scylla_args(os.environ.get("SCYLLA_EXT_OPTS", "").split())
        if smp := scylla_args.pop("--smp", None):
            smp = int(smp[0])
        if memory := scylla_args.pop("--memory", None):
            memory = _parse_size(memory[0])
        self._adjust_smp_and_memory(smp=smp, memory=memory)
        if args:
            parsed_args = []
            for arg in args:
                option, *value = arg.split("=")
                if not value or option not in CASSANDRA_OPTIONS_MAPPING:
                    # Not a mapped Cassandra-style option: pass through as-is.
                    parsed_args.append(arg)
                elif len(value) == 1:
                    # Translate "-Dcassandra.foo=bar" to the scylla equivalent.
                    scylla_args[CASSANDRA_OPTIONS_MAPPING[option]] = value
                else:
                    raise RuntimeError(f"Option {arg} not in form '-Dcassandra.foo=bar'. Please check your test")
            scylla_args.update(_parse_scylla_args(parsed_args))
            # Test-provided --smp/--memory also go through resource accounting.
            if smp := scylla_args.pop("--smp", None):
                self._adjust_smp_and_memory(smp=int(smp[0]))
            if memory := scylla_args.pop("--memory", None):
                self._memory = _parse_size(memory[0])
        default_scylla_args = {
            "--smp": [str(self.smp())],
            "--memory": [f"{self.memory() // 1024 ** 2}M"],
            "--developer-mode": ["true"],
            "--default-log-level": [self.__global_log_level],
            "--kernel-page-cache": ["1"],
            "--commitlog-use-o-dsync": ["0"],
            "--max-networking-io-control-blocks": ["1000"],
            "--unsafe-bypass-fsync": ["1"],
            # 16 vnodes keeps the open-file count (and bootstrap work) low in tests.
            "--num-tokens": ["16"],
        }
        if self.scylla_mode() == "debug":
            default_scylla_args["--blocked-reactor-notify-ms"] = ["5000"]
        # Explicit args win over the defaults.
        scylla_args = default_scylla_args | scylla_args
        if "--cpuset" not in scylla_args:
            scylla_args["--overprovisioned"] = [""]
        # Flatten {arg: [values...]} into argv form; valueless args are emitted alone.
        return list(chain.from_iterable(
            (arg, value) if values and all(values) else (arg, )
            for arg, values in scylla_args.items()
            for value in values
        ))
@staticmethod
def _process_scylla_env() -> dict[str, str]:
scylla_env = {}
for var in os.environ.get("SCYLLA_EXT_ENV", "").replace(";" , " ").split():
k, v = var.split(sep="=", maxsplit=1)
if not v:
raise RuntimeError(f"SCYLLA_EXT_ENV: unable to parse {var!r} as an env variable")
scylla_env[k] = v
return scylla_env
    def start(self,
              join_ring: bool | None = None,  # not used in scylla-dtest
              no_wait: bool = False,
              verbose: bool | None = None,  # not used in scylla-dtest
              update_pid: bool = True,  # not used here
              wait_other_notice: bool | None = None,
              wait_normal_token_owner: bool | None = None,
              replace_token: str | None = None,  # not used in scylla-dtest
              replace_address: str | None = None,
              replace_node_host_id: str | None = None,
              jvm_args: list[str] | None = None,
              wait_for_binary_proto: bool | None = None,
              profile_options: dict[str, str] | None = None,  # not used in scylla-dtest
              use_jna: bool | None = None,  # not used in scylla-dtest
              quiet_start: bool | None = None) -> None:  # not used in scylla-dtest
        """Start this node's server process and optionally wait until it is up.

        :param no_wait: skip the default waiting behavior
        :param wait_other_notice: wait until other live nodes see this node UP
            (defaults to cluster.force_wait_for_cluster_start, unless no_wait)
        :param wait_normal_token_owner: also wait for normal-token-owner state
            (defaults to True when wait_other_notice is in effect)
        :param replace_address: address of a dead node to replace
            (mutually exclusive with replace_node_host_id)
        :param replace_node_host_id: host id of a dead node to replace
        :param jvm_args: extra command-line options; Cassandra-style ones are
            translated to scylla options by _process_scylla_args()
        :param wait_for_binary_proto: wait until the CQL port is listening
        :raises NodeError: if the node is already running
        """
        assert join_ring is None, "argument `join_ring` is not supported"
        assert verbose is None, "argument `verbose` is not supported"
        assert replace_token is None, "argument `replace_token` is not supported"
        assert profile_options is None, "argument `profile_options` is not supported"
        assert use_jna is None, "argument `use_jna` is not supported"
        assert quiet_start is None, "argument `quiet_start` is not supported"
        assert replace_address is None or replace_node_host_id is None, \
            "replace_address and replace_node_host_id cannot be specified together"
        if self.is_running():
            raise NodeError(f"{self.name} is already running")
        scylla_args = self._process_scylla_args(
            *(jvm_args or []),
            *(["--replace-address", replace_address] if replace_address else []),
            *(["--replace-node-first-boot", replace_node_host_id] if replace_node_host_id else []),
        )
        scylla_env = self._process_scylla_env()
        marks = []
        # NOTE(review): marks are only collected when wait_other_notice is passed
        # truthy explicitly; when it later defaults to True, marks stays empty.
        if wait_other_notice:
            marks = [(node, node.mark_log()) for node in self.cluster.nodelist() if node.is_live()]
        self.mark = self.mark_log()
        self.cluster.manager.server_start(
            server_id=self.server_id,
            # A non-bootstrapping node seeds itself.
            seeds=None if self.bootstrap else [self.address()],
            expected_server_up_state=ServerUpState.PROCESS_STARTED,
            cmdline_options_override=scylla_args,
            append_env_override=scylla_env,
            connect_driver=False,
        )
        # Resolve the waiting defaults only after the process was started.
        if wait_for_binary_proto is None:
            wait_for_binary_proto = self.cluster.force_wait_for_cluster_start and not no_wait
        if wait_other_notice is None:
            wait_other_notice = self.cluster.force_wait_for_cluster_start and not no_wait
        if wait_normal_token_owner is None and wait_other_notice:
            wait_normal_token_owner = True
        if wait_for_binary_proto:
            self.wait_for_binary_interface(from_mark=self.mark)
        if wait_other_notice:
            timeout = self.cluster.default_wait_other_notice_timeout
            for node, mark in marks:
                # Each previously-live node must see this node UP (log and REST),
                # and this node must see them alive as well.
                node.watch_log_for_alive(nodes=self, from_mark=mark, timeout=timeout)
                node.watch_rest_for_alive(nodes=self, timeout=timeout, wait_normal_token_owner=wait_normal_token_owner)
                self.watch_rest_for_alive(nodes=node, timeout=timeout, wait_normal_token_owner=wait_normal_token_owner)
    def stop(self,
             wait: bool = True,
             wait_other_notice: bool = False,
             other_nodes: list[ScyllaNode] | None = None,
             gently: bool = True,
             wait_seconds: int = 127,
             marks: list[tuple[ScyllaNode, int]] | None = None) -> bool:
        """Stop this node's process; returns False if it was not running.

        :param wait: block until the process has exited
        :param wait_other_notice: also wait until other live nodes log this node as dead
        :param other_nodes: nodes to watch instead of the whole cluster
        :param gently: graceful shutdown instead of a hard stop
        :param wait_seconds: how long wait_until_stopped() may take
        :param marks: pre-recorded (node, log-mark) pairs; computed automatically
                      when wait_other_notice is set
        """
        if not self.is_running():
            return False
        if marks is None:
            # Record log positions of the other live nodes so we can verify
            # they notice this node going down.
            marks = [
                (node, node.mark_log())
                for node in other_nodes or self.cluster.nodelist()
                if node.server_id != self.server_id and node.is_live()
            ] if wait_other_notice else []
        if gently:
            self.cluster.manager.server_stop_gracefully(server_id=self.server_id)
        else:
            self.cluster.manager.server_stop(server_id=self.server_id)
        if wait or wait_other_notice:
            self.wait_until_stopped(wait_seconds=wait_seconds, marks=marks, dump_core=gently)
        return True
def nodetool(self,
cmd: str,
capture_output: bool = True,
wait: bool = True,
timeout: int | float | None = None,
verbose: bool = True) -> tuple[str, str]:
if capture_output and not wait:
raise ArgumentError("Cannot set capture_output while wait is False.")
nodetool_cmd = [
self.cluster.manager.server_get_exe(server_id=self.server_id),
"nodetool",
"-h",
str(self.cluster.manager.get_host_ip(server_id=self.server_id)),
*cmd.split(),
]
if verbose:
self.debug(f"nodetool cmd={nodetool_cmd} wait={wait} timeout={timeout}")
if capture_output:
p = subprocess.Popen(nodetool_cmd, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate(timeout=timeout)
else:
p = subprocess.Popen(nodetool_cmd, universal_newlines=True)
stdout, stderr = None, None
if wait and p.wait(timeout=timeout):
raise NodetoolError(" ".join(nodetool_cmd), p.returncode, stdout, stderr)
stderr = "\n".join(
line for line in stderr.splitlines()
if self.debug(f"checking {line}") or not any(p.fullmatch(line) for p in NODETOOL_STDERR_IGNORED_PATTERNS)
)
return stdout, stderr
def get_path(self) -> str:
"""Return the path to this node top level directory (where config/data is stored.)"""
return self.cluster.manager.server_get_workdir(server_id=self.server_id)
    def stress(self, stress_options: list[str], **kwargs):
        """
        Run `cassandra-stress` against this node.

        This method does not do any result parsing.

        :param stress_options: List of options to pass to `cassandra-stress`.
        :param kwargs: Additional arguments to pass to `subprocess.Popen()`.
        :return: Named tuple with `stdout`, `stderr`, and `rc` (return code);
                 None if interrupted by KeyboardInterrupt.
        :raises ToolError: if cassandra-stress exits with a non-zero status.
        """
        cmd_args = ["cassandra-stress"] + stress_options
        # Point the tool at this node unless the caller already chose targets.
        if not any(opt in cmd_args for opt in ("-d", "-node", "-cloudconf")):
            cmd_args.extend(["-node", self.address()])
        p = subprocess.Popen(
            cmd_args,
            stdout=kwargs.pop("stdout", subprocess.PIPE),
            stderr=kwargs.pop("stderr", subprocess.PIPE),
            universal_newlines=True,
            **kwargs,
        )
        try:
            stdout, stderr = p.communicate()
            if rc := p.returncode:
                raise ToolError(cmd_args, rc, stdout, stderr)
            ret = namedtuple("Subprocess_Return", "stdout stderr rc")
            return ret(stdout=stdout, stderr=stderr, rc=rc)
        except KeyboardInterrupt:
            # Best effort: swallow Ctrl-C and return None.
            pass
def _set_stress_val(self, key, val, res):
"""
Normalize a stress result string and populate aggregate/read/write metrics.
Removes comma-thousands separators from numbers, converts to float,
stores the aggregate metric under `key`.
If the value contains a "[READ ..., WRITE ...]" block, also stores the
read and write metrics under `key:read` and `key:write`.
:param key: The metric name
:param val: The metric value string
:param res: The dictionary to populate
"""
def parse_num(s):
return float(s.replace(',', ''))
if "[" in val:
p = STRESS_SUMMARY_PATTERN
m = p.match(val)
if m:
res[key] = parse_num(m.group(1))
p = STRESS_READ_PATTERN
m = p.match(val)
if m:
res[key + ":read"] = parse_num(m.group(1))
p = STRESS_WRITE_PATTERN
m = p.match(val)
if m:
res[key + ":write"] = parse_num(m.group(1))
else:
try:
res[key] = parse_num(val)
except ValueError:
res[key] = val
def stress_object(self, stress_options=None, ignore_errors=None, **kwargs):
"""
Run stress test and return results as a structured metrics dictionary.
Runs `stress()`, finds the `Results:` section in `stdout`, and then
processes each `key : value` line, putting it into a dictionary.
:param stress_options: List of stress options to pass to `stress()`.
:param ignore_errors: Deprecated (no effect).
:param kwargs: Additional arguments to pass to `stress()`.
:return: Dictionary of stress test results.
"""
if ignore_errors:
self.warning("passing `ignore_errors` to stress_object() is deprecated")
ret = self.stress(stress_options, **kwargs)
p = STRESS_KEY_VALUE_PATTERN
res = {}
start = False
for line in (s.strip() for s in ret.stdout.splitlines()):
if start:
m = p.match(line)
if m:
self._set_stress_val(m.group(1).strip().lower(), m.group(2).strip(), res)
else:
if line == 'Results:':
start = True
return res
def flush(self, ks: str | None = None, table: str | None = None, **kwargs) -> None:
cmd = ["flush"]
if ks:
cmd.append(ks)
if table:
cmd.append(table)
self.nodetool(" ".join(cmd), **kwargs)
def compact(self, keyspace: str = "", tables: str | None = ()) -> None:
compact_cmd = ["compact"]
if keyspace:
compact_cmd.append(keyspace)
compact_cmd += tables
self.nodetool(" ".join(compact_cmd))
def drain(self, block_on_log: bool = False) -> None:
mark = self.mark_log()
self.nodetool("drain")
if block_on_log:
self.watch_log_for("DRAINED", from_mark=mark)
def repair(self, options: list[str] | None = None, **kwargs) -> tuple[str, str]:
cmd = ["repair"]
if options:
cmd.extend(options)
return self.nodetool(" ".join(cmd), **kwargs)
def decommission(self) -> None:
self.cluster.manager.decommission_node(server_id=self.server_id)
def get_sstables(self, keyspace, column_family, ignore_unsealed=True, cleanup_unsealed=False):
keyspace_dir = os.path.join(self.get_path(), 'data', keyspace)
cf_glob = '*'
if column_family:
cf_glob = column_family + '-*'
if not os.path.exists(keyspace_dir):
raise ArgumentError(f"Unknown keyspace {keyspace}")
files = glob.glob(os.path.join(keyspace_dir, cf_glob, "*big-Data.db"))
for f in files:
if os.path.exists(f.replace('Data.db', 'Compacted')):
files.remove(f)
if ignore_unsealed or cleanup_unsealed:
# sstablelevelreset tries to open all sstables under the configured
# `data_file_directories` in cassandra.yaml. if any of them does not
# have missing component, it complains in its stderr. and some tests
# using this tool checks the stderr for unexpected error message, so
# we need to remove all unsealed sstable files in this case.
for toc_tmp in glob.glob(os.path.join(keyspace_dir, cf_glob, '*TOC.txt.tmp')):
if cleanup_unsealed:
self.info(f"get_sstables: Cleaning up unsealed SSTable: {toc_tmp}")
for unsealed in glob.glob(toc_tmp.replace('TOC.txt.tmp', '*')):
os.remove(unsealed)
else:
self.info(f"get_sstables: Ignoring unsealed SSTable: {toc_tmp}")
data_sst = toc_tmp.replace('TOC.txt.tmp', 'Data.db')
try:
files.remove(data_sst)
except ValueError:
pass
return files
    def __gather_sstables(self, datafiles=None, keyspace=None, columnfamilies=None):
        """Collect sstable Data files for the given keyspace/column families.

        With explicit `datafiles`, validates that each file belongs to the one
        specified column family and is a well-formed sstable filename; temporary
        sstables and duplicate identifiers are skipped.
        """
        files = []
        if keyspace is None:
            # No keyspace given: gather from every keyspace.
            # NOTE(review): list_keyspaces() is not defined in this file's
            # visible scope -- presumably provided elsewhere; confirm.
            for k in self.list_keyspaces():
                files = files + self.get_sstables(k, "")
        elif datafiles is None:
            if columnfamilies is None:
                files = files + self.get_sstables(keyspace, "")
            else:
                for cf in columnfamilies:
                    files = files + self.get_sstables(keyspace, cf)
        else:
            if not columnfamilies or len(columnfamilies) > 1:
                raise ArgumentError("Exactly one column family must be specified with datafiles")
            cf_dir = os.path.join(os.path.realpath(self.get_path()), 'data', keyspace, columnfamilies[0])
            sstables = set()
            for datafile in datafiles:
                if not os.path.isabs(datafile):
                    datafile = os.path.join(os.getcwd(), datafile)
                if not datafile.startswith(cf_dir + '-') and not datafile.startswith(cf_dir + os.sep):
                    raise NodeError("File doesn't appear to belong to the specified keyspace and column family: " + datafile)
                # NOTE(review): _sstable_regexp is not defined in this file's
                # visible scope -- presumably a module-level pattern; confirm.
                sstable = _sstable_regexp.match(os.path.basename(datafile))
                if not sstable:
                    raise NodeError("File doesn't seem to be a valid sstable filename: " + datafile)
                sstable = sstable.groupdict()
                # De-duplicate by sstable identifier; skip temporary sstables.
                if not sstable['tmp'] and sstable['identifier'] not in sstables:
                    if not os.path.exists(datafile):
                        raise IOError("File doesn't exist: " + datafile)
                    sstables.add(sstable['identifier'])
                    files.append(datafile)
        return files
    def run_scylla_sstable(self, command, additional_args=None, keyspace=None, datafiles=None, column_families=None, batch=False, text=True, env=None):
        """Invoke scylla-sstable, with the specified command (operation) and additional_args.

        For more information about scylla-sstable, see https://docs.scylladb.com/stable/operating-scylla/admin-tools/scylla-sstable.html.

        Params:
        * command - The scylla-sstable command (operation) to run.
        * additional_args - Additional command-line arguments to pass to scylla-sstable, this should be a list of strings.
        * keyspace - Restrict the operation to sstables of this keyspace.
        * datafiles - Restrict the operation to the specified sstables (Data components).
        * column_families - Restrict the operation to sstables of these column_families. Must contain exactly one column family when datafiles is used.
        * batch - If True, all sstables will be passed in a single batch. If False, sstables will be passed one at a time.
            Batch-mode can be only used if column_families contains a single item.
        * text - If True, output of the command is treated as text, if not as bytes

        If datafiles is provided, the caller is responsible for making sure these
        files are not removed by ScyllaDB while the tool is running.

        If datafiles = None, this function will create a snapshot and dump
        sstables from the snapshot to ensure that the sstables are not removed
        while the tools is running.
        The snapshot is removed after the dump completed, but it is left there in
        case of error, for post-mortem analysis.

        Returns: map: {sstable: (stdout, stderr)} of all invokations. When batch == True, a single entry will be present, with empty key.
        Raises: subprocess.CalledProcessError if scylla-sstable returns a non-zero exit code.
        """
        if additional_args is None:
            additional_args = []
        scylla_path = self.cluster.manager.server_get_exe(server_id=self.server_id)
        ret = {}
        if datafiles is None and keyspace is not None and self.is_running():
            # Snapshot the tables so their sstables cannot be removed while the
            # tool runs; the dump is then taken from the snapshot directory.
            tag = "sstable-dump-{}".format(uuid.uuid1())
            kts = ",".join(f"{keyspace}.{column_family}" for column_family in column_families)
            self.debug(f"run_scylla_sstable(): creating snapshot with tag {tag} to be used for sstable dumping")
            self.nodetool(f"snapshot -t {tag} {kts}")
            sstables = []
            for column_family in column_families:
                sstables.extend(glob.glob(os.path.join(self.get_path(), 'data', keyspace, f"{column_family}-*/snapshots/{tag}/*-Data.db")))
        else:
            sstables = self.__gather_sstables(datafiles, keyspace, column_families)
            tag = None
        self.debug(f"run_scylla_sstable(): preparing to dump sstables {sstables}")

        def do_invoke(sstables):
            # there are chances that the table is not replicated on this node,
            # in that case, scylla tool will fail to dump the sstables and error
            # out. let's just return an empty list for the partitions, so that
            # dump_sstables() can still parse it in the same way.
            if not sstables:
                empty_dump = {'sstables': {'anonymous': []}}
                stdout, stderr = json.dumps(empty_dump), ''
                if text:
                    return stdout, stderr
                else:
                    return stdout.encode('utf-8'), stderr.encode('utf-8')
            common_args = [scylla_path, "sstable", command] + additional_args
            res = subprocess.run(common_args + sstables, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=text, check=False, env=env)
            if res.returncode:
                raise ToolError(command=' '.join(common_args + sstables), exit_status=res.returncode, stdout=res.stdout, stderr=res.stderr)
            return (res.stdout, res.stderr)

        if batch:
            if column_families is None or len(column_families) > 1:
                raise NodeError("run_scylla_sstable(): batch mode can only be used in conjunction with a single column_family")
            ret[""] = do_invoke(sstables)
        else:
            for sst in sstables:
                ret[sst] = do_invoke([sst])
        # Deliberately not putting this in a `finally` block, if the command
        # above failed, leave the snapshot with the sstables around for
        # post-mortem analysis.
        if tag is not None:
            self.nodetool(f"clearsnapshot -t {tag} {keyspace}")
        return ret
def hostid(self, timeout: float | None = None, force_refresh: bool | None = None) -> str | None:
assert timeout is None, "argument `timeout` is not supported" # not used in scylla-dtest
assert force_refresh is None, "argument `force_refresh` is not supported" # not used in scylla-dtest
try:
return self.cluster.manager.get_host_id(server_id=self.server_id)
except Exception as exc:
self.error(f"Failed to get hostid: {exc}")
def rmtree(self, path: str | Path) -> None:
"""Delete a directory content without removing the directory.
Copied this code from Python's documentation for Path.walk() method.
"""
for root, dirs, files in Path(path).walk(top_down=False):
for name in files:
(root / name).unlink()
for name in dirs:
(root / name).rmdir()
def _log_message(self, message: str) -> str:
return f"{self.name}: {message}"
    def debug(self, message: str) -> None:
        """Forward *message*, prefixed with the node name, to the cluster's debug log."""
        self.cluster.debug(self._log_message(message))

    def info(self, message: str) -> None:
        """Forward *message*, prefixed with the node name, to the cluster's info log."""
        self.cluster.info(self._log_message(message))

    def warning(self, message: str) -> None:
        """Forward *message*, prefixed with the node name, to the cluster's warning log."""
        self.cluster.warning(self._log_message(message))

    def error(self, message: str) -> None:
        """Forward *message*, prefixed with the node name, to the cluster's error log."""
        self.cluster.error(self._log_message(message))
def __repr__(self) -> str:
return f"<ScyllaNode name={self.server_id} dc={self.data_center} rack={self.rack}>"
def _parse_scylla_args(args: list[str]) -> dict[str, list[str]]:
    """Parse a flat argv-like list into {option: [value, ...]}.

    Handles both the "--foo=bar" and the "--foo bar baz" forms; a repeated
    option accumulates one joined-value string per occurrence.  Options
    starting with "--scylla-manager" are consumed but dropped.
    """
    parsed_args = {}
    args = iter(args)
    arg = next(args, None)
    while arg is not None:
        assert arg.startswith("-")
        key, *value = arg.split(sep="=", maxsplit=1)
        if value:  # handle argument in `--foo=bar` form
            arg = next(args, None)
        else:
            for arg in args:  # handle argument in `--foo bar` form
                if arg.startswith("-"):
                    # `arg` is the next option; current option's values complete.
                    break
                value.append(arg)
            else:  # no more arguments
                arg = None
        if key.startswith("--scylla-manager"):  # skip Scylla Manager arguments
            continue
        parsed_args.setdefault(key, []).append(" ".join(value))
    return parsed_args
def _parse_size(s: str) -> int:
try:
factor = 1024 ** abs("k KMGT".index(s[-1]) - 1)
except ValueError:
return int(s)
return int(s[:-1]) * factor