mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-28 20:27:03 +00:00
In certain circumstances the current way of collecting test files can be error-prone. Collection can stop when the first file is skipped in a mode, leaving the rest of the files given on the CLI uncollected. Another issue is that if a file is specified twice, once via its directory and once explicitly, an incorrect CppFile is produced in the stash, causing a KeyError. Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1714
322 lines
11 KiB
Python
322 lines
11 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
|
#
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import pathlib
|
|
import shlex
|
|
import subprocess
|
|
from abc import ABC, abstractmethod
|
|
from functools import cached_property
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from typing import TYPE_CHECKING
|
|
|
|
import pytest
|
|
from _pytest._code.code import ReprFileLocation
|
|
|
|
from scripts import coverage as coverage_script
|
|
from test import DEBUG_MODES, TEST_DIR, TOP_SRC_DIR, path_to
|
|
from test.pylib.resource_gather import get_resource_gather
|
|
from test.pylib.runner import BUILD_MODE, RUN_ID, TEST_SUITE
|
|
from test.pylib.scylla_cluster import merge_cmdline_options
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Iterator, Sequence
|
|
from typing import Any
|
|
|
|
from _pytest._code.code import TerminalRepr
|
|
from _pytest._io import TerminalWriter
|
|
|
|
|
|
# Sanitizer runtime options. The last entry of each list is whatever the caller already
# exported in the environment (None when unset); falsy entries are dropped when the lists
# are joined into BASE_TEST_ENV below.
UBSAN_OPTIONS = [
    "halt_on_error=1",
    "abort_on_error=1",
    "suppressions=" + str(TOP_SRC_DIR / "ubsan-suppressions.supp"),
    os.environ.get("UBSAN_OPTIONS"),
]
ASAN_OPTIONS = [
    "disable_coredump=0",
    "abort_on_error=1",
    "detect_stack_use_after_return=1",
    os.environ.get("ASAN_OPTIONS"),
]

# Environment variables shared by every C++ test process. Sanitizer options are passed
# as a single colon-separated string.
BASE_TEST_ENV = {
    "UBSAN_OPTIONS": ":".join(option for option in UBSAN_OPTIONS if option),
    "ASAN_OPTIONS": ":".join(option for option in ASAN_OPTIONS if option),
    "SCYLLA_TEST_ENV": "yes",
}
|
|
|
|
# Command line options passed to every C++ test executable; a suite may extend or override
# them through its `extra_scylla_cmdline_options` setting.
DEFAULT_SCYLLA_ARGS = [
    "--overprovisioned",
    "--unsafe-bypass-fsync=1",
    "--kernel-page-cache=1",
    "--blocked-reactor-notify-ms=2000000",
    "--collectd=0",
    "--max-networking-io-control-blocks=1000",
]

# Used when the suite config has no `custom_args` entry for a test. Each item is one
# shell-style argument string and produces one run of every test case.
DEFAULT_CUSTOM_ARGS = ["-c2 -m2G"]

