Files
scylladb/test/cluster/dtest/ccmlib/scylla_node.py
Botond Dénes 357f91de52 Revert "Merge 'db/config: enable ms sstable format by default' from Michał Chojnowski"
This reverts commit b0643f8959, reversing
changes made to e8b0f8faa9.

The change forgot to update
sstables_manager::get_highest_supported_format(), which results in
/system/highest_supported_sstable_version still returning me, confusing
and breaking tests.

Fixes: scylladb/scylla-dtest#6435

Closes scylladb/scylladb#27379
2025-12-02 14:38:56 +02:00

656 lines
25 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from __future__ import annotations
import os
import re
import time
import locale
import subprocess
from collections import namedtuple
from itertools import chain
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any
from test.cluster.dtest.ccmlib.common import ArgumentError, wait_for
from test.pylib.internal_types import ServerUpState
from test.pylib.manager_client import NoSuchProcess
if TYPE_CHECKING:
from test.pylib.internal_types import ServerInfo
from test.pylib.log_browsing import ScyllaLogFile
from test.cluster.dtest.ccmlib.scylla_cluster import ScyllaCluster
# stderr lines emitted by `nodetool` that match any of these patterns are
# noise (debug-mode / sanitizer banners) and are filtered out of the stderr
# value returned by ScyllaNode.nodetool().
NODETOOL_STDERR_IGNORED_PATTERNS = (
    re.compile(r"WARNING: debug mode. Not for benchmarking or production"),
    re.compile(
        r"==[0-9]+==WARNING: ASan doesn't fully support makecontext/swapcontext"
        r" functions and may produce false positives in some cases!"
    ),
)
# Cassandra-style `-Dcassandra.*` options (passed by dtests as jvm_args) that
# are translated to the equivalent ScyllaDB command-line flag.
CASSANDRA_OPTIONS_MAPPING = {
    "-Dcassandra.replace_address_first_boot": "--replace-address-first-boot",
}
# Defaults used when neither SCYLLA_EXT_OPTS nor test arguments override them.
DEFAULT_SMP = 2
DEFAULT_MEMORY_PER_CPU = 512 * 1024 * 1024  # bytes
DEFAULT_SCYLLA_LOG_LEVEL = "info"
# Translation from ccm/Cassandra log-level names to ScyllaDB logger levels.
# "OFF" has no ScyllaDB equivalent and is mapped to "info".
KNOWN_LOG_LEVELS = {
    "TRACE": "trace",
    "DEBUG": "debug",
    "INFO": "info",
    "WARN": "warn",
    "ERROR": "error",
    "OFF": "info",
}
class NodeError(Exception):
    """Raised when a node fails to reach the expected running/stopped state.

    The optional ``process`` attribute carries a process identifier when one
    is known; it defaults to ``None``.
    """

    def __init__(self, msg: str, process: int | None = None):
        self.process = process
        super().__init__(msg)
class ToolError(Exception):
    """Raised when an external tool (nodetool, cassandra-stress, ...) exits non-zero.

    Stores the command, its exit status, and any captured stdout/stderr, and
    builds a human-readable message from them.
    """

    def __init__(self, command: str | list[str], exit_status: int, stdout: Any = None, stderr: Any = None):
        self.command = command
        self.exit_status = exit_status
        self.stdout = stdout
        self.stderr = stderr
        message = [f"Subprocess {command} exited with non-zero status; exit status: {exit_status}"]
        # Only mention streams that actually carried data.
        if stdout:
            message.append(f"stdout: {self._decode(stdout)}")
        if stderr:
            message.append(f"stderr: {self._decode(stderr)}")
        # Idiom fix: use super() instead of calling Exception.__init__ directly.
        super().__init__("; \n".join(message))

    @staticmethod
    def _decode(value: str | bytes) -> str:
        """Return *value* as ``str``, decoding bytes with the locale's preferred encoding."""
        if isinstance(value, bytes):
            return value.decode(locale.getpreferredencoding(do_setlocale=False))
        return value
