Currently, if a test fails, pytest outputs only some basic information about the failure. With this change, it will output the last 300 lines of the boost/seastar test output. Also capture the output of failed tests in the JUnit report, so that it is present in the report on Jenkins. Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-449 Closes scylladb/scylladb#28535
273 lines
9.7 KiB
Python
273 lines
9.7 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import logging
|
|
import subprocess
|
|
import tempfile
|
|
import pathlib
|
|
import json
|
|
from functools import cache, cached_property
|
|
from itertools import chain
|
|
from pathlib import Path
|
|
from textwrap import dedent
|
|
from typing import TYPE_CHECKING
|
|
from xml.etree import ElementTree
|
|
|
|
import allure
|
|
|
|
from test.pylib.cpp.base import CppFile, CppTestFailure
|
|
|
|
if TYPE_CHECKING:
|
|
|
|
from test.pylib.cpp.base import CppTestCase
|
|
|
|
|
|
# File name of the binary that bundles several test files; it is looked up in the build
# directory when a test file has no binary of its own (see BoostTestFile.exe_path).
COMBINED_TESTS = "combined_tests"
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BoostTestFile(CppFile):
    """Collector for a Boost.Test source file backed by a standalone or a combined test binary."""

    @cached_property
    def exe_path(self) -> pathlib.Path:
        """Return the path of the executable that contains this test file.

        The path provided by the base class is used when it is an existing file. Otherwise
        the combined tests binary in the build directory is used, provided that it exists
        and lists this test file.

        Raises:
            FileNotFoundError: neither a separate binary nor a combined binary containing
                this test file is available.
        """
        path = super().exe_path
        if not path.is_file():
            path = self.build_basedir / COMBINED_TESTS
            if not path.is_file() or self.test_name not in get_boost_test_list_content(executable=path, combined=True):
                raise FileNotFoundError(
                    f"There is no separate {self.build_mode} binary built for {self.path.name},"
                    " and it's not built into the combined tests binary",
                )
        return path

    @cached_property
    def combined(self) -> bool:
        """Tell whether the resolved executable is the combined tests binary."""
        return self.exe_path.name == COMBINED_TESTS

    @cached_property
    def no_parallel(self) -> bool:
        """Run all test cases in a single process."""

        return self.test_name in self.suite_config.get("no_parallel_cases", [])

    def list_test_cases(self) -> list[str]:
        """Return the test cases of this file.

        For a "no parallel" file the whole file is a single case named after the file.
        Otherwise the entries reported for this file by get_boost_test_list_json_content()
        are returned (an empty list when the file is not present in the listing).
        """
        if self.no_parallel:
            return [self.test_name]
        return get_boost_test_list_json_content(executable=self.exe_path,combined=self.combined).get(self.test_name, [])

    def run_test_case(self, test_case: CppTestCase) -> tuple[list[CppTestFailure], Path] | tuple[None, Path]:
        """Run one test case and report the outcome.

        Args:
            test_case: the test case to execute; it provides the artifact path for the
                captured stdout and launches the executable.

        Returns:
            A pair ``(failures, stdout_file_path)``. ``failures`` is ``None`` when the
            process exit code is zero, otherwise a single-element list describing the
            failure.
        """
        run_test = f"{self.test_name}/{test_case.test_case_name}" if self.combined else test_case.test_case_name

        # The context manager guarantees that the temporary XML log is closed (and thereby
        # removed) even when preparing or launching the executable raises.
        with tempfile.NamedTemporaryFile(mode="w+t") as log_sink:
            args = [
                "--logger=HRF,warning,stdout",
                f"--logger=XML,warning,{log_sink.name}",
                "--catch_system_errors=no",
                "--color_output=false",
            ]
            if not self.no_parallel:
                # NOTE(review): `{run_test=}` is expanded with repr(), so the argument is
                # `--run_test='<name>'` including the quotes - confirm that this is what
                # run_exe / the test binary expects.
                args.append(f"--{run_test=}")

            # Everything appended after the separator follows "--" on the command line.
            args.append("--")

            if random_seed := self.config.getoption("--random-seed"):
                args.append(f"--random-seed={random_seed}")
            args.extend(self.test_args)

            stdout_file_path = test_case.get_artifact_path(extra="_stdout", suffix=".log").absolute()
            process = test_case.run_exe(test_args=args, output_file=stdout_file_path)

            try:
                log_xml = pathlib.Path(log_sink.name).read_text(encoding="utf-8")
            except IOError:
                # An unreadable log is treated the same way as an empty one.
                log_xml = ""

        results = parse_boost_test_log_sink(log_xml=log_xml)

        if return_code := process.returncode:
            allure.attach(stdout_file_path.read_bytes(), name="output", attachment_type=allure.attachment_type.TEXT)
            # Only the first parsed failure is used for the location and the error text;
            # fall back to the test file itself when nothing could be parsed.
            return [CppTestFailure(
                file_name=results[0].file_name if results else self.path.name,
                line_num=results[0].line_num if results else -1,
                content=dedent(f"""\
                    working_dir: {os.getcwd()}
                    Internal Error: calling {self.exe_path} for test {run_test} failed ({return_code=}):
                    output file: {stdout_file_path}
                    command to repeat: {subprocess.list2cmdline(process.args)}
                    error: {results[0].lines if results else 'unknown'}
                """),
            )], stdout_file_path

        return None, stdout_file_path
