Currently, if a test fails, pytest outputs only some basic information about the failure. With this change, it will output the last 300 lines of the boost/seastar test output. It also captures the output of failed tests in the JUnit report, so it will be present in the report on Jenkins. Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-449 Closes scylladb/scylladb#28535
319 lines
11 KiB
Python
319 lines
11 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import pathlib
|
|
import shlex
|
|
import subprocess
|
|
from abc import ABC, abstractmethod
|
|
from functools import cached_property
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from typing import TYPE_CHECKING
|
|
|
|
import pytest
|
|
from _pytest._code.code import ReprFileLocation
|
|
|
|
from scripts import coverage as coverage_script
|
|
from test import DEBUG_MODES, TEST_DIR, TOP_SRC_DIR, path_to
|
|
from test.pylib.resource_gather import get_resource_gather
|
|
from test.pylib.runner import BUILD_MODE, RUN_ID, TEST_SUITE
|
|
from test.pylib.scylla_cluster import merge_cmdline_options
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Iterator, Sequence
|
|
from typing import Any
|
|
|
|
from _pytest._code.code import TerminalRepr
|
|
from _pytest._io import TerminalWriter
|
|
|
|
|
|
# Sanitizer runtime options for the test executables. The last item of each list is whatever the
# caller already has in the corresponding environment variable (None when the variable is unset).
UBSAN_OPTIONS = [
    "halt_on_error=1",
    "abort_on_error=1",
    "suppressions=" + str(TOP_SRC_DIR / "ubsan-suppressions.supp"),
    os.environ.get("UBSAN_OPTIONS"),
]
ASAN_OPTIONS = [
    "disable_coredump=0",
    "abort_on_error=1",
    "detect_stack_use_after_return=1",
    os.environ.get("ASAN_OPTIONS"),
]

# Environment variables shared by all C++ tests. Empty/unset sanitizer options are skipped, and the
# remaining ones are joined with ":".
BASE_TEST_ENV = dict(
    UBSAN_OPTIONS=":".join(option for option in UBSAN_OPTIONS if option),
    ASAN_OPTIONS=":".join(option for option in ASAN_OPTIONS if option),
    SCYLLA_TEST_ENV="yes",
)
|
|
|
|
# Base command-line options for a test executable; CppFile.test_args merges them with the suite's
# "extra_scylla_cmdline_options".
DEFAULT_SCYLLA_ARGS = [
    "--overprovisioned",
    "--unsafe-bypass-fsync=1",
    "--kernel-page-cache=1",
    "--blocked-reactor-notify-ms=2000000",
    "--collectd=0",
    "--max-networking-io-control-blocks=1000",
]

# Used by CppFile.collect() when the suite config has no "custom_args" entry for a test. Each item is
# one shell-like argument string, and one test item is generated per string.
DEFAULT_CUSTOM_ARGS = ["-c2 -m2G"]