# Per-process timeouts, in seconds.
TIMEOUT = 15 * 60
TIMEOUT_DEBUG = 30 * 60
|
|
|
|
|
|
class CppFile(pytest.File, ABC):
    """Base pytest collector for one C++ test source file.

    Subclasses provide the framework-specific parts: listing the test cases of the test
    executable and running a single test case.
    """

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)

        # The test is identified by the source file name without its extension.
        self.test_name = self.path.stem

    # The properties below are read-only and derived from stash items, so they are implemented
    # as cached_property. The stash items are assigned in the
    # test/pylib/runner.py::pytest_collect_file() hook after a CppFile instance was created.

    @cached_property
    def build_mode(self) -> str:
        """Build mode stored in the stash under BUILD_MODE."""
        return self.stash[BUILD_MODE]

    @cached_property
    def run_id(self) -> int:
        """Run identifier stored in the stash under RUN_ID."""
        return self.stash[RUN_ID]

    @cached_property
    def suite_config(self) -> dict[str, Any]:
        """Configuration (`cfg`) of the test suite stored in the stash under TEST_SUITE."""
        return self.stash[TEST_SUITE].cfg

    @cached_property
    def build_basedir(self) -> pathlib.Path:
        """Directory built by `path_to()` from the build mode, "test" and the suite name."""
        mode = self.build_mode
        suite_name = self.stash[TEST_SUITE].name
        return pathlib.Path(path_to(mode, "test", suite_name))

    @cached_property
    def log_dir(self) -> pathlib.Path:
        """Absolute path of the per-mode subdirectory of the `--tmpdir` option."""
        tmpdir = pathlib.Path(self.config.getoption("--tmpdir"))
        return (tmpdir / self.build_mode).absolute()

    @cached_property
    def exe_path(self) -> pathlib.Path:
        """Path of the test executable: `build_basedir` joined with the test name."""
        return self.build_basedir / self.test_name

    @abstractmethod
    def list_test_cases(self) -> list[str]:
        """Return the test cases of this file.

        `collect()` also accepts entries that are lists of the form `[name, labels]`.
        """

    @abstractmethod
    def run_test_case(self, test_case: CppTestCase) -> tuple[None | list[CppTestFailure], Path]:
        """Run `test_case` and return a `(failures, output)` pair.

        `CppTestCase.runtest()` treats a falsy `failures` as success and reads `output`
        as the path of the file holding the test output.
        """

    @cached_property
    def test_env(self) -> dict[str, str]:
        """Environment variables for the test process.

        BASE_TEST_ENV plus TMPDIR pointing at `log_dir`; in "coverage" mode the variables
        returned by `coverage_script.env()` for the executable are merged in as well.
        """
        env = dict(BASE_TEST_ENV)
        env["TMPDIR"] = str(self.log_dir)
        if self.build_mode == "coverage":
            env.update(coverage_script.env(self.exe_path))
        return env

    @cached_property
    def test_args(self) -> list[str]:
        """Command line arguments common to all test cases of this file.

        DEFAULT_SCYLLA_ARGS merged with the suite's `extra_scylla_cmdline_options`. The
        `--x-log2-compaction-groups` option is forwarded only when it is set, the suite
        defines a non-empty `all_can_run_compaction_groups_except` and this test is not
        listed there.
        """
        extra_options = self.suite_config.get("extra_scylla_cmdline_options", [])
        args = merge_cmdline_options(DEFAULT_SCYLLA_ARGS, extra_options)

        compaction_groups = self.config.getoption("--x-log2-compaction-groups")
        if compaction_groups:
            excluded_tests = self.suite_config.get("all_can_run_compaction_groups_except")
            if excluded_tests and self.test_name not in excluded_tests:
                args.append(f"--x-log2-compaction-groups={compaction_groups}")
        return args

    def collect(self) -> Iterator[CppTestCase]:
        """Yield one CppTestCase per (test case, custom args item) combination.

        Nothing is yielded when BUILD_MODE is not in the stash.
        """
        if BUILD_MODE not in self.stash:
            return

        per_test_custom_args = self.suite_config.get("custom_args", {})
        custom_args = per_test_custom_args.get(self.test_name, DEFAULT_CUSTOM_ARGS)

        for entry in self.list_test_cases():
            # An entry is either a plain test case name or a `[name, labels]` list.
            if isinstance(entry, list):
                case_name, labels = entry[0], entry[1]
            else:
                case_name, labels = entry, []

            # Start `index` from 1 if there is more than one custom_args item. This gives every
            # custom_args item a uniquely named test case, while a single item gets no suffix
            # at all (its `index` is 0.)
            first_index = 1 if len(custom_args) > 1 else 0
            for index, args in enumerate(custom_args, start=first_index):
                item_name = f"{case_name}.{index}" if index else case_name
                yield CppTestCase.from_parent(
                    parent=self,
                    name=item_name,
                    test_case_name=case_name,
                    test_custom_args=shlex.split(args),
                    own_markers=labels,
                )

    @classmethod
    def pytest_collect_file(cls, file_path: pathlib.Path, parent: pytest.Collector) -> pytest.Collector | None:
        """Create a collector for `file_path` if its name ends with `_test.cc`, else return None."""
        if not file_path.name.endswith("_test.cc"):
            return None
        return cls.from_parent(parent=parent, path=file_path)