|
|
|
|
|
|
# Expose the collector's hook under the module-level name `pytest_collect_file`
# (presumably so that pytest picks it up as a collection hook - confirm against CppFile).
pytest_collect_file = BoostTestFile.pytest_collect_file
|
|
|
|
@cache
def get_boost_test_list_json_content(executable: pathlib.Path, combined: bool = False)-> dict[str, list[list[str, set[str]]]]:
    """List the content of the test tree in an executable, including test labels.

    Mimics get_boost_test_list_content() but uses --list_json_content, which provides
    more structured data.

    Return a dict where the key is the name of a test file and the value is a list of
    ``[test_name, labels]`` entries for the tests in this file. In case of combined tests
    the dict will have multiple items (one per top level suite), otherwise we assume that
    the name of the executable is the same as the source test file (.cc).

    Test names are suite paths joined with "/". For combined tests the path is relative
    to the top level suite and never starts with "/".

    Raises:
        subprocess.CalledProcessError: the executable failed for a reason other than an
            empty test tree.
    """
    try:
        output = subprocess.check_output(
            [executable, "--", "--list_json_content"],
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
    except subprocess.CalledProcessError as e:
        # An executable without any test is not an error: report it as having no cases.
        if "Test setup error: test tree is empty" in e.output:
            return {executable.name: []}
        raise

    data = json.loads(output)
    test_tree = {}

    def parse_labels(test):
        # Labels come as one comma separated string; a missing or empty value means none.
        raw_labels = test.get("labels")
        return set(raw_labels.split(",")) if raw_labels else set()

    def parse_suite(key, suite, suite_str=""):
        for _suite in suite["suites"]:
            name = _suite["name"]
            # Do not produce a leading "/" when there is no prefix yet (combined tests),
            # the same way it is done for the test names below.
            parse_suite(key, _suite, f"{suite_str}/{name}" if suite_str else name)
        for test in suite["tests"]:
            if test["name"].startswith("_"):
                # Entries like "_0", "_1" are not listed one by one: the enclosing suite
                # is recorded once, without labels.
                test_tree[key].append([suite["name"], []])
                break
            test_name = f'{suite_str}/{test["name"]}' if suite_str else test["name"]
            test_tree[key].append([test_name, parse_labels(test)])

    for file in data:
        for s in file["content"]["suites"]:
            k = s["name"] if combined else executable.name
            if k not in test_tree:
                test_tree[k] = []
            parse_suite(k, s, suite_str=s["name"] if not combined else "")
        if file["content"]["tests"]:
            # Tests outside of any suite are always stored under the executable name.
            if executable.name not in test_tree:
                test_tree[executable.name] = []
            for test in file["content"]["tests"]:
                test_tree[executable.name].append([test["name"], parse_labels(test)])
    return test_tree
|
|
|
|
@cache
def get_boost_test_list_content(executable: pathlib.Path, combined: bool = False) -> dict[str, list[str]]:
    """Run the executable with --list_content and return its test cases grouped by test file.

    The result maps a test file name to the list of test paths ("<suite>/<suite>/<case>")
    found for it. For a combined binary every top level suite becomes a separate key and
    the paths are relative to that suite; otherwise everything is stored under the name of
    the executable, which is assumed to match the source test file (.cc).

    --list_content prints one name per line and uses indentation to express nesting. When
    BOOST_DATA_TEST_CASE is used, the numbered cases are printed below the function name:

        test_singular_tree_ptr_sz*
            _0*
            _1*
            _2*

    and for combined tests one more level is added on top:

        group0_voter_calculator_test*
            existing_voters_are_kept_across_racks*
            leader_is_retained_as_voter*
                _0*
                _1*
                _2*

    Names starting with '_' are skipped, so a test with a dataprovider counts as one case.
    """
    listing = subprocess.check_output(
        [executable, "--list_content"],
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    )

    # The listing is turned into a tree of nested dicts: a suite maps to its children,
    # a test case maps to an empty dict.
    root = {}
    node = root
    parents = []  # (node, indentation) pairs of the suites we are currently inside of
    depth = 0
    previous = ''

    for raw in listing.splitlines():
        name = raw.strip().removesuffix("*")

        if name.startswith("_"):
            # TODO: add support for test cases with dataprovider
            continue

        # Number of leading spaces (the whole length for a line made of spaces only).
        width = len(raw) - len(raw.lstrip(" "))
        if width > depth:
            # The previous entry turned out to be a suite: descend into it.
            parents.append((node, depth))
            node = node[previous]
        elif width < depth:
            # One or more suites ended: climb up to the one with matching indentation.
            level = None
            while level != width:
                node, level = parents.pop()

        depth = width
        node[name] = {}
        previous = name

    def flatten(tests: dict[str, dict]) -> list[str]:
        # Collapse the tree into "/"-joined paths, keeping the listing order.
        paths = []
        for suite, children in tests.items():
            if not children:
                paths.append(suite)
                continue
            paths.extend(f"{suite}/{leaf}" for leaf in flatten(children))
        return paths

    if combined:
        # Each top level suite of a combined binary stands for one test file.
        return {suite: flatten(children) for suite, children in root.items()}

    # A standalone binary: all the paths belong to the executable itself.
    return {executable.name: flatten(root)}
|
|
|
|
def parse_boost_test_log_sink(log_xml: str) -> list[CppTestFailure]:
    """Extract the failures from the XML 'log' produced by BoostTest.

    Direct children of the root element tagged Exception, Error or FatalError are turned
    into CppTestFailure objects, in that order. When such an element has a LastCheckpoint
    child, the file, line and text are taken from the checkpoint instead.

    Text that cannot be parsed as XML (e.g. an empty log) yields an empty list and a
    logged warning.
    """
    try:
        log_root = ElementTree.fromstring(text=log_xml)
    except ElementTree.ParseError:
        logger.warning("Error parsing the log_sink output. Can be empty or invalid.")
        return []

    failures = []
    if log_root is None:
        return failures

    for tag in ("Exception", "Error", "FatalError"):
        for node in log_root.findall(tag):
            checkpoint = node.find("LastCheckpoint")
            source = node if checkpoint is None else checkpoint
            failures.append(CppTestFailure(
                file_name=source.attrib["file"],
                line_num=int(source.attrib["line"]),
                content=source.text or "",
            ))

    return failures
|