# Time limits for one run of a test executable, in seconds.
TIMEOUT = 15 * 60
TIMEOUT_DEBUG = 30 * 60  # applied when the build mode is one of DEBUG_MODES
|
|
|
|
|
|
class CppFile(pytest.File, ABC):
    """Pytest collector for one C++ test source file (``*_test.cc``).

    Subclasses define how the test cases are enumerated (``list_test_cases``) and how a single test
    case is executed (``run_test_case``).
    """

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)

        # Source file name without the extension; also the name of the executable (see `exe_path`).
        self.test_name = self.path.stem

    # Implement following properties as cached_property because they are read-only, and based on stash items which
    # will be assigned in test/pylib/runner.py::pytest_collect_file() hook after a CppFile instance was created.

    @cached_property
    def build_mode(self) -> str:
        """Build mode taken from the stash."""
        return self.stash[BUILD_MODE]

    @cached_property
    def run_id(self) -> int:
        """Run id taken from the stash."""
        return self.stash[RUN_ID]

    @cached_property
    def suite_config(self) -> dict[str, Any]:
        """Configuration (`cfg`) of the test suite stored in the stash."""
        suite = self.stash[TEST_SUITE]
        return suite.cfg

    @cached_property
    def build_basedir(self) -> pathlib.Path:
        """Directory with the test executables of this suite for the current build mode."""
        suite_name = self.stash[TEST_SUITE].name
        return pathlib.Path(path_to(self.build_mode, "test", suite_name))

    @cached_property
    def log_dir(self) -> pathlib.Path:
        """Absolute path of ``<--tmpdir>/<build mode>``."""
        tmpdir = pathlib.Path(self.config.getoption("--tmpdir"))
        return (tmpdir / self.build_mode).absolute()

    @cached_property
    def exe_path(self) -> pathlib.Path:
        """Path to the test executable: ``build_basedir / test_name``."""
        return self.build_basedir / self.test_name

    @abstractmethod
    def list_test_cases(self) -> list[str]:
        """Return the test cases of this file.

        `collect()` also accepts items which are lists of the form ``[name, labels]``.
        """

    @abstractmethod
    def run_test_case(self, test_case: CppTestCase) -> tuple[None | list[CppTestFailure], Path]:
        """Run `test_case` and return a pair of (failures or None, path to the output file)."""

    @cached_property
    def test_env(self) -> dict[str, str]:
        """Environment variables for the test executable."""
        env = dict(BASE_TEST_ENV)
        env["TMPDIR"] = str(self.log_dir)
        if self.build_mode == "coverage":
            env.update(coverage_script.env(self.exe_path))
        return env

    @cached_property
    def test_args(self) -> list[str]:
        """Command-line arguments common to all test cases of this file."""
        suite_options = self.suite_config.get("extra_scylla_cmdline_options", [])
        args = merge_cmdline_options(DEFAULT_SCYLLA_ARGS, suite_options)

        compaction_groups = self.config.getoption("--x-log2-compaction-groups")
        if not compaction_groups:
            return args

        # The option is added only when the suite defines a non-empty exclusion list and this test
        # is not part of it.
        excluded_tests = self.suite_config.get("all_can_run_compaction_groups_except")
        if excluded_tests and self.test_name not in excluded_tests:
            args.append(f"--x-log2-compaction-groups={compaction_groups}")
        return args

    def collect(self) -> Iterator[CppTestCase]:
        """Yield one `CppTestCase` per (test case, custom args string) combination."""
        per_test_custom_args = self.suite_config.get("custom_args", {})
        custom_args = per_test_custom_args.get(self.test_name, DEFAULT_CUSTOM_ARGS)

        for entry in self.list_test_cases():
            # An entry is either a plain test case name or a `[name, labels]` list.
            if isinstance(entry, list):
                case_name, labels = entry[0], entry[1]
            else:
                case_name, labels = entry, []

            # Start `index` from 1 if there are more than one custom_args item. This allows us to create
            # test cases with unique names for each custom_args item and don't add any additional suffixes
            # if there is only one item (in this case `index` is 0.)
            first_index = 1 if len(custom_args) > 1 else 0
            for index, args in enumerate(custom_args, start=first_index):
                item_name = f"{case_name}.{index}" if index else case_name
                yield CppTestCase.from_parent(
                    parent=self,
                    name=item_name,
                    test_case_name=case_name,
                    test_custom_args=shlex.split(args),
                    own_markers=labels,
                )

    @classmethod
    def pytest_collect_file(cls, file_path: pathlib.Path, parent: pytest.Collector) -> pytest.Collector | None:
        """Create a collector for `file_path` if its name ends with ``_test.cc``, otherwise return None."""
        if not file_path.name.endswith("_test.cc"):
            return None
        return cls.from_parent(parent=parent, path=file_path)