|
|
|
|
|
|
class CppTestCase(pytest.Item):
    """pytest item for a single test case of a C++ test executable."""

    parent: CppFile

    def __init__(self, *, test_case_name: str, test_custom_args: list[str], own_markers: list[str] | set[str], **kwargs: Any):
        super().__init__(**kwargs)

        self.test_case_name = test_case_name
        self.test_custom_args = test_custom_args

        self.fixturenames = []
        # Labels arrive as plain names; turn each one into the corresponding pytest mark.
        self.own_markers = [getattr(pytest.mark, label) for label in own_markers]
        self.add_marker(pytest.mark.cpp)

    def get_artifact_path(self, extra: str = "", suffix: str = "") -> pathlib.Path:
        """Return a path inside the log directory for an artifact of this test case.

        The file name is made of the test file path relative to TEST_DIR (without extension),
        the item name followed by `extra`, the run id and `suffix`, all joined with dots.
        """
        log_dir = self.parent.log_dir
        artifact_name = f"{self.name}{extra}.{self.parent.run_id}{suffix}"
        relative_path = self.path.relative_to(TEST_DIR).with_suffix("") / artifact_name
        return log_dir / ".".join(relative_path.parts)

    def make_testpy_test_object_mock(self) -> SimpleNamespace:
        """Returns an object that is used in resource gathering.

        It is needed so that the logic of writing metrics to the DB, which is shared with the
        test types from test.py, does not have to change.
        """
        test_file = self.parent
        return SimpleNamespace(
            time_end=0,
            time_start=0,
            id=test_file.run_id,
            mode=test_file.build_mode,
            success=False,
            shortname=self.name,
            suite=SimpleNamespace(
                log_dir=test_file.log_dir,
                name=test_file.test_name,
            ),
        )

    def run_exe(self, test_args: list[str], output_file: pathlib.Path) -> subprocess.Popen[str]:
        """Run the test executable through the resource gatherer and return the process.

        The command line is the executable followed by `test_args` and this item's custom
        args. Debug modes get TIMEOUT_DEBUG, all other modes TIMEOUT.
        """
        gatherer = get_resource_gather(
            is_switched_on=self.config.getoption("--gather-metrics"),
            test=self.make_testpy_test_object_mock(),
        )
        gatherer.make_cgroup()

        command = [self.parent.exe_path, *test_args, *self.test_custom_args]
        if self.parent.build_mode in DEBUG_MODES:
            timeout = TIMEOUT_DEBUG
        else:
            timeout = TIMEOUT
        process = gatherer.run_process(
            args=command,
            timeout=timeout,
            output_file=output_file,
            cwd=TOP_SRC_DIR,
            env=self.parent.test_env,
        )

        gatherer.write_metrics_to_db(
            metrics=gatherer.get_test_metrics(),
            success=process.returncode == 0,
        )
        gatherer.remove_cgroup()
        return process

    def runtest(self) -> None:
        """Run the test case, echo the tail of its output and raise on failures."""
        failures, output = self.parent.run_test_case(test_case=self)

        # Write output to stdout so pytest captures it for both terminal and JUnit report.
        # Only show the last 300 lines to avoid excessive output.
        tail = get_lines_from_end(output)
        if tail:
            separator = "=" * 70
            print("\n" + separator)
            print("C++ Test Output (last 300 lines):")
            print(separator)
            print("\n".join(tail))
            print(separator + "\n")

        keep_log = self.config.getoption("--save-log-on-success")
        if not keep_log:
            output.unlink(missing_ok=True)

        if failures:
            raise CppTestFailureList(failures)

    def repr_failure(self,
                     excinfo: pytest.ExceptionInfo[BaseException | CppTestFailureList],
                     **kwargs: Any) -> str | TerminalRepr | CppFailureRepr:
        """Render C++ failures with CppFailureRepr; defer to pytest for any other exception."""
        error = excinfo.value
        if not isinstance(error, CppTestFailureList):
            return pytest.Item.repr_failure(self, excinfo)
        return CppFailureRepr(error.failures)

    def reportinfo(self) -> tuple[Any, int, str]:
        """Return the test file path, line 0 and the test case name for pytest reports."""
        return self.path, 0, self.test_case_name