NodetoolError = ToolError
class ScyllaNode:
    """A single ScyllaDB node driven through the test framework's manager client.

    Provides a ccmlib-``Node``-like interface for scylla-dtest while delegating
    process control, log access and REST API calls to ``self.cluster.manager``.
    """

    def __init__(self, cluster: ScyllaCluster, server: ServerInfo, name: str):
        self.cluster = cluster
        self.server_id = server.server_id
        self.name = name
        self.pid = None  # kept for ccmlib API compatibility; not maintained here
        self.all_pids = []  # kept for ccmlib API compatibility; not maintained here
        # (address, port) per interface kind; both use the server's RPC address.
        self.network_interfaces = {
            "storage": (str(server.rpc_address), 7000),
            "binary": (str(server.rpc_address), 9042),
        }
        self.data_center = server.datacenter
        self.rack = server.rack
        self._smp_set_during_test = None  # explicit set_smp() override; wins over --smp args
        self._smp = None  # SMP value parsed from SCYLLA_EXT_OPTS / start() arguments
        self._memory = None  # total memory in bytes
        self.__global_log_level = "info"
        self.__classes_log_level = {}
        self.bootstrap = True  # when False, the node is started as its own seed

    def set_configuration_options(self,
                                  values: dict | None = None,
                                  batch_commitlog: bool | None = None) -> None:
        """Set DB node configuration options.

        Example:
            node.set_configuration_options(values={
                'hinted_handoff_enabled' : True,
                'concurrent_writes' : 64,
            })

        The batch_commitlog option gives an easier way to switch to batch
        commitlog (since it requires setting 2 options and unsetting one).
        """
        self.cluster.set_configuration_options(values=values, batch_commitlog=batch_commitlog, nodes=self)

    def set_log_level(self, new_level: str, class_name: str | None = None) -> ScyllaNode:
        """Set the default (or a per-logger-class) log level; returns self for chaining.

        Raises:
            ArgumentError: if *new_level* is not one of KNOWN_LOG_LEVELS.
        """
        if new_log_level := KNOWN_LOG_LEVELS.get(new_level):
            if class_name is None:
                self.__global_log_level = new_log_level
            else:
                self.__classes_log_level[class_name] = new_log_level
            return self
        raise ArgumentError(f"Unknown log level {new_level} (use one of {' '.join(KNOWN_LOG_LEVELS)})")

    def scylla_mode(self) -> str:
        """Return the cluster's build mode (e.g. "debug")."""
        return self.cluster.scylla_mode

    def set_smp(self, smp: int) -> None:
        """Override the SMP count for subsequent starts during this test."""
        self._smp_set_during_test = smp

    def smp(self) -> int:
        """Effective SMP count: test override, then parsed args, then DEFAULT_SMP."""
        return self._smp_set_during_test or self._smp or DEFAULT_SMP

    def memory(self) -> int:
        """Effective total memory in bytes (defaults to DEFAULT_MEMORY_PER_CPU per CPU)."""
        return self._memory or self.smp() * DEFAULT_MEMORY_PER_CPU

    def _adjust_smp_and_memory(self, smp: int | None = None, memory: int | None = None) -> None:
        """Record SMP/memory overrides, keeping memory proportional to the CPU count."""
        if memory:
            # Normalize: whole bytes-per-CPU for the (possibly new) CPU count.
            self._memory = memory // (smp or self.smp()) * self.smp()
        if smp:
            # Preserve the current bytes-per-CPU ratio when the CPU count changes.
            memory_per_cpu = self.memory() // self.smp()
            self._smp = smp
            self._memory = memory_per_cpu * self.smp()

    def set_mem_mb_per_cpu(self, mem: int) -> None:  # not used in scylla-dtest
        raise NotImplementedError("setting memory per CPU during a test is not supported")

    def address(self) -> str:
        """Return the IP use by this node for internal communication."""
        return self.network_interfaces["storage"][0]

    def is_running(self) -> bool:
        """True while the server process exists and has not exited."""
        try:
            return self.cluster.manager.server_get_returncode(server_id=self.server_id) is None
        except NoSuchProcess:
            return False

    is_live = is_running  # ccmlib compatibility alias

    @cached_property
    def scylla_log_file(self) -> ScyllaLogFile:
        # Opened lazily and cached: the log file only exists once the server ran.
        return self.cluster.manager.server_open_log(server_id=self.server_id)

    def grep_log(self,
                 expr: str,
                 filter_expr: str | None = None,
                 filename: str | None = None,  # not used in scylla-dtest
                 from_mark: int | None = None) -> list[tuple[str, re.Match[str]]]:
        """Grep ScyllaDB's log for *expr*, optionally excluding *filter_expr* matches."""
        assert filename is None, "only ScyllaDB's log is supported"
        return self.scylla_log_file.grep(expr=expr, filter_expr=filter_expr, from_mark=from_mark)

    def grep_log_for_errors(self,
                            filename: str | None = None,  # not used in scylla-dtest
                            distinct_errors: bool = False,
                            search_str: str | None = None,  # not used in scylla-dtest
                            case_sensitive: bool = True,  # not used in scylla-dtest
                            from_mark: int | None = None) -> list[str] | list[list[str]]:
        """Return error entries from the log, starting at *from_mark* or the last error mark."""
        assert filename is None, "only ScyllaDB's log is supported"
        assert search_str is None, "argument `search_str` is not supported"
        assert case_sensitive, "only case sensitive search is supported"
        # Fall back to the position recorded by mark_log_for_errors(), if any.
        from_mark = getattr(self, "error_mark", None) if from_mark is None else from_mark
        return self.scylla_log_file.grep_for_errors(distinct_errors=distinct_errors, from_mark=from_mark)

    def mark_log_for_errors(self, filename: str | None = None) -> None:
        """Remember the current log position as the start point for grep_log_for_errors()."""
        assert filename is None, "only ScyllaDB's log is supported"
        self.error_mark = self.mark_log()

    def mark_log(self, filename: str | None = None) -> int:
        """Return the current position (mark) in ScyllaDB's log."""
        assert filename is None, "only ScyllaDB's log is supported"
        return self.scylla_log_file.mark()

    def watch_log_for(self,
                      exprs: str | list[str],
                      from_mark: int | None = None,
                      timeout: float = 600,
                      process: subprocess.Popen | None = None,  # don't use it here
                      verbose: bool | None = None,  # not used in scylla-dtest
                      filename: str | None = None,  # not used in scylla-dtest
                      polling_interval: float | None = None) -> tuple[str, re.Match[str]] | list[tuple[str, re.Match[str]]]:  # not used in scylla-dtest
        """Wait until all *exprs* appear in the log (from *from_mark*), or raise on timeout.

        Returns a single (line, match) tuple when exactly one match is produced,
        otherwise the list of matches.
        """
        assert process is None, "argument `process` is not supported"
        assert verbose is None, "argument `verbose` is not supported"
        assert filename is None, "only ScyllaDB's log is supported"
        assert polling_interval is None, "argument `polling_interval` is not supported"
        if isinstance(exprs, str):
            exprs = [exprs]
        _, matches = self.scylla_log_file.wait_for(*exprs, from_mark=from_mark, timeout=timeout)
        return matches[0] if len(matches) == 1 else matches

    def watch_log_for_death(self,
                            nodes: ScyllaNode | list[ScyllaNode],
                            from_mark: int | None = None,
                            timeout: float = 600,
                            filename: str | None = None) -> None:
        """Watch the log of this node until it detects that the provided other nodes are marked dead.

        This method returns nothing, but raises a TimeoutError if not all of the requested
        nodes have been seen marked dead within *timeout* seconds.

        A mark as returned by mark_log() can be used as the `from_mark` parameter to start
        watching the log from a given position. Otherwise, the log is watched from the beginning.
        """
        assert filename is None, "only ScyllaDB's log is supported"
        if not isinstance(nodes, list):
            nodes = [nodes]
        self.watch_log_for(
            [f"({node.address()}|{node.hostid()}).* now (dead|DOWN)" for node in nodes],
            from_mark=from_mark,
            timeout=timeout,
        )

    def watch_log_for_alive(self,
                            nodes: ScyllaNode | list[ScyllaNode],
                            from_mark: int | None = None,
                            timeout: float = 120,
                            filename: str | None = None) -> None:
        """Watch the log of this node until it detects that the provided other nodes are marked UP.

        This method works similarly to watch_log_for_death().
        """
        assert filename is None, "only ScyllaDB's log is supported"
        if not isinstance(nodes, list):
            nodes = [nodes]
        self.watch_log_for(
            [f"({node.address()}|{node.hostid()}).* now UP" for node in nodes],
            from_mark=from_mark,
            timeout=timeout,
        )

    def watch_rest_for_alive(self,
                             nodes: ScyllaNode | list[ScyllaNode],
                             timeout: float = 120,
                             wait_normal_token_owner: bool = True) -> None:
        """Poll this node's REST API until *nodes* are alive (and, optionally, normal token owners).

        Raises:
            TimeoutError: if the condition is not reached within *timeout* seconds.
        """
        nodes = nodes if isinstance(nodes, list) else [nodes]
        tofind_host_id_map = {node.address(): node.hostid() for node in nodes}
        tofind = {node.address() for node in nodes}
        node_ip = self.address()
        found = set()
        found_host_id_map = {}
        api = self.cluster.manager.api
        deadline = time.perf_counter() + timeout
        while time.perf_counter() < deadline:
            # All targets must be alive and past the "joining" state...
            if tofind <= set(api.get_alive_endpoints(node_ip=node_ip)) - set(api.get_joining_nodes(node_ip=node_ip)):
                # ...and each must already own tokens.
                if not any(node for node in tofind if not api.get_tokens(node_ip=node_ip, endpoint=node)):
                    if not wait_normal_token_owner:
                        return
                    # Verify other nodes are considered normal token owners on this node and their host_ids
                    # match the host_ids the client knows about.
                    host_id_map = {x["key"]: x["value"] for x in api.get_host_id_map(dst_server_ip=node_ip)}
                    found_host_id_map.update(host_id_map)
                    for addr, host_id in host_id_map.items():
                        if addr not in tofind_host_id_map:
                            continue
                        # An empty expected host_id means "accept any".
                        if host_id == tofind_host_id_map[addr] or not tofind_host_id_map[addr]:
                            tofind.discard(addr)
                            found.add(addr)
                    if not tofind:
                        return
            time.sleep(0.1)
        self.debug(f"watch_rest_for_alive: {tofind=} {found=}: {tofind_host_id_map=} {found_host_id_map=}")
        raise TimeoutError(f"watch_rest_for_alive() timeout after {timeout} seconds")

    def wait_for_binary_interface(self, from_mark: int | None = None, timeout: float | None = None) -> None:
        """Waits for the binary CQL interface to be listening."""
        if timeout is None:
            timeout = self.cluster.default_wait_for_binary_proto
        self.watch_log_for(exprs="Starting listening for CQL clients", from_mark=from_mark, timeout=timeout)

    def wait_until_stopped(self,
                           wait_seconds: int | None = None,
                           marks: list[tuple[ScyllaNode, int]] | None = None,
                           dump_core: bool = True) -> None:  # not implemented
        """Block until this node's process exits, then wait for *marks* nodes to see it dead.

        *wait_seconds* defaults to 127 (600 for debug builds, which stop slowly).
        """
        if wait_seconds is None:
            wait_seconds = 127 if self.scylla_mode() != "debug" else 600
        if not wait_for(func=lambda: not self.is_running(), timeout=wait_seconds):
            raise NodeError(f"Problem stopping node {self.name}")
        for node, mark in marks or []:
            if node.server_id != self.server_id:
                node.watch_log_for_death(nodes=self, from_mark=mark)

    def _process_scylla_args(self, *args: str) -> list[str]:
        """Merge SCYLLA_EXT_OPTS, explicit *args* and built-in defaults into a flat arg list.

        Precedence (lowest to highest): built-in defaults, SCYLLA_EXT_OPTS,
        explicit *args*.  ``--smp``/``--memory`` are intercepted to keep this
        node's bookkeeping consistent, and Cassandra-style ``-D`` options are
        translated via CASSANDRA_OPTIONS_MAPPING.
        """
        # Parse default overrides in SCYLLA_EXT_OPTS
        scylla_args = _parse_scylla_args(os.environ.get("SCYLLA_EXT_OPTS", "").split())
        if smp := scylla_args.pop("--smp", None):
            smp = int(smp[0])
        if memory := scylla_args.pop("--memory", None):
            memory = _parse_size(memory[0])
        self._adjust_smp_and_memory(smp=smp, memory=memory)
        if args:
            parsed_args = []
            for arg in args:
                option, *value = arg.split("=")
                if not value or option not in CASSANDRA_OPTIONS_MAPPING:
                    parsed_args.append(arg)
                elif len(value) == 1:
                    scylla_args[CASSANDRA_OPTIONS_MAPPING[option]] = value
                else:
                    raise RuntimeError(f"Option {arg} not in form '-Dcassandra.foo=bar'. Please check your test")
            scylla_args.update(_parse_scylla_args(parsed_args))
            if smp := scylla_args.pop("--smp", None):
                self._adjust_smp_and_memory(smp=int(smp[0]))
            if memory := scylla_args.pop("--memory", None):
                self._memory = _parse_size(memory[0])
        default_scylla_args = {
            "--smp": [str(self.smp())],
            "--memory": [f"{self.memory() // 1024 ** 2}M"],
            "--developer-mode": ["true"],
            "--default-log-level": [self.__global_log_level],
            "--kernel-page-cache": ["1"],
            "--commitlog-use-o-dsync": ["0"],
            "--max-networking-io-control-blocks": ["1000"],
            "--unsafe-bypass-fsync": ["1"],
        }
        if self.scylla_mode() == "debug":
            default_scylla_args["--blocked-reactor-notify-ms"] = ["5000"]
        scylla_args = default_scylla_args | scylla_args
        if "--cpuset" not in scylla_args:
            scylla_args["--overprovisioned"] = [""]
        # Flatten to ["--opt", "value", ...]; value-less flags contribute just "--opt".
        return list(chain.from_iterable(
            (arg, value) if values and all(values) else (arg, )
            for arg, values in scylla_args.items()
            for value in values
        ))

    @staticmethod
    def _process_scylla_env() -> dict[str, str]:
        """Parse SCYLLA_EXT_ENV ("K1=V1 K2=V2" or ';'-separated) into a dict."""
        scylla_env = {}
        for var in os.environ.get("SCYLLA_EXT_ENV", "").replace(";" , " ").split():
            k, v = var.split(sep="=", maxsplit=1)
            if not v:
                raise RuntimeError(f"SCYLLA_EXT_ENV: unable to parse {var!r} as an env variable")
            scylla_env[k] = v
        return scylla_env

    def start(self,
              join_ring: bool | None = None,  # not used in scylla-dtest
              no_wait: bool = False,
              verbose: bool | None = None,  # not used in scylla-dtest
              update_pid: bool = True,  # not used here
              wait_other_notice: bool | None = None,
              wait_normal_token_owner: bool | None = None,
              replace_token: str | None = None,  # not used in scylla-dtest
              replace_address: str | None = None,
              replace_node_host_id: str | None = None,
              jvm_args: list[str] | None = None,
              wait_for_binary_proto: bool | None = None,
              profile_options: dict[str, str] | None = None,  # not used in scylla-dtest
              use_jna: bool | None = None,  # not used in scylla-dtest
              quiet_start: bool | None = None) -> None:  # not used in scylla-dtest
        """Start the node via the manager, optionally waiting for CQL and for other nodes to notice.

        ``wait_for_binary_proto`` and ``wait_other_notice`` default to the cluster's
        ``force_wait_for_cluster_start`` setting (unless ``no_wait`` is given).

        Raises:
            NodeError: if the node is already running.
        """
        assert join_ring is None, "argument `join_ring` is not supported"
        assert verbose is None, "argument `verbose` is not supported"
        assert replace_token is None, "argument `replace_token` is not supported"
        assert profile_options is None, "argument `profile_options` is not supported"
        assert use_jna is None, "argument `use_jna` is not supported"
        assert quiet_start is None, "argument `quiet_start` is not supported"
        assert replace_address is None or replace_node_host_id is None, \
            "replace_address and replace_node_host_id cannot be specified together"
        if self.is_running():
            raise NodeError(f"{self.name} is already running")
        # Resolve the wait-related defaults up front: `marks` below must be taken
        # while the other nodes' logs are still at their pre-start positions, so
        # `wait_other_notice` has to be known *before* the server is started.
        # (Previously this was resolved after server_start(), leaving `marks`
        # empty and silently skipping the other-notice wait for default calls.)
        if wait_for_binary_proto is None:
            wait_for_binary_proto = self.cluster.force_wait_for_cluster_start and not no_wait
        if wait_other_notice is None:
            wait_other_notice = self.cluster.force_wait_for_cluster_start and not no_wait
        if wait_normal_token_owner is None and wait_other_notice:
            wait_normal_token_owner = True
        scylla_args = self._process_scylla_args(
            *(jvm_args or []),
            *(["--replace-address", replace_address] if replace_address else []),
            *(["--replace-node-first-boot", replace_node_host_id] if replace_node_host_id else []),
        )
        scylla_env = self._process_scylla_env()
        marks = []
        if wait_other_notice:
            marks = [(node, node.mark_log()) for node in self.cluster.nodelist() if node.is_live()]
        self.mark = self.mark_log()
        self.cluster.manager.server_start(
            server_id=self.server_id,
            seeds=None if self.bootstrap else [self.address()],
            expected_server_up_state=ServerUpState.PROCESS_STARTED,
            cmdline_options_override=scylla_args,
            append_env_override=scylla_env,
            connect_driver=False,
        )
        if wait_for_binary_proto:
            self.wait_for_binary_interface(from_mark=self.mark)
        if wait_other_notice:
            timeout = self.cluster.default_wait_other_notice_timeout
            for node, mark in marks:
                node.watch_log_for_alive(nodes=self, from_mark=mark, timeout=timeout)
                node.watch_rest_for_alive(nodes=self, timeout=timeout, wait_normal_token_owner=wait_normal_token_owner)
                self.watch_rest_for_alive(nodes=node, timeout=timeout, wait_normal_token_owner=wait_normal_token_owner)

    def stop(self,
             wait: bool = True,
             wait_other_notice: bool = False,
             other_nodes: list[ScyllaNode] | None = None,
             gently: bool = True,
             wait_seconds: int | None = None,
             marks: list[tuple[ScyllaNode, int]] | None = None) -> bool:
        """Stop the node; returns False if it was not running, True otherwise.

        ``wait_seconds`` defaults to None so that wait_until_stopped() can pick
        its mode-dependent default (127 s, or 600 s for debug builds).
        """
        if not self.is_running():
            return False
        if marks is None:
            # Record other nodes' log positions before stopping, so we can later
            # watch from those positions for them to declare this node dead.
            marks = [
                (node, node.mark_log())
                for node in other_nodes or self.cluster.nodelist()
                if node.server_id != self.server_id and node.is_live()
            ] if wait_other_notice else []
        if gently:
            self.cluster.manager.server_stop_gracefully(server_id=self.server_id)
        else:
            self.cluster.manager.server_stop(server_id=self.server_id)
        if wait or wait_other_notice:
            self.wait_until_stopped(wait_seconds=wait_seconds, marks=marks, dump_core=gently)
        return True

    def nodetool(self,
                 cmd: str,
                 capture_output: bool = True,
                 wait: bool = True,
                 timeout: int | float | None = None,
                 verbose: bool = True) -> tuple[str | None, str | None]:
        """Run `nodetool <cmd>` against this node.

        Returns (stdout, stderr) when *capture_output* is True, (None, None)
        otherwise.  Known-noise stderr lines (NODETOOL_STDERR_IGNORED_PATTERNS)
        are filtered out.

        Raises:
            ArgumentError: if capture_output is requested without waiting.
            NodetoolError: if the command exits with a non-zero status.
        """
        if capture_output and not wait:
            raise ArgumentError("Cannot set capture_output while wait is False.")
        nodetool_cmd = [
            self.cluster.manager.server_get_exe(server_id=self.server_id),
            "nodetool",
            "-h",
            str(self.cluster.manager.get_host_ip(server_id=self.server_id)),
            *cmd.split(),
        ]
        if verbose:
            self.debug(f"nodetool cmd={nodetool_cmd} wait={wait} timeout={timeout}")
        if capture_output:
            p = subprocess.Popen(nodetool_cmd, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout, stderr = p.communicate(timeout=timeout)
        else:
            p = subprocess.Popen(nodetool_cmd, universal_newlines=True)
            stdout, stderr = None, None
        if wait and p.wait(timeout=timeout):
            raise NodetoolError(" ".join(nodetool_cmd), p.returncode, stdout, stderr)
        # Bug fix: when capture_output is False, stderr is None and must not be
        # filtered (the old code unconditionally called stderr.splitlines()).
        if stderr is not None:
            kept_lines = []
            for line in stderr.splitlines():
                self.debug(f"checking {line}")
                if not any(pattern.fullmatch(line) for pattern in NODETOOL_STDERR_IGNORED_PATTERNS):
                    kept_lines.append(line)
            stderr = "\n".join(kept_lines)
        return stdout, stderr

    def get_path(self) -> str:
        """Return the path to this node top level directory (where config/data is stored.)"""
        return self.cluster.manager.server_get_workdir(server_id=self.server_id)

    def stress(self, stress_options: list[str], **kwargs):
        """Run cassandra-stress against this node (added as `-node` unless a target is given).

        Returns a (stdout, stderr, rc) namedtuple, or None if interrupted.

        Raises:
            ToolError: if cassandra-stress exits with a non-zero status.
        """
        cmd_args = ["cassandra-stress"] + stress_options
        if not any(opt in cmd_args for opt in ("-d", "-node", "-cloudconf")):
            cmd_args.extend(["-node", self.address()])
        p = subprocess.Popen(
            cmd_args,
            stdout=kwargs.pop("stdout", subprocess.PIPE),
            stderr=kwargs.pop("stderr", subprocess.PIPE),
            universal_newlines=True,
            **kwargs,
        )
        try:
            stdout, stderr = p.communicate()
            if rc := p.returncode:
                raise ToolError(cmd_args, rc, stdout, stderr)
            ret = namedtuple("Subprocess_Return", "stdout stderr rc")
            return ret(stdout=stdout, stderr=stderr, rc=rc)
        except KeyboardInterrupt:
            # Deliberate best-effort: a Ctrl-C during stress is not an error.
            pass

    def flush(self, ks: str | None = None, table: str | None = None, **kwargs) -> None:
        """Run `nodetool flush`, optionally scoped to a keyspace and table."""
        cmd = ["flush"]
        if ks:
            cmd.append(ks)
        if table:
            cmd.append(table)
        self.nodetool(" ".join(cmd), **kwargs)

    def compact(self, keyspace: str = "", tables: tuple[str, ...] | list[str] = ()) -> None:
        """Run `nodetool compact`, optionally scoped to a keyspace and tables."""
        compact_cmd = ["compact"]
        if keyspace:
            compact_cmd.append(keyspace)
        compact_cmd += tables
        self.nodetool(" ".join(compact_cmd))

    def drain(self, block_on_log: bool = False) -> None:
        """Run `nodetool drain`, optionally waiting for DRAINED to appear in the log."""
        mark = self.mark_log()
        self.nodetool("drain")
        if block_on_log:
            self.watch_log_for("DRAINED", from_mark=mark)

    def repair(self, options: list[str] | None = None, **kwargs) -> tuple[str, str]:
        """Run `nodetool repair` with optional extra options; returns (stdout, stderr)."""
        cmd = ["repair"]
        if options:
            cmd.extend(options)
        return self.nodetool(" ".join(cmd), **kwargs)

    def decommission(self) -> None:
        """Decommission this node via the manager."""
        self.cluster.manager.decommission_node(server_id=self.server_id)

    def hostid(self, timeout: float | None = None, force_refresh: bool | None = None) -> str | None:
        """Return this node's host id, or None if it could not be fetched (error is logged)."""
        assert timeout is None, "argument `timeout` is not supported"  # not used in scylla-dtest
        assert force_refresh is None, "argument `force_refresh` is not supported"  # not used in scylla-dtest
        try:
            return self.cluster.manager.get_host_id(server_id=self.server_id)
        except Exception as exc:
            # Best-effort: callers treat a missing host id as "unknown" (None).
            self.error(f"Failed to get hostid: {exc}")

    def rmtree(self, path: str | Path) -> None:
        """Delete a directory content without removing the directory.

        Copied this code from Python's documentation for Path.walk() method.
        """
        for root, dirs, files in Path(path).walk(top_down=False):
            for name in files:
                (root / name).unlink()
            for name in dirs:
                (root / name).rmdir()

    def _log_message(self, message: str) -> str:
        """Prefix *message* with this node's name for cluster-level logging."""
        return f"{self.name}: {message}"

    def debug(self, message: str) -> None:
        self.cluster.debug(self._log_message(message))

    def info(self, message: str) -> None:
        self.cluster.info(self._log_message(message))

    def warning(self, message: str) -> None:
        self.cluster.warning(self._log_message(message))

    def error(self, message: str) -> None:
        self.cluster.error(self._log_message(message))

    def __repr__(self) -> str:
        # NOTE(review): `name=` shows server_id, not self.name — presumably to
        # expose the manager-side id; confirm before changing.
        return f"<ScyllaNode name={self.server_id} dc={self.data_center} rack={self.rack}>"
def _parse_scylla_args(args: list[str]) -> dict[str, list[str]]:
    """Parse a flat list of command-line tokens into ``{option: [value, ...]}``.

    An option repeated several times contributes one list entry per occurrence.
    Both ``--foo=bar`` and ``--foo bar baz`` forms are supported; in the latter
    all tokens up to the next ``-``-prefixed token are joined with spaces into
    one value.  A flag with no value is recorded as ``""``.  Options starting
    with ``--scylla-manager`` are dropped.
    """
    parsed_args = {}
    args = iter(args)  # single shared iterator: the inner `for` below advances it too
    arg = next(args, None)
    while arg is not None:
        # NOTE(review): `assert` is stripped under -O; consider raising instead.
        assert arg.startswith("-")
        key, *value = arg.split(sep="=", maxsplit=1)
        if value:  # handle argument in `--foo=bar` form
            arg = next(args, None)
        else:
            for arg in args:  # handle argument in `--foo bar` form
                if arg.startswith("-"):
                    break  # `arg` now holds the next option for the outer loop
                value.append(arg)
            else:  # no more arguments
                arg = None
        if key.startswith("--scylla-manager"):  # skip Scylla Manager arguments
            continue
        parsed_args.setdefault(key, []).append(" ".join(value))
    return parsed_args
def _parse_size(s: str) -> int:
try:
factor = 1024 ** abs("k KMGT".index(s[-1]) - 1)
except ValueError:
return int(s)
return int(s[:-1]) * factor