|
|
|
|
|
|
class CppTestCase(pytest.Item):
    """Pytest item for one C++ test case, executed with one set of custom arguments."""

    parent: CppFile

    def __init__(self, *, test_case_name: str, test_custom_args: list[str], own_markers: list[str] | set[str], **kwargs: Any):
        """
        :param test_case_name: name of the test case (without the custom-args index suffix of the item name)
        :param test_custom_args: extra command-line arguments for this item
        :param own_markers: names of pytest marks to put on this item
        :param kwargs: passed to `pytest.Item` as is
        """
        super().__init__(**kwargs)

        self.test_case_name = test_case_name
        self.test_custom_args = test_custom_args

        self.fixturenames = []
        self.own_markers = [getattr(pytest.mark, mark_name) for mark_name in own_markers]
        self.add_marker(pytest.mark.cpp)

    def get_artifact_path(self, extra: str = "", suffix: str = "") -> pathlib.Path:
        """Return a path in the log directory for an artifact of this test case.

        The file name is made of the test file path relative to TEST_DIR (without the extension),
        followed by ``<item name><extra>.<run id><suffix>``, with all path parts joined by dots.
        """
        return self.parent.log_dir / ".".join(
            (self.path.relative_to(TEST_DIR).with_suffix("") / f"{self.name}{extra}.{self.parent.run_id}{suffix}").parts
        )

    def make_testpy_test_object_mock(self) -> SimpleNamespace:
        """Return an object which is used in resource gathering.

        It is needed to keep the logic of writing metrics to the DB, which is used by the test types
        from test.py, unchanged.
        """
        return SimpleNamespace(
            time_end=0,
            time_start=0,
            id=self.parent.run_id,
            mode=self.parent.build_mode,
            success=False,
            shortname=self.name,
            suite=SimpleNamespace(
                log_dir=self.parent.log_dir,
                name=self.parent.test_name,
            ),
        )

    def run_exe(self, test_args: list[str], output_file: pathlib.Path) -> subprocess.Popen[str]:
        """Run the test executable inside a resource-gathering cgroup and record its metrics.

        :param test_args: arguments placed right after the executable path, before the item's custom args
        :param output_file: file which receives the output of the process
        :return: the process object returned by the resource gatherer
        """
        resource_gather = get_resource_gather(
            is_switched_on=self.config.getoption("--gather-metrics"),
            test=self.make_testpy_test_object_mock(),
        )
        resource_gather.make_cgroup()
        try:
            process = resource_gather.run_process(
                args=[self.parent.exe_path, *test_args, *self.test_custom_args],
                timeout=TIMEOUT_DEBUG if self.parent.build_mode in DEBUG_MODES else TIMEOUT,
                output_file=output_file,
                cwd=TOP_SRC_DIR,
                env=self.parent.test_env,
            )
            resource_gather.write_metrics_to_db(
                metrics=resource_gather.get_test_metrics(),
                success=process.returncode == 0,
            )
        finally:
            # Remove the cgroup even if running the process or writing the metrics raised, so a
            # failed run does not leave the cgroup behind.
            resource_gather.remove_cgroup()
        return process

    def runtest(self) -> None:
        """Run the test case, echo the tail of its output and raise if it reported failures.

        :raises CppTestFailureList: if the test case produced failures
        """
        failures, output = self.parent.run_test_case(test_case=self)

        # Write output to stdout so pytest captures it for both terminal and JUnit report.
        # Only show the last `tail_lines` lines to avoid excessive output.
        tail_lines = 300
        try:
            lines = get_lines_from_end(output, lines_count=tail_lines)
        except FileNotFoundError:
            # A missing output file is tolerated below (`missing_ok=True`), so it must not turn the
            # test result into an unrelated error here either.
            lines = []
        if lines:
            print("\n" + "=" * 70)
            print(f"C++ Test Output (last {tail_lines} lines):")
            print("=" * 70)
            print('\n'.join(lines))
            print("=" * 70 + "\n")

        # NOTE(review): the log is removed for failed tests too, not only for successful ones as the
        # option name suggests -- confirm that this is intended.
        if not self.config.getoption("--save-log-on-success"):
            output.unlink(missing_ok=True)

        if failures:
            raise CppTestFailureList(failures)

    def repr_failure(self,
                     excinfo: pytest.ExceptionInfo[BaseException | CppTestFailureList],
                     **kwargs: Any) -> str | TerminalRepr | CppFailureRepr:
        """Represent C++ failures with `CppFailureRepr`; use the default representation for anything else."""
        if isinstance(excinfo.value, CppTestFailureList):
            return CppFailureRepr(excinfo.value.failures)
        return super().repr_failure(excinfo)

    def reportinfo(self) -> tuple[Any, int, str]:
        """Return (path of the test file, line number 0, test case name) for the report header."""
        return self.path, 0, self.test_case_name