|
|
|
|
|
|
class CppTestFailure(Exception):
    """A single failure reported by a C++ test: its source location and message lines."""

    def __init__(self, file_name: str, line_num: int, content: str) -> None:
        self.file_name = file_name
        self.line_num = line_num
        # The message is kept as a list of lines so that it can be rendered line by line.
        self.lines = content.splitlines()

    def get_lines(self) -> list[tuple[str, tuple[str, ...]]]:
        """Return every message line paired with the markup ("red", "bold")."""
        markup = ("red", "bold")
        return [(line, markup) for line in self.lines]

    def get_file_reference(self) -> tuple[str, int]:
        """Return the `(file name, line number)` location of the failure."""
        return (self.file_name, self.line_num)
|
|
|
|
|
|
class CppTestFailureList(Exception):
    """Exception carrying all failures of one C++ test case run."""

    def __init__(self, failures: Sequence[CppTestFailure]) -> None:
        # Store a copy so later changes to the caller's sequence do not affect the exception.
        self.failures = [*failures]
|
|
|
|
|
|
class CppFailureRepr:
    """Text and terminal representation of a sequence of C++ test failures."""

    # Separator placed between two consecutive failures.
    failure_sep = "---"

    def __init__(self, failures: Sequence[CppTestFailure]) -> None:
        self.failures = failures

    def __str__(self) -> str:
        """Return every failure as its message lines plus location, joined by `failure_sep`."""
        rendered = []
        for failure in self.failures:
            message = "\n".join(line for line, _markup in failure.get_lines())
            location = self._get_repr_file_location(failure)
            rendered.append(message + "\n" + str(location))
        return self.failure_sep.join(rendered)

    @staticmethod
    def _get_repr_file_location(failure: CppTestFailure) -> ReprFileLocation:
        """Build the pytest file location object for `failure`."""
        path, lineno = failure.get_file_reference()
        return ReprFileLocation(path=path, lineno=lineno, message="C++ failure")

    def toterminal(self, tw: TerminalWriter) -> None:
        """Write all failures to `tw`, with a cyan separator line between consecutive ones."""
        last_index = len(self.failures) - 1
        for position, failure in enumerate(self.failures):
            for text, markup in failure.get_lines():
                # Every markup name becomes a `name=True` keyword for the terminal writer.
                tw.line(text, **dict.fromkeys(markup, True))

            self._get_repr_file_location(failure).toterminal(tw)

            if position != last_index:
                tw.line(self.failure_sep, cyan=True)
|
|
|
|
|
|
def get_lines_from_end(file_path: pathlib.Path, lines_count: int = 300) -> list[str]:
    """
    Return the last `lines_count` lines of a file without reading the whole file.

    The file is read backwards from its end in fixed-size binary chunks until enough line
    breaks have been seen or the start of the file is reached. The collected bytes are then
    decoded as UTF-8 in one go; undecodable bytes are ignored.

    :param file_path: file to read.
    :param lines_count: maximum number of trailing lines to return. A value of zero or less
        yields an empty list.
    :return: the trailing lines, oldest first, without line terminators.
    :raises OSError: if the file cannot be opened or read.
    """
    if lines_count <= 0:
        # `lines[-0:]` would be the whole list, so handle this case explicitly.
        return []

    chunk_size = 8192  # 8KB chunks
    chunks: list[bytes] = []
    newlines_seen = 0

    with file_path.open("rb") as f:
        f.seek(0, os.SEEK_END)
        pointer = f.tell()

        # More than `lines_count` line breaks guarantee that the last `lines_count` lines are
        # complete: only the first buffered line can be cut off and it is never returned.
        while pointer > 0 and newlines_seen <= lines_count:
            # Read exactly the bytes that were not consumed yet. The chunk at the very start
            # of the file may be shorter than `chunk_size` and must not overlap the previously
            # read one, otherwise its content would be duplicated.
            read_size = min(pointer, chunk_size)
            pointer -= read_size
            f.seek(pointer)
            chunk = f.read(read_size)
            chunks.append(chunk)
            newlines_seen += chunk.count(b"\n")

    # Decode once after joining, so that a multi-byte character split across a chunk boundary
    # is not lost. A character cut at the very start of the buffer belongs to the partial first
    # line and is dropped by errors="ignore".
    text = b"".join(reversed(chunks)).decode("utf-8", errors="ignore")

    # Return only the requested number of lines
    return text.splitlines()[-lines_count:]
|