|
|
|
|
|
|
class CppTestFailure(Exception):
    """A single failure reported by a C++ test: a source location and the failure text."""

    def __init__(self, file_name: str, line_num: int, content: str) -> None:
        """
        :param file_name: source file the failure refers to
        :param line_num: line number in `file_name`
        :param content: failure text; stored split into separate lines
        """
        self.file_name, self.line_num = file_name, line_num
        self.lines = content.splitlines()

    def get_lines(self) -> list[tuple[str, tuple[str, ...]]]:
        """Return every line of the failure text paired with the markup names used to print it."""
        highlight = ("red", "bold")
        return [(text, highlight) for text in self.lines]

    def get_file_reference(self) -> tuple[str, int]:
        """Return the (file name, line number) pair of the failure."""
        return (self.file_name, self.line_num)
|
|
|
|
|
|
class CppTestFailureList(Exception):
    """Exception which carries all failures of one C++ test case."""

    def __init__(self, failures: Sequence[CppTestFailure]) -> None:
        # Keep an own list, independent of the sequence passed by the caller.
        self.failures = [*failures]
|
|
|
|
|
|
class CppFailureRepr:
    """Representation of C++ test failures, as plain text (`str()`) or written to a terminal writer."""

    # Separator put between two consecutive failures.
    failure_sep = "---"

    def __init__(self, failures: Sequence[CppTestFailure]) -> None:
        self.failures = failures

    def __str__(self) -> str:
        """Return, for each failure, its text followed by its location; the parts are joined by `failure_sep`."""

        def render(failure: CppTestFailure) -> str:
            text = "\n".join(line for line, _markup in failure.get_lines())
            location = self._get_repr_file_location(failure)
            return f"{text}\n{location}"

        return self.failure_sep.join([render(failure) for failure in self.failures])

    @staticmethod
    def _get_repr_file_location(failure: CppTestFailure) -> ReprFileLocation:
        """Build the location object for the file reference of `failure`."""
        path, lineno = failure.get_file_reference()
        return ReprFileLocation(path=path, lineno=lineno, message="C++ failure")

    def toterminal(self, tw: TerminalWriter) -> None:
        """Write all failures to `tw`, with a separator line between consecutive failures."""
        last_position = len(self.failures) - 1
        for position, failure in enumerate(self.failures):
            for text, markup in failure.get_lines():
                # Each markup name becomes a keyword flag, e.g. `red=True, bold=True`.
                tw.line(text, **dict.fromkeys(markup, True))

            self._get_repr_file_location(failure).toterminal(tw)

            if position < last_position:
                tw.line(self.failure_sep, cyan=True)
|
|
|
|
|
|
def get_lines_from_end(file_path: pathlib.Path, lines_count: int = 300) -> list[str]:
    """
    Seeks to the end of the file and reads backwards to find the last N lines
    without iterating over the whole file.

    The bytes are decoded as UTF-8 once, after reading, so a multi-byte character which lies on a
    chunk boundary is kept intact; bytes which are not valid UTF-8 are dropped.

    :param file_path: file to read
    :param lines_count: maximum number of lines to return; zero or a negative value gives an empty list
    :return: up to `lines_count` last lines of the file, without line terminators
    :raises FileNotFoundError: if `file_path` does not exist
    """
    # `lines[-0:]` would be the whole list, so handle the "no lines requested" case explicitly.
    if lines_count <= 0:
        return []

    chunk_size = 8192  # 8KB chunks
    chunks: list[bytes] = []  # collected from the end of the file towards its start
    newlines_seen = 0

    with file_path.open("rb") as f:
        pointer = f.seek(0, os.SEEK_END)

        # More than `lines_count` newlines means that the last `lines_count` lines are complete.
        # Other line separators recognized by `splitlines()` are not counted, which can only make
        # the loop read more data than strictly necessary, never less.
        while pointer > 0 and newlines_seen <= lines_count:
            # Read one chunk backwards
            read_size = min(pointer, chunk_size)
            pointer -= read_size
            f.seek(pointer)
            chunk = f.read(read_size)
            chunks.append(chunk)
            newlines_seen += chunk.count(b"\n")

    text = b"".join(reversed(chunks)).decode("utf-8", errors="ignore")

    # Return only the requested number of lines
    return text.splitlines()[-lines_count:]
|