Merge 'test.py: topology: allow to run tests with bare pytest command' from Evgeniy Naydanov

Add possibility to run topology tests using bare pytest command.

To achieve this goal the following changes were made:

- Add fixtures `testpy_testsuite` and `testpy_test` to `test/conftest.py`.
- To build `TestSuite` object we need to discover a corresponding `suite.xml` file.  Do this by walking up thru the fs tree starting from the current test file.
- Run ScyllaClusterManager using pytest fixture if `--manager-api` option is not provided.

And made some refactoring:

- Add path constants to `test` module and use them in different test suites instead of own dups of the same code:
  - TOP_SRC_DIR : ScyllaDB's source code root directory
  - TEST_DIR : the directory with test.py tests and libs
  - BUILD_DIR : directory with ScyllaDB's build artifacts
- Add TestSuite.log_dir attribute as a ScyllaDB's build mode subdir of a path provided using `--tmpdir` CLI argument. Don't use `tmpdir` name because it mixed up with pytest's built-in fixture and `--tmpdir` option itself.
- Change default value for `--tmdir` from `./testlog` to `TOP_SRC_DIR/testlog`
- Refactor `ResourceGather*` classes to use path from a `test` object instead of providing it separately.
- Move modes constants (`all_modes`/`ALL_MODES` and `debug_modes`/`DEBUG_MODES`) to `test` module and remove duplication.
- Move `prepare_dirs()` and `start_3rd_party_services()` from `pylib.util` to`pylib.suite.base` to avoid circular imports.
- In some places refactor to use f-strings for formatting.

Also minor changes related to running with pytest-xdist:

- When run tests in parallel we need to ensure that filenames are unique by adding xdist worker ID to them.
- Pass random seed across xdist workers using env variable.

Closes scylladb/scylladb#22960

* github.com:scylladb/scylladb:
  test.py: async_cql: remove unused event_loop fixture
  test.py: random_failures: make it play well with xdist
  test.py: add xdist worker ID to log filenames
  test.py: topology: run tests using bare pytest command
  test.py: add fixtures for current test suite and test
  test.py: refactor paths constants and options
This commit is contained in:
Pavel Emelyanov
2025-03-31 09:30:06 +03:00
23 changed files with 426 additions and 298 deletions

26
test.py
View File

@@ -34,11 +34,20 @@ import humanfriendly
import treelib
from scripts import coverage
from test import ALL_MODES, TOP_SRC_DIR, path_to
from test.pylib import coverage_utils
from test.pylib.suite.base import Test, TestSuite, all_modes, init_testsuite_globals, output_is_a_tty, palette, path_to
from test.pylib.suite.base import (
Test,
TestSuite,
init_testsuite_globals,
output_is_a_tty,
palette,
prepare_dirs,
start_3rd_party_services,
)
from test.pylib.suite.boost import BoostTest
from test.pylib.resource_gather import setup_cgroup, run_resource_watcher
from test.pylib.util import LogPrefixAdapter, get_configured_modes, ninja, prepare_dirs, start_3rd_party_services
from test.pylib.util import LogPrefixAdapter, get_configured_modes, ninja
if TYPE_CHECKING:
from typing import List
@@ -127,16 +136,11 @@ def parse_cmd_line() -> argparse.Namespace:
"boost/memtable_test::test_hash_is_cached" to narrow down to
a certain test case. Default: run all tests in all suites.""",
)
parser.add_argument(
"--tmpdir",
action="store",
default="testlog",
help="""Path to temporary test data and log files. The data is
further segregated per build mode. Default: ./testlog.""",
)
parser.add_argument("--tmpdir", action="store", default=str(TOP_SRC_DIR / "testlog"),
help="Path to temporary test data and log files. The data is further segregated per build mode.")
parser.add_argument("--gather-metrics", action=argparse.BooleanOptionalAction, default=True)
parser.add_argument("--max-failures", type=int, default=-1, help="Maximum number of failures to tolerate before cancelling rest of tests.")
parser.add_argument('--mode', choices=all_modes.keys(), action="append", dest="modes",
parser.add_argument('--mode', choices=ALL_MODES, action="append", dest="modes",
help="Run only tests for given build mode(s)")
parser.add_argument('--repeat', action="store", default="1", type=int,
help="number of times to repeat test execution")
@@ -247,7 +251,7 @@ def parse_cmd_line() -> argparse.Namespace:
args.coverage = True
args.tmpdir = os.path.abspath(args.tmpdir)
prepare_dirs(tempdir_base=args.tmpdir, modes=args.modes)
prepare_dirs(tempdir_base=pathlib.Path(args.tmpdir), modes=args.modes)
# Get the list of tests configured by configure.py
try:

View File

@@ -3,9 +3,33 @@
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import os
from pathlib import Path
__all__ = ["ALL_MODES", "BUILD_DIR", "DEBUG_MODES", "TEST_DIR", "TEST_RUNNER", "TOP_SRC_DIR", "path_to"]
TEST_RUNNER = os.environ.get("SCYLLA_TEST_RUNNER", "pytest")
TOP_SRC_DIR = Path(__file__).parent.parent # ScyllaDB's source code root directory
TOP_SRC_DIR = Path(__file__).parent.parent # ScyllaDB's source code root directory
TEST_DIR = TOP_SRC_DIR / "test"
BUILD_DIR = TOP_SRC_DIR / "build"
ALL_MODES = {
"debug": "Debug",
'release': "RelWithDebInfo",
"dev": "Dev",
"sanitize": "Sanitize",
"coverage": "Coverage",
}
DEBUG_MODES = {"debug", "sanitize"}
def path_to(mode: str, *components: str) -> str:
"""Resolve path to built executable."""
if BUILD_DIR.joinpath("build.ninja").exists():
*dir_components, basename = components
return str(BUILD_DIR.joinpath(*dir_components, ALL_MODES[mode], basename))
return str(BUILD_DIR.joinpath(mode, *components))

View File

@@ -19,4 +19,4 @@ def pytest_collect_file(file_path: PosixPath, parent: Collector):
# One of the files in the directory has additional extensions .inc. It's not a test and will not have a binary for
# execution, so it should be excluded from collecting
if file_path.suffix == '.cc' and '.inc' not in file_path.suffixes and file_path.stem != COMBINED_TESTS.stem:
return collect_items(file_path, parent, facade=BoostTestFacade(parent.config, get_combined_tests(parent.session)))
return collect_items(file_path, parent, facade=BoostTestFacade(parent.config, get_combined_tests()))

View File

@@ -5,16 +5,21 @@
#
# This file configures pytest for all tests in this directory, and also
# defines common test fixtures for all of them to use
import pathlib
from __future__ import annotations
import asyncio
import ssl
import tempfile
import platform
import urllib.parse
from functools import partial
from typing import List, Optional, Dict
from multiprocessing import Event, Process
from typing import TYPE_CHECKING
from test.pylib.random_tables import RandomTables
from test.pylib.util import unique_name
from test.pylib.manager_client import ManagerClient, IPAddress
from test.pylib.async_cql import event_loop, run_async
from test.pylib.manager_client import ManagerClient
from test.pylib.async_cql import run_async
from test.pylib.scylla_cluster import ScyllaClusterManager
import logging
import pytest
from cassandra.auth import PlainTextAuthProvider # type: ignore # pylint: disable=no-name-in-module
@@ -27,7 +32,15 @@ from cassandra.policies import TokenAwarePolicy # type:
from cassandra.policies import WhiteListRoundRobinPolicy # type: ignore
from cassandra.connection import DRIVER_NAME # type: ignore # pylint: disable=no-name-in-module
from cassandra.connection import DRIVER_VERSION # type: ignore # pylint: disable=no-name-in-module
from cassandra.connection import EndPoint # type: ignore # pylint: disable=no-name-in-module
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
from typing import Callable
from cassandra.connection import EndPoint
from test.pylib.internal_types import IPAddress
from test.pylib.suite.base import Test
Session.run_async = run_async # patch Session for convenience
@@ -39,7 +52,7 @@ print(f"Driver name {DRIVER_NAME}, version {DRIVER_VERSION}")
def pytest_addoption(parser):
parser.addoption('--manager-api', action='store', required=True,
parser.addoption('--manager-api', action='store',
help='Manager unix socket path')
parser.addoption('--host', action='store', default='localhost',
help='CQL server host to connect to')
@@ -58,7 +71,7 @@ def pytest_addoption(parser):
# This is a constant used in `pytest_runtest_makereport` below to store the full report for the test case
# in a stash which can then be accessed from fixtures to print the stacktrace for the failed test
PHASE_REPORT_KEY = pytest.StashKey[Dict[str, pytest.CollectReport]]()
PHASE_REPORT_KEY = pytest.StashKey[dict[str, pytest.CollectReport]]()
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
@@ -90,7 +103,7 @@ class CustomConnection(Cluster.connection_class):
# cluster_con helper: set up client object for communicating with the CQL API.
def cluster_con(hosts: List[IPAddress | EndPoint], port: int, use_ssl: bool, auth_provider=None, load_balancing_policy=RoundRobinPolicy()):
def cluster_con(hosts: list[IPAddress | EndPoint], port: int, use_ssl: bool, auth_provider=None, load_balancing_policy=RoundRobinPolicy()):
"""Create a CQL Cluster connection object according to configuration.
It does not .connect() yet."""
assert len(hosts) > 0, "python driver connection needs at least one host to connect to"
@@ -152,8 +165,40 @@ def cluster_con(hosts: List[IPAddress | EndPoint], port: int, use_ssl: bool, aut
)
@pytest.fixture(scope="session")
async def manager_internal(event_loop, request):
@pytest.fixture(scope="module")
async def manager_api_sock_path(request: pytest.FixtureRequest, testpy_test: Test) -> AsyncGenerator[str]:
if manager_api := request.config.getoption("--manager-api"):
yield manager_api
else:
test_uname = testpy_test.uname
clusters = testpy_test.suite.clusters
base_dir = str(testpy_test.suite.log_dir)
sock_path = f"{tempfile.mkdtemp(prefix='manager-', dir='/tmp')}/api"
start_event = Event()
stop_event = Event()
async def run_manager() -> None:
mgr = ScyllaClusterManager(test_uname=test_uname, clusters=clusters, base_dir=base_dir, sock_path=sock_path)
await mgr.start()
start_event.set()
try:
await asyncio.get_running_loop().run_in_executor(None, stop_event.wait)
finally:
await mgr.stop()
manager_process = Process(target=lambda: asyncio.run(run_manager()))
manager_process.start()
start_event.wait()
yield sock_path
stop_event.set()
manager_process.join()
@pytest.fixture(scope="module")
async def manager_internal(request: pytest.FixtureRequest, manager_api_sock_path: str) -> Callable[[], ManagerClient]:
"""Session fixture to prepare client object for communicating with the Cluster API.
Pass the Unix socket path where the Manager server API is listening.
Pass a function to create driver connections.
@@ -167,23 +212,25 @@ async def manager_internal(event_loop, request):
auth_provider = PlainTextAuthProvider(username=auth_username, password=auth_password)
else:
auth_provider = None
manager_int = partial(ManagerClient, request.config.getoption('manager_api'), port, use_ssl, auth_provider, cluster_con)
yield manager_int
return lambda: ManagerClient(
sock_path=manager_api_sock_path,
port=port,
use_ssl=use_ssl,
auth_provider=auth_provider,
con_gen=cluster_con,
)
@pytest.fixture(scope="function")
async def manager(request, manager_internal, record_property, build_mode):
"""Per test fixture to notify Manager client object when tests begin so it can
perform checks for cluster state.
async def manager(request: pytest.FixtureRequest,
manager_internal: Callable[[], ManagerClient],
record_property: Callable[[str, object], None],
testpy_test: Test) -> AsyncGenerator[ManagerClient]:
"""
Per test fixture to notify Manager client object when tests begin so it can perform checks for cluster state.
"""
test_case_name = request.node.name
run_id = request.config.getoption('run_id')
tmp_dir = pathlib.Path(request.config.getoption('tmpdir'))
xml_path: pathlib.Path = pathlib.Path(request.config.getoption('xmlpath'))
suite_testpy_log = (tmp_dir /
build_mode /
f"{pathlib.Path(xml_path.stem).stem}.log"
)
suite_testpy_log = testpy_test.log_filename
test_log = suite_testpy_log.parent / f"{suite_testpy_log.stem}.{test_case_name}.log"
# this should be consistent with scylla_cluster.py handler name in _before_test method
test_py_log_test = suite_testpy_log.parent / f"{suite_testpy_log.stem}_{test_case_name}_cluster.log"
@@ -199,13 +246,13 @@ async def manager(request, manager_internal, record_property, build_mode):
# Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
# all related logs in one dir.
# Then add property to the XML report with the path to the directory, so it can be visible in Jenkins
failed_test_dir_path = tmp_dir / build_mode / "failed_test" / f"{test_case_name}"
failed_test_dir_path = testpy_test.suite.log_dir / "failed_test" / test_case_name
failed_test_dir_path.mkdir(parents=True, exist_ok=True)
await manager_client.gather_related_logs(
failed_test_dir_path,
{'pytest.log': test_log, 'test_py.log': test_py_log_test}
)
with open(failed_test_dir_path / f"stacktrace", 'w') as f:
with open(failed_test_dir_path / "stacktrace", "w") as f:
f.write(report.longreprtext)
if request.config.getoption('artifacts_dir_url') is not None:
# get the relative path to the tmpdir for the failed directory
@@ -217,7 +264,11 @@ async def manager(request, manager_internal, record_property, build_mode):
cluster_status = await manager_client.after_test(test_case_name, not failed)
await manager_client.stop() # Stop client session and close driver after each test
if cluster_status["server_broken"]:
pytest.fail(f"test case {test_case_name} leave unfinished tasks on Scylla server. Server marked as broken, server_broken_reason: {cluster_status["message"]}")
pytest.fail(
f"test case {test_case_name} leave unfinished tasks on Scylla server. Server marked as broken,"
f" server_broken_reason: {cluster_status["message"]}"
)
# "cql" fixture: set up client object for communicating with the CQL API.
# Since connection is managed by manager just return that object
@@ -254,7 +305,7 @@ skipped_funcs = {}
# The reason to skip a test should be specified, used as a comment only.
# Additionally, platform_key can be specified to limit the scope of the attribute
# to the specified platform. Example platform_key-s: [aarch64, x86_64]
def skip_mode(mode: str, reason: str, platform_key: Optional[str]=None):
def skip_mode(mode: str, reason: str, platform_key: str | None = None):
def wrap(func):
skipped_funcs.setdefault((func, mode), []).append((reason, platform_key))
return func

View File

@@ -0,0 +1,21 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from __future__ import annotations
import os
import random
import sys
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from pytest import Session
def pytest_sessionstart(session: Session) -> None:
if not session.config.getoption("collectonly") and "xdist" in sys.modules:
if sys.modules["xdist"].is_xdist_controller(request_or_session=session):
os.environ["TOPOLOGY_RANDOM_FAILURES_TEST_SHUFFLE_SEED"] = str(random.randrange(sys.maxsize))

View File

@@ -6,6 +6,7 @@
from __future__ import annotations
import os
import sys
import time
import random
@@ -35,7 +36,10 @@ TESTS_COUNT = 1 # number of tests from the whole matrix to run, None to run the
# Following parameters can be adjusted to run same sequence of tests from a previous run. Look at logs for the values.
# Also see `pytest_generate_tests()` below for details.
TESTS_SHUFFLE_SEED = random.randrange(sys.maxsize) # seed for the tests order randomization
# Seed for the tests order randomization.
TESTS_SHUFFLE_SEED = int(os.environ.get("TOPOLOGY_RANDOM_FAILURES_TEST_SHUFFLE_SEED", random.randrange(sys.maxsize)))
ERROR_INJECTIONS_COUNT = len(ERROR_INJECTIONS) # change it to limit number of error injections
CLUSTER_EVENTS_COUNT = len(CLUSTER_EVENTS) # change it to limit number of cluster events

View File

@@ -6,37 +6,36 @@
from __future__ import annotations
import asyncio
import logging
import os
import subprocess
import sys
from argparse import BooleanOptionalAction
from pathlib import Path
from random import randint
from typing import TYPE_CHECKING
import pytest
from test import TEST_RUNNER
from test import ALL_MODES, TEST_RUNNER, TOP_SRC_DIR
from test.pylib.report_plugin import ReportPlugin
from test.pylib.suite.base import TestSuite, init_testsuite_globals
from test.pylib.util import get_configured_modes, prepare_dirs, start_3rd_party_services
from test.pylib.util import get_configured_modes
from test.pylib.suite.base import (
TestSuite,
find_suite_config,
init_testsuite_globals,
prepare_dirs,
start_3rd_party_services,
)
if TYPE_CHECKING:
from asyncio import AbstractEventLoop
from test.pylib.cpp.item import CppTestFunction
from test.pylib.suite.base import Test
ALL_MODES = {'debug': 'Debug',
'release': 'RelWithDebInfo',
'dev': 'Dev',
'sanitize': 'Sanitize',
'coverage': 'Coverage'}
def pytest_addoption(parser):
parser.addoption('--mode', choices=ALL_MODES.keys(), action="append", dest="modes",
def pytest_addoption(parser: pytest.Parser) -> None:
parser.addoption('--mode', choices=ALL_MODES, action="append", dest="modes",
help="Run only tests for given build mode(s)")
parser.addoption('--tmpdir', action='store', default='testlog', help='''Path to temporary test data and log files. The data is
further segregated per build mode. Default: ./testlog.''', )
parser.addoption('--tmpdir', action='store', default=str(TOP_SRC_DIR / 'testlog'),
help='Path to temporary test data and log files. The data is further segregated per build mode.')
parser.addoption('--run_id', action='store', default=None, help='Run id for the test run')
parser.addoption('--byte-limit', action="store", default=randint(0, 2000), type=int,
help="Specific byte limit for failure injection (random by default)")
@@ -50,9 +49,25 @@ def pytest_addoption(parser):
parser.addoption('--test-py-init', action='store_true', default=False,
help='Run pytest session in test.py-compatible mode. I.e., start all required services, etc.')
# Options for compatibility with test.py
parser.addoption('--save-log-on-success', default=False,
dest="save_log_on_success", action="store_true",
help="Save test log output on success.")
parser.addoption('--coverage', action='store_true', default=False,
help="When running code instrumented with coverage support"
"Will route the profiles to `tmpdir`/mode/coverage/`suite` and post process them in order to generate "
"lcov file per suite, lcov file per mode, and an lcov file for the entire run, "
"The lcov files can eventually be used for generating coverage reports")
parser.addoption("--cluster-pool-size", type=int,
help="Set the pool_size for PythonTest and its descendants. Alternatively environment variable "
"CLUSTER_POOL_SIZE can be used to achieve the same")
parser.addoption("--extra-scylla-cmdline-options", default=[],
help="Passing extra scylla cmdline options for all tests. Options should be space separated:"
" '--logger-log-level raft=trace --default-log-level error'")
@pytest.fixture(scope="session")
def build_mode(request):
def build_mode(request: pytest.FixtureRequest) -> str:
"""
This fixture returns current build mode.
This is for running tests through the test.py script, where only one mode is passed to the test
@@ -64,10 +79,24 @@ def build_mode(request):
return mode[0]
return mode
def pytest_configure(config):
@pytest.fixture(scope="module")
async def testpy_testsuite(request: pytest.FixtureRequest, build_mode: str) -> TestSuite:
suite_config = find_suite_config(path=request.path)
return TestSuite.opt_create(path=str(suite_config.parent), options=request.config.option, mode=build_mode)
@pytest.fixture(scope="module")
async def testpy_test(request: pytest.FixtureRequest, testpy_testsuite: TestSuite) -> Test:
await testpy_testsuite.add_test(shortname=request.node.name, casename=None)
return testpy_testsuite.tests[-1] # most recent test added to the test suite; i.e., by the previous line.
def pytest_configure(config: pytest.Config) -> None:
config.pluginmanager.register(ReportPlugin())
def pytest_collection_modifyitems(config, items):
def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item | CppTestFunction]) -> None:
"""
This is a standard pytest method.
This is needed to modify the test names with dev mode and run id to differ them one from another
@@ -113,7 +142,7 @@ def pytest_sessionstart(session: pytest.Session) -> None:
if "xdist" not in sys.modules or not sys.modules["xdist"].is_xdist_worker(request_or_session=session):
temp_dir = Path(session.config.getoption("--tmpdir")).absolute()
prepare_dirs(tempdir_base=temp_dir, modes=session.config.getoption("--mode") or get_configured_modes())
start_3rd_party_services(tempdir_base=temp_dir, toxyproxy_byte_limit=session.config.getoption('byte_limit'))
start_3rd_party_services(tempdir_base=temp_dir, toxyproxy_byte_limit=session.config.getoption("byte_limit"))
def pytest_sessionfinish() -> None:

View File

@@ -15,7 +15,7 @@ from typing import NamedTuple
import pytest
import requests.exceptions
from test import TOP_SRC_DIR, BUILD_DIR
from test import TOP_SRC_DIR, path_to
from test.nodetool.rest_api_mock import set_expected_requests, expected_request, get_expected_requests, \
get_unexpected_requests, expected_requests_manager
@@ -99,9 +99,9 @@ def jmx(request, rest_api_mock_server):
if jmx_path is None:
jmx_path = TOP_SRC_DIR / "tools" / "jmx" / "scripts" / "scylla-jmx"
else:
jmx_path = os.path.abspath(jmx_path)
jmx_path = Path(jmx_path).absolute()
workdir = os.path.join(os.path.dirname(jmx_path), "..")
workdir = jmx_path.parent.parent
ip, api_port = rest_api_mock_server
expected_requests = [
expected_request(
@@ -150,30 +150,16 @@ def jmx(request, rest_api_mock_server):
jmx_process.wait()
all_modes = {'debug': 'Debug',
'release': 'RelWithDebInfo',
'dev': 'Dev',
'sanitize': 'Sanitize',
'coverage': 'Coverage'}
def _path_to_scylla(mode) -> Path:
ninja = BUILD_DIR / 'build.ninja'
if ninja.exists():
return BUILD_DIR / all_modes[mode] / "scylla"
return BUILD_DIR / mode / "scylla"
@pytest.fixture(scope="session")
def nodetool_path(request, build_mode):
if request.config.getoption("nodetool") == "scylla":
return _path_to_scylla(build_mode)
return path_to(build_mode, "scylla")
path = request.config.getoption("nodetool_path")
if path is not None:
return os.path.abspath(path)
return TOP_SRC_DIR / "java" / "bin" / "nodetool"
return str(TOP_SRC_DIR / "java" / "bin" / "nodetool")
@pytest.fixture(scope="function")

View File

@@ -10,7 +10,7 @@ from asyncio loop.
Example usage:
from cassandra.cluster import Session, Cluster
from test.pylib.async_cql import event_loop, run_async
from test.pylib.async_cql import run_async
Session.run_async = run_async
ccluster = Cluster(...)
@@ -20,24 +20,13 @@ Example usage:
import asyncio
import logging
from typing import List
import pytest
from cassandra.cluster import ResponseFuture # type: ignore # pylint: disable=no-name-in-module
logger = logging.getLogger(__name__)
@pytest.fixture(scope="session")
def event_loop(request):
"""Change default pytest-asyncio event_loop fixture scope to session to
allow async fixtures with scope larger than function. (e.g. manager fixture)
See https://github.com/pytest-dev/pytest-asyncio/issues/68"""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
def _wrap_future(driver_response_future: ResponseFuture, all_pages: bool = False) -> asyncio.Future:
"""Wrap a cassandra Future into an asyncio.Future object.

View File

@@ -12,22 +12,13 @@ from pathlib import Path, PosixPath
import yaml
from pytest import Collector
from test import ALL_MODES, DEBUG_MODES, TOP_SRC_DIR
from test.pylib.cpp.boost.boost_facade import COMBINED_TESTS
from test.pylib.cpp.facade import CppTestFacade
from test.pylib.cpp.item import CppFile
from test.pylib.util import get_modes_to_run
ALL_MODES = {
'debug': 'Debug',
'release': 'RelWithDebInfo',
'dev': 'Dev',
'sanitize': 'Sanitize',
'coverage': 'Coverage',
}
DEBUG_MODES = {
'debug': 'Debug',
'sanitize': 'Sanitize',
}
DEFAULT_ARGS = [
'--overprovisioned',
'--unsafe-bypass-fsync 1',
@@ -79,8 +70,6 @@ def read_suite_config(directory: Path) -> dict[str, str]:
raise RuntimeError('Failed to load tests: suite.yaml is empty')
return cfg
def get_root_path(session) -> Path:
return Path(session.config.rootpath).parent
def collect_items(file_path: PosixPath, parent: Collector, facade: CppTestFacade) -> object:
"""
@@ -110,9 +99,9 @@ def collect_items(file_path: PosixPath, parent: Collector, facade: CppTestFacade
@cache
def get_combined_tests(session):
def get_combined_tests():
suites = collections.defaultdict()
executable = get_root_path(session) / COMBINED_TESTS
executable = TOP_SRC_DIR / COMBINED_TESTS
args = [executable, '--list_content']
output = subprocess.check_output(

View File

@@ -41,7 +41,7 @@ class ManagerClient():
# pylint: disable=too-many-public-methods
def __init__(self, sock_path: str, port: int, use_ssl: bool, auth_provider: Any|None,
con_gen: Callable[[List[IPAddress], int, bool, Any], CassandraSession]) \
con_gen: Callable[[List[IPAddress], int, bool, Any], CassandraCluster]) \
-> None:
self.test_log_fh: Optional[logging.FileHandler] = None
self.port = port

View File

@@ -4,6 +4,8 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from __future__ import annotations
import asyncio
import getpass
import logging
@@ -11,17 +13,30 @@ import os
import platform
import subprocess
from abc import ABC
from asyncio import Task, Event
from datetime import datetime
from functools import lru_cache
from pathlib import Path
from typing import TextIO
from typing import TYPE_CHECKING
import psutil
from test.pylib.db.model import Metric, SystemResourceMetric, CgroupMetric, Test
from test.pylib.db.writer import DATE_TIME_TEMPLATE, SQLiteWriter, SYSTEM_RESOURCE_METRICS_TABLE, METRICS_TABLE, \
DEFAULT_DB_NAME, CGROUP_MEMORY_METRICS_TABLE, TESTS_TABLE
from test.pylib.db.writer import (
CGROUP_MEMORY_METRICS_TABLE,
DATE_TIME_TEMPLATE,
DEFAULT_DB_NAME,
METRICS_TABLE,
SYSTEM_RESOURCE_METRICS_TABLE,
TESTS_TABLE,
SQLiteWriter,
)
if TYPE_CHECKING:
from asyncio import Task, Event
from typing import TextIO
from test.pylib.suite.base import Test as TestPyTest
@lru_cache(maxsize=None)
def get_cgroup() -> Path:
@@ -33,25 +48,25 @@ def get_cgroup() -> Path:
# This can be used to manipulate the cgroup's controllers
return Path(f"/sys/fs/cgroup/{cgroup_info[0].strip().split(':')[-1]}/initial")
CGROUP_INITIAL: Path = get_cgroup()
CGROUP_TESTS: Path = CGROUP_INITIAL.parent / 'tests'
cancel_event_global = None
stop_event_global = None
CGROUP_INITIAL = get_cgroup()
CGROUP_TESTS = CGROUP_INITIAL.parent / 'tests'
class ResourceGather(ABC):
def __init__(self, test, tmp_dir: str):
def __init__(self, test: TestPyTest):
self.test = test
self.db_path = Path(tmp_dir) / DEFAULT_DB_NAME
self.db_path = self.test.suite.log_dir / DEFAULT_DB_NAME
standardized_name = self.test.shortname.replace("/", "_")
self.cgroup_path = Path(
f"{CGROUP_TESTS}/{self.test.suite.name}.{standardized_name}.{self.test.suite.mode}.{self.test.id}"
)
self.logger = logging.getLogger(__name__)
def make_cgroup(self):
def make_cgroup(self) -> None:
pass
def put_process_to_cgroup(self):
def put_process_to_cgroup(self) -> None:
os.setsid()
def get_test_metrics(self) -> Metric:
@@ -60,21 +75,21 @@ class ResourceGather(ABC):
def write_metrics_to_db(self, metrics: Metric, success: bool = False) -> None:
pass
def cgroup_monitor(self, test_event: Event):
def cgroup_monitor(self, test_event: Event) -> Task:
pass
def remove_cgroup(self):
def remove_cgroup(self) -> None:
pass
class ResourceGatherOff(ResourceGather):
def cgroup_monitor(self, test_event) -> Task:
def cgroup_monitor(self, test_event: Event) -> Task:
return asyncio.create_task(no_monitor())
class ResourceGatherOn(ResourceGather):
def __init__(self, test, tmp_dir: str):
super().__init__(test, tmp_dir)
def __init__(self, test: TestPyTest):
super().__init__(test)
self.sqlite_writer = SQLiteWriter(self.db_path)
self.test_id: int = self.sqlite_writer.write_row_if_not_exist(
Test(
@@ -86,7 +101,7 @@ class ResourceGatherOn(ResourceGather):
),
TESTS_TABLE)
def make_cgroup(self):
def make_cgroup(self) -> None:
os.makedirs(self.cgroup_path, exist_ok=True)
def get_test_metrics(self) -> Metric:
@@ -107,14 +122,13 @@ class ResourceGatherOn(ResourceGather):
metrics.success = success
self.sqlite_writer.write_row(metrics, METRICS_TABLE)
def put_process_to_cgroup(self):
def put_process_to_cgroup(self) -> None:
super().put_process_to_cgroup()
pid = os.getpid()
with open(self.cgroup_path / 'cgroup.procs', "a") as cgroup:
cgroup.write(str(pid))
def remove_cgroup(self):
def remove_cgroup(self) -> None:
os.rmdir(self.cgroup_path)
def cgroup_monitor(self, test_event: Event) -> Task:
@@ -148,11 +162,12 @@ class ResourceGatherOn(ResourceGather):
setattr(metrics, stats[stat], float(value) / 1_000_000)
def get_resource_gather(is_switched_on: bool, test, tmpdir: str) -> ResourceGather:
def get_resource_gather(is_switched_on: bool, test: TestPyTest) -> ResourceGather:
if is_switched_on:
return ResourceGatherOn(test, tmpdir)
return ResourceGatherOn(test)
else:
return ResourceGatherOff(test, tmpdir)
return ResourceGatherOff(test)
def _is_cgroup_rw() -> bool:
with open('/proc/mounts', 'r') as f:
@@ -211,7 +226,7 @@ def setup_cgroup(is_required: bool) -> None:
f.write(controllers)
async def monitor_resources(cancel_event: asyncio.Event, stop_event: asyncio.Event, tmpdir: Path) -> None:
async def monitor_resources(cancel_event: Event, stop_event: Event, tmpdir: Path) -> None:
"""Continuously monitors CPU and memory utilization."""
sqlite_writer = SQLiteWriter(tmpdir / DEFAULT_DB_NAME)
while not cancel_event.is_set() and not stop_event.is_set():
@@ -227,11 +242,11 @@ async def monitor_resources(cancel_event: asyncio.Event, stop_event: asyncio.Eve
await asyncio.sleep(2)
async def no_monitor():
async def no_monitor() -> None:
pass
def run_resource_watcher(is_required, cancel_event, stop_event, tmpdir: str) -> Task:
def run_resource_watcher(is_required: bool, cancel_event: Event, stop_event:Event, tmpdir: str) -> Task:
if is_required:
return asyncio.create_task(monitor_resources(cancel_event, stop_event, Path(tmpdir)))
return asyncio.create_task(no_monitor())

View File

@@ -24,10 +24,12 @@ from typing import Any, Optional, Dict, List, Set, Tuple, Callable, AsyncIterato
Awaitable
import uuid
from io import BufferedWriter
from test import TOP_SRC_DIR, TEST_DIR
from test.pylib.host_registry import Host, HostRegistry
from test.pylib.pool import Pool
from test.pylib.rest_client import ScyllaRESTAPIClient, HTTPError
from test.pylib.util import LogPrefixAdapter, read_last_line, gather_safely
from test.pylib.util import LogPrefixAdapter, read_last_line, gather_safely, get_xdist_worker_id
from test.pylib.internal_types import ServerNum, IPAddress, HostID, ServerInfo, ServerUpState
from functools import partial
import aiohttp
@@ -264,7 +266,7 @@ class ScyllaServer:
host_id: HostID # Host id (UUID)
newid = itertools.count(start=1).__next__ # Sequential unique id
def __init__(self, mode: str, exe: str, vardir: str,
def __init__(self, mode: str, exe: str, vardir: str | pathlib.Path,
logger: Union[logging.Logger, logging.LoggerAdapter],
cluster_name: str, ip_addr: str, seeds: List[str],
cmdline_options: List[str],
@@ -274,8 +276,11 @@ class ScyllaServer:
server_encryption: str) -> None:
# pylint: disable=too-many-arguments
self.server_id = ServerNum(ScyllaServer.newid())
xdist_worker_id = get_xdist_worker_id()
# this variable needed to make a cleanup after server is not needed anymore
self.maintenance_socket_dir = tempfile.TemporaryDirectory(prefix=f"scylladb-{self.server_id}-test.py-")
self.maintenance_socket_dir = tempfile.TemporaryDirectory(
prefix=f"scylladb-{f'{xdist_worker_id}-' if xdist_worker_id else ''}{self.server_id}-test.py-"
)
self.maintenance_socket_path = f"{self.maintenance_socket_dir.name}/cql.m"
self.exe = pathlib.Path(exe).resolve()
self.vardir = pathlib.Path(vardir)
@@ -290,15 +295,15 @@ class ScyllaServer:
self.log_savepoint = 0
self.control_cluster: Optional[Cluster] = None
self.control_connection: Optional[Session] = None
shortname = f"scylla-{self.server_id}"
shortname = f"scylla-{f'{xdist_worker_id}-' if xdist_worker_id else ''}{self.server_id}"
self.workdir = self.vardir / shortname
self.log_filename = (self.vardir / shortname).with_suffix(".log")
self.log_filename = self.workdir.with_suffix(".log")
self.config_filename = self.workdir / "conf/scylla.yaml"
self.property_filename = self.workdir / "conf/cassandra-rackdc.properties"
self.certificate_filename = self.workdir / "conf/scylla.crt"
self.keyfile_filename = self.workdir / "conf/scylla.key"
self.truststore_filename = self.workdir / "conf/scyllacadb.pem"
self.resourcesdir = pathlib.Path.cwd() / "test/pylib/resources"
self.resourcesdir = TEST_DIR / "pylib/resources"
self.resources_certificate_file = self.resourcesdir / "scylla.crt"
self.resources_keyfile_file = self.resourcesdir / "scylla.key"
@@ -590,7 +595,7 @@ class ScyllaServer:
# remove from env to make sure user's SCYLLA_HOME has no impact
env.pop('SCYLLA_HOME', None)
env.update(self.append_env)
env['UBSAN_OPTIONS'] = f'halt_on_error=1:abort_on_error=1:suppressions={os.getcwd()}/ubsan-suppressions.supp'
env['UBSAN_OPTIONS'] = f'halt_on_error=1:abort_on_error=1:suppressions={TOP_SRC_DIR / "ubsan-suppressions.supp"}'
env['ASAN_OPTIONS'] = f'disable_coredump=0:abort_on_error=1:detect_stack_use_after_return=1'
self.cmd = await asyncio.create_subprocess_exec(
@@ -1263,6 +1268,7 @@ class ScyllaCluster:
server = self.servers[server_id]
return server.get_sstables_disk_usage(keyspace, table)
class ScyllaClusterManager:
"""Manages a Scylla cluster for running test cases
Provides an async API for tests to request changes in the Cluster.
@@ -1273,7 +1279,11 @@ class ScyllaClusterManager:
site: aiohttp.web.UnixSite
is_after_test_ok: bool
def __init__(self, test_uname: str, clusters: Pool[ScyllaCluster], base_dir: str) -> None:
def __init__(self,
test_uname: str,
clusters: Pool[ScyllaCluster],
base_dir: str,
sock_path: str | None = None) -> None:
self.test_uname: str = test_uname
self.base_dir: str = base_dir
logger = logging.getLogger(self.test_uname)
@@ -1291,8 +1301,12 @@ class ScyllaClusterManager:
# NOTE: need to make a safe temp dir as tempfile can't make a safe temp sock name
# Put the socket in /tmp, not base_dir, to avoid going over the length
# limit of UNIX-domain socket addresses (issue #12622).
self.manager_dir: str = tempfile.mkdtemp(prefix="manager-", dir="/tmp")
self.sock_path: str = f"{self.manager_dir}/api"
if sock_path is None:
self.manager_dir: str = tempfile.mkdtemp(prefix="manager-", dir="/tmp")
self.sock_path: str = f"{self.manager_dir}/api"
else:
self.manager_dir = os.path.dirname(sock_path)
self.sock_path = sock_path
app = aiohttp.web.Application()
self._setup_routes(app)
self.runner = aiohttp.web.AppRunner(app)

View File

@@ -14,34 +14,34 @@ import logging
import os
import pathlib
import re
import shlex
import shutil
import sys
import time
import traceback
import xml.etree.ElementTree as ET
from abc import ABC, abstractmethod
from importlib import import_module
from typing import TYPE_CHECKING
import colorama
import universalasync
import yaml
from test import ALL_MODES, DEBUG_MODES, TOP_SRC_DIR, TEST_DIR, TEST_RUNNER
from test.pylib.artifact_registry import ArtifactRegistry
from test.pylib.host_registry import HostRegistry
from test.pylib.ldap_server import start_ldap
from test.pylib.minio_server import MinioServer
from test.pylib.resource_gather import get_resource_gather
from test.pylib.s3_proxy import S3ProxyServer
from test.pylib.s3_server_mock import MockS3Server
from test.pylib.util import LogPrefixAdapter, get_xdist_worker_id
if TYPE_CHECKING:
from collections.abc import Callable, Iterable
from typing import Any, Dict, List
from typing import Any, List
all_modes = {'debug': 'Debug',
'release': 'RelWithDebInfo',
'dev': 'Dev',
'sanitize': 'Sanitize',
'coverage': 'Coverage'}
debug_modes = {'debug', 'sanitize'}
SUITE_CONFIG_FILENAME = "suite.yaml"
output_is_a_tty = sys.stdout.isatty()
@@ -75,28 +75,24 @@ class palette:
return palette.ansi_escape.sub('', text)
def path_to(mode, *components):
"""Resolve path to built executable"""
build_dir = 'build'
if os.path.exists(os.path.join(build_dir, 'build.ninja')):
*dir_components, basename = components
return os.path.join(build_dir, *dir_components, all_modes[mode], basename)
return os.path.join(build_dir, mode, *components)
class TestSuite(ABC):
"""A test suite is a folder with tests of the same type.
E.g. it can be unit tests, boost tests, or CQL tests."""
# All existing test suites, one suite per path/mode.
suites: Dict[str, 'TestSuite'] = dict()
suites: dict[str, TestSuite] = {}
artifacts: ArtifactRegistry
hosts: HostRegistry
FLAKY_RETRIES = 5
_next_id = collections.defaultdict(int) # (test_key -> id)
def __init__(self, path: str, cfg: dict, options: argparse.Namespace, mode: str) -> None:
self.suite_path = pathlib.Path(path)
self.log_dir = pathlib.Path(options.tmpdir) / mode
self.name = str(self.suite_path.name)
self.cfg = cfg
self.options = options
@@ -116,7 +112,7 @@ class TestSuite(ABC):
self.flaky_tests = set(self.cfg.get("flaky", []))
# If this mode is one of the debug modes, and there are
# tests disabled in a debug mode, add these tests to the skip list.
if mode in debug_modes:
if mode in DEBUG_MODES:
self.disabled_tests.update(self.cfg.get("skip_in_debug_modes", []))
# If a test is listed in run_in_<mode>, it should only be enabled in
# this mode. Tests not listed in any run_in_<mode> directive should
@@ -125,7 +121,7 @@ class TestSuite(ABC):
# This of course may create ambiguity with skip_* settings,
# since the priority of the two is undefined, but oh well.
run_in_m = set(self.cfg.get("run_in_" + mode, []))
for a in all_modes:
for a in ALL_MODES:
if a == mode:
continue
skip_in_m = set(self.cfg.get("run_in_" + a, []))
@@ -138,7 +134,9 @@ class TestSuite(ABC):
# this way is that the storage will not be bloated with coverage files (each can weigh 10s of MBs so for several
# thousands of tests it can easily reach 10 of GBs)
# ref: https://clang.llvm.org/docs/SourceBasedCodeCoverage.html#running-the-instrumented-program
self.base_env["LLVM_PROFILE_FILE"] = os.path.join(options.tmpdir,self.mode, "coverage", self.name, "%m.profraw")
self.base_env["LLVM_PROFILE_FILE"] = str(self.log_dir / "coverage" / self.name / "%m.profraw")
# Generate a unique ID for `--repeat`ed tests
# We want these tests to have different XML IDs so test result
# processors (Jenkins) don't merge results for different iterations of
@@ -202,7 +200,7 @@ class TestSuite(ABC):
pass
@abstractmethod
async def add_test(self, shortname: str, casename: str) -> None:
async def add_test(self, shortname: str, casename: str | None) -> None:
pass
async def run(self, test: 'Test', options: argparse.Namespace):
@@ -308,10 +306,12 @@ class Test:
self.shortname = shortname
self.mode = suite.mode
self.suite = suite
self.allure_dir = pathlib.Path(suite.options.tmpdir) / self.mode / 'allure'
self.allure_dir = self.suite.log_dir / 'allure'
# Unique file name, which is also readable by human, as filename prefix
self.uname = "{}.{}.{}".format(self.suite.name, self.shortname.replace("/","."), self.id)
self.log_filename = pathlib.Path(suite.options.tmpdir) / self.mode / (self.uname + ".log")
self.uname = f"{self.suite.name}.{self.shortname.replace('/', '_')}.{self.id}"
if xdist_worker_id := get_xdist_worker_id():
self.uname = f"{xdist_worker_id}.{self.uname}"
self.log_filename = self.suite.log_dir / f"{self.uname}.log"
self.log_filename.parent.mkdir(parents=True, exist_ok=True)
self.is_flaky = self.shortname in suite.flaky_tests
# True if the test was retried after it failed
@@ -403,7 +403,6 @@ async def run_test(test: Test, options: argparse.Namespace, gentle_kill=False, e
"""Run test program, return True if success else False"""
with test.log_filename.open("wb") as log:
cleanup_fn = None
def report_error(error, failure_injection_desc = None):
msg = "=== TEST.PY SUMMARY START ===\n"
msg += "{}\n".format(error)
@@ -413,12 +412,12 @@ async def run_test(test: Test, options: argparse.Namespace, gentle_kill=False, e
log.write(msg.encode(encoding="UTF-8"))
process = None
stdout = None
logging.info("Starting test %s: %s %s", test.uname, test.path, " ".join(test.args))
UBSAN_OPTIONS = [
"halt_on_error=1",
"abort_on_error=1",
f"suppressions={os.getcwd()}/ubsan-suppressions.supp",
f"suppressions={TOP_SRC_DIR / 'ubsan-suppressions.supp'}",
os.getenv("UBSAN_OPTIONS"),
]
ASAN_OPTIONS = [
@@ -428,7 +427,7 @@ async def run_test(test: Test, options: argparse.Namespace, gentle_kill=False, e
os.getenv("ASAN_OPTIONS"),
]
try:
resource_gather = get_resource_gather(options.gather_metrics, test, options.tmpdir)
resource_gather = get_resource_gather(is_switched_on=options.gather_metrics, test=test)
resource_gather.make_cgroup()
log.write("=== TEST.PY STARTING TEST {} ===\n".format(test.uname).encode(encoding="UTF-8"))
log.write("export UBSAN_OPTIONS='{}'\n".format(
@@ -450,16 +449,18 @@ async def run_test(test: Test, options: argparse.Namespace, gentle_kill=False, e
test_running_event = asyncio.Event()
test_resource_watcher = resource_gather.cgroup_monitor(test_event=test_running_event)
test_env = dict(os.environ,
UBSAN_OPTIONS=":".join(filter(None, UBSAN_OPTIONS)),
ASAN_OPTIONS=":".join(filter(None, ASAN_OPTIONS)),
# TMPDIR env variable is used by any seastar/scylla
# test for directory to store test temporary data.
TMPDIR=os.path.join(options.tmpdir, test.mode),
SCYLLA_TEST_ENV='yes',
SCYLLA_TEST_RUNNER="test.py",
**env,
)
test_env = dict(
os.environ,
UBSAN_OPTIONS=":".join(filter(None, UBSAN_OPTIONS)),
ASAN_OPTIONS=":".join(filter(None, ASAN_OPTIONS)),
# TMPDIR env variable is used by any seastar/scylla test for directory to store test temporary data.
TMPDIR=str(test.suite.log_dir),
SCYLLA_TEST_ENV="yes",
SCYLLA_TEST_RUNNER="test.py",
**env,
)
process = await asyncio.create_subprocess_exec(
path, *args,
stderr=log,
@@ -508,7 +509,81 @@ async def run_test(test: Test, options: argparse.Namespace, gentle_kill=False, e
report_error("Test was cancelled: the parent process is exiting")
except Exception as e:
report_error("Failed to run the test:\n{e}".format(e=e))
finally:
if cleanup_fn is not None:
cleanup_fn()
return False
def prepare_dir(dirname: pathlib.Path, pattern: str) -> None:
# Ensure the dir exists.
dirname.mkdir(parents=True, exist_ok=True)
# Remove old artifacts.
for p in dirname.rglob(pattern):
p.unlink()
def prepare_dirs(tempdir_base: pathlib.Path, modes: list[str]) -> None:
prepare_dir(tempdir_base, "*.log")
shutil.rmtree(tempdir_base / "ldap_instances", ignore_errors=True)
prepare_dir(tempdir_base / "ldap_instances", "*")
for mode in modes:
prepare_dir(tempdir_base / mode, "*.log")
prepare_dir(tempdir_base / mode, "*.reject")
prepare_dir(tempdir_base / mode / "xml", "*.xml")
shutil.rmtree(tempdir_base / mode / "failed_test", ignore_errors=True)
prepare_dir(tempdir_base / mode / "failed_test", "*")
prepare_dir(tempdir_base / mode / "allure", "*.xml")
if TEST_RUNNER != "pytest":
shutil.rmtree(tempdir_base / mode / "pytest", ignore_errors=True)
prepare_dir(tempdir_base / mode / "pytest", "*")
@universalasync.async_to_sync_wraps
async def start_3rd_party_services(tempdir_base: pathlib.Path, toxyproxy_byte_limit: int):
hosts = HostRegistry()
finalize = start_ldap(
host=await hosts.lease_host(),
port=5000,
instance_root=tempdir_base / 'ldap_instances',
toxyproxy_byte_limit=toxyproxy_byte_limit)
async def make_async_finalize():
finalize()
TestSuite.artifacts.add_exit_artifact(None, make_async_finalize)
ms = MinioServer(
tempdir_base=str(tempdir_base),
address="127.0.0.1",
logger=LogPrefixAdapter(logger=logging.getLogger("minio"), extra={"prefix": "minio"}),
)
await ms.start()
TestSuite.artifacts.add_exit_artifact(None, ms.stop)
TestSuite.artifacts.add_exit_artifact(None, hosts.cleanup)
mock_s3_server = MockS3Server(
host=await hosts.lease_host(),
port=2012,
logger=LogPrefixAdapter(logger=logging.getLogger("s3_mock"), extra={"prefix": "s3_mock"}),
)
await mock_s3_server.start()
TestSuite.artifacts.add_exit_artifact(None, mock_s3_server.stop)
minio_uri = f"http://{os.environ[ms.ENV_ADDRESS]}:{os.environ[ms.ENV_PORT]}"
proxy_s3_server = S3ProxyServer(
host=await hosts.lease_host(),
port=9002,
minio_uri=minio_uri,
max_retries=3,
seed=int(time.time()),
logger=LogPrefixAdapter(logger=logging.getLogger("s3_proxy"), extra={"prefix": "s3_proxy"}),
)
await proxy_s3_server.start()
TestSuite.artifacts.add_exit_artifact(None, proxy_s3_server.stop)
def find_suite_config(path: pathlib.Path) -> pathlib.Path:
for directory in (path.joinpath("_") if path.is_dir() else path).absolute().relative_to(TEST_DIR).parents:
suite_config = TEST_DIR / directory / SUITE_CONFIG_FILENAME
if suite_config.exists():
return suite_config
raise FileNotFoundError(f"Unable to find a suite config file ({SUITE_CONFIG_FILENAME}) related to {path}")

View File

@@ -16,8 +16,9 @@ import xml.etree.ElementTree as ET
from typing import TYPE_CHECKING
from scripts import coverage
from test import path_to
from test.pylib.scylla_cluster import merge_cmdline_options
from test.pylib.suite.base import Test, palette, path_to, read_log, run_test
from test.pylib.suite.base import Test, palette, read_log, run_test
from test.pylib.suite.unit import UnitTest, UnitTestSuite
if TYPE_CHECKING:
@@ -181,9 +182,9 @@ class BoostTest(Test):
if self.mode == "coverage":
self.env.update(coverage.env(self.path))
self.xmlout = os.path.join(suite.options.tmpdir, self.mode, "xml", self.uname + ".xunit.xml")
self.xmlout = self.suite.log_dir / "xml" / f"{self.uname}.xunit.xml"
boost_args += ['--report_level=no',
'--logger=HRF,test_suite:XML,test_suite,' + self.xmlout]
f'--logger=HRF,test_suite:XML,test_suite,{self.xmlout}']
boost_args += ['--catch_system_errors=no'] # causes undebuggable cores
boost_args += ['--color_output=false']
boost_args += ['--']

View File

@@ -48,10 +48,10 @@ class CQLApprovalTest(Test):
super().__init__(test_no, shortname, suite)
# Path to cql_repl driver, in the given build mode
self.path = "pytest"
self.cql = suite.suite_path / (self.shortname + ".cql")
self.result = suite.suite_path / (self.shortname + ".result")
self.tmpfile = os.path.join(suite.options.tmpdir, self.mode, self.uname + ".reject")
self.reject = suite.suite_path / (self.shortname + ".reject")
self.cql = self.suite.suite_path / f"{self.shortname}.cql"
self.result = self.suite.suite_path / f"{self.shortname}.result"
self.tmpfile = self.suite.log_dir / f"{self.uname}.reject"
self.reject = self.suite.suite_path / f"{self.shortname}.reject"
self.server_log: Optional[str] = None
self.server_log_filename: Optional[pathlib.Path] = None
self.is_before_test_ok = False
@@ -64,7 +64,7 @@ class CQLApprovalTest(Test):
self.server_log = None
self.server_log_filename = None
self.env: Dict[str, str] = dict()
self._prepare_args(suite.options)
self._prepare_args(self.suite.options)
def reset(self) -> None:
"""Reset the test before a retry, if it is retried as flaky"""

View File

@@ -14,9 +14,10 @@ import xml.etree.ElementTree as ET
from typing import TYPE_CHECKING
from scripts import coverage
from test import path_to
from test.pylib.pool import Pool
from test.pylib.scylla_cluster import ScyllaCluster, ScyllaServer, merge_cmdline_options
from test.pylib.suite.base import Test, TestSuite, path_to, read_log, run_test
from test.pylib.suite.base import Test, TestSuite, read_log, run_test
from test.pylib.util import LogPrefixAdapter
if TYPE_CHECKING:
@@ -85,7 +86,7 @@ class PythonTestSuite(TestSuite):
server = ScyllaServer(
mode=self.mode,
exe=self.scylla_exe,
vardir=os.path.join(self.options.tmpdir, self.mode),
vardir=self.log_dir,
logger=create_cfg.logger,
cluster_name=create_cfg.cluster_name,
ip_addr=create_cfg.ip_addr,
@@ -144,7 +145,7 @@ class PythonTest(Test):
self.path = "python"
self.core_args = ["-m", "pytest"]
self.casename = casename
self.xmlout = os.path.join(self.suite.options.tmpdir, self.mode, "xml", self.uname + ".xunit.xml")
self.xmlout = self.suite.log_dir / "xml" / f"{self.uname}.xunit.xml"
self.server_log: Optional[str] = None
self.server_log_filename: Optional[pathlib.Path] = None
self.is_before_test_ok = False

View File

@@ -7,11 +7,11 @@
from __future__ import annotations
import logging
import os
from typing import TYPE_CHECKING
from scripts import coverage
from test.pylib.suite.base import Test, TestSuite, path_to, read_log, run_test
from test import path_to
from test.pylib.suite.base import Test, TestSuite, read_log, run_test
if TYPE_CHECKING:
import argparse
@@ -44,7 +44,7 @@ class RunTest(Test):
def __init__(self, test_no: int, shortname: str, suite) -> None:
super().__init__(test_no, shortname, suite)
self.path = suite.suite_path / shortname
self.xmlout = os.path.join(suite.options.tmpdir, self.mode, "xml", self.uname + ".xunit.xml")
self.xmlout = self.suite.log_dir / "xml" / f"{self.uname}.xunit.xml"
self.args = [
"--junit-xml={}".format(self.xmlout),
"-vv",

View File

@@ -7,11 +7,10 @@
from __future__ import annotations
import logging
import os
import xml.etree.ElementTree as ET
from typing import TYPE_CHECKING
from test.pylib.suite.base import Test, TestSuite,run_test
from test.pylib.suite.base import Test, TestSuite, run_test
from test.pylib.util import LogPrefixAdapter
if TYPE_CHECKING:
@@ -46,7 +45,7 @@ class ToolTest(Test):
super().__init__(test_no, shortname, suite)
launcher = self.suite.cfg.get("launcher", "pytest")
self.path = launcher.split(maxsplit=1)[0]
self.xmlout = os.path.join(self.suite.options.tmpdir, self.mode, "xml", self.uname + ".xunit.xml")
self.xmlout = self.suite.log_dir / "xml" / f"{self.uname}.xunit.xml"
def _prepare_pytest_params(self, options: argparse.Namespace):
launcher = self.suite.cfg.get("launcher", "pytest")

View File

@@ -6,7 +6,6 @@
from __future__ import annotations
import os
from typing import TYPE_CHECKING
from test.pylib.scylla_cluster import get_cluster_manager
@@ -46,8 +45,7 @@ class TopologyTest(PythonTest):
self._prepare_pytest_params(options)
test_path = os.path.join(self.suite.options.tmpdir, self.mode)
async with get_cluster_manager(self.uname, self.suite.clusters, test_path) as manager:
async with get_cluster_manager(self.uname, self.suite.clusters, str(self.suite.log_dir)) as manager:
self.args.insert(0, "--tmpdir={}".format(options.tmpdir))
self.args.insert(0, "--manager-api={}".format(manager.sock_path))
if options.artifacts_dir_url:

View File

@@ -12,8 +12,9 @@ import shlex
from typing import TYPE_CHECKING
from scripts import coverage
from test import path_to
from test.pylib.scylla_cluster import merge_cmdline_options
from test.pylib.suite.base import Test, TestSuite, palette, path_to, read_log, run_test
from test.pylib.suite.base import Test, TestSuite, palette, read_log, run_test
if TYPE_CHECKING:
import argparse

View File

@@ -5,9 +5,7 @@
#
from __future__ import annotations
import glob
import re
import shutil
import subprocess
import threading
import time
@@ -23,20 +21,13 @@ import string
from typing import Optional, TypeVar, Any
import universalasync
from cassandra.cluster import NoHostAvailable, Session, Cluster # type: ignore # pylint: disable=no-name-in-module
from cassandra.protocol import InvalidRequest # type: ignore # pylint: disable=no-name-in-module
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
from cassandra import DriverException, ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module
from test import TEST_RUNNER
from test.pylib.ldap_server import start_ldap
from test.pylib.host_registry import HostRegistry
from test import BUILD_DIR, TOP_SRC_DIR
from test.pylib.internal_types import ServerInfo
from test.pylib.minio_server import MinioServer
from test.pylib.s3_proxy import S3ProxyServer
from test.pylib.s3_server_mock import MockS3Server
from test.pylib.suite.base import TestSuite
logger = logging.getLogger(__name__)
@@ -271,19 +262,18 @@ async def wait_for_first_completed(coros: list[Coroutine]):
await t
def ninja(target):
"""Build specified target using ninja"""
build_dir = 'build'
args = ['ninja', target]
if os.path.exists(os.path.join(build_dir, 'build.ninja')):
args = ['ninja', '-C', build_dir, target]
return subprocess.Popen(args, stdout=subprocess.PIPE).communicate()[0].decode()
def ninja(target: str) -> str:
"""Build specified target using ninja."""
return subprocess.Popen(
args=["ninja", *(["-C", str(BUILD_DIR)] if BUILD_DIR.joinpath("build.ninja").exists() else []), target],
stdout=subprocess.PIPE,
cwd=TOP_SRC_DIR,
).communicate()[0].decode()
@cache
def get_configured_modes(root_dir=None):
if root_dir:
os.chdir(root_dir)
def get_configured_modes() -> list[str]:
out = ninja('mode_list')
# [1/1] List configured modes
# debug release dev
@@ -294,7 +284,7 @@ def get_configured_modes(root_dir=None):
def get_modes_to_run(session) -> list[str]:
modes = session.config.getoption('modes')
if not modes:
modes = get_configured_modes(root_dir=pathlib.Path(session.config.rootpath).parent)
modes = get_configured_modes()
if not modes:
raise RuntimeError('No modes configured. Please run ./configure.py first')
return modes
@@ -317,69 +307,5 @@ async def gather_safely(*awaitables: Awaitable):
return results
def prepare_dir(dirname: str, pattern: str) -> None:
# Ensure the dir exists
pathlib.Path(dirname).mkdir(parents=True, exist_ok=True)
# Remove old artifacts
for p in glob.glob(os.path.join(dirname, pattern), recursive=True):
pathlib.Path(p).unlink()
def prepare_dirs(tempdir_base: str, modes: list[str]) -> None:
prepare_dir(tempdir_base, "*.log")
shutil.rmtree(os.path.join(tempdir_base, "ldap_instances"), ignore_errors=True)
prepare_dir(os.path.join(tempdir_base, "ldap_instances"), "*")
for mode in modes:
prepare_dir(os.path.join(tempdir_base, mode), "*.log")
prepare_dir(os.path.join(tempdir_base, mode), "*.reject")
prepare_dir(os.path.join(tempdir_base, mode, "xml"), "*.xml")
shutil.rmtree(os.path.join(tempdir_base, mode, "failed_test"), ignore_errors=True)
prepare_dir(os.path.join(tempdir_base, mode, "failed_test"), "*")
prepare_dir(os.path.join(tempdir_base, mode, "allure"), "*.xml")
if TEST_RUNNER != "pytest":
shutil.rmtree(os.path.join(tempdir_base, mode, "pytest"), ignore_errors=True)
prepare_dir(os.path.join(tempdir_base, mode, "pytest"), "*")
@universalasync.async_to_sync_wraps
async def start_3rd_party_services(tempdir_base: pathlib.Path, toxyproxy_byte_limit: int):
hosts = HostRegistry()
finalize = start_ldap(
host=await hosts.lease_host(),
port=5000,
instance_root=tempdir_base / 'ldap_instances',
toxyproxy_byte_limit=toxyproxy_byte_limit)
async def make_async_finalize():
finalize()
TestSuite.artifacts.add_exit_artifact(None, make_async_finalize)
ms = MinioServer(
tempdir_base=str(tempdir_base),
address="127.0.0.1",
logger=LogPrefixAdapter(logger=logging.getLogger("minio"), extra={"prefix": "minio"}),
)
await ms.start()
TestSuite.artifacts.add_exit_artifact(None, ms.stop)
TestSuite.artifacts.add_exit_artifact(None, hosts.cleanup)
mock_s3_server = MockS3Server(
host=await hosts.lease_host(),
port=2012,
logger=LogPrefixAdapter(logger=logging.getLogger("s3_mock"), extra={"prefix": "s3_mock"}),
)
await mock_s3_server.start()
TestSuite.artifacts.add_exit_artifact(None, mock_s3_server.stop)
minio_uri = f"http://{os.environ[ms.ENV_ADDRESS]}:{os.environ[ms.ENV_PORT]}"
proxy_s3_server = S3ProxyServer(
host=await hosts.lease_host(),
port=9002,
minio_uri=minio_uri,
max_retries=3,
seed=int(time.time()),
logger=LogPrefixAdapter(logger=logging.getLogger("s3_proxy"), extra={"prefix": "s3_proxy"}),
)
await proxy_s3_server.start()
TestSuite.artifacts.add_exit_artifact(None, proxy_s3_server.stop)
def get_xdist_worker_id() -> str | None:
return os.environ.get("PYTEST_XDIST_WORKER")

View File

@@ -1,5 +1,6 @@
[pytest]
asyncio_mode = auto
asyncio_default_fixture_loop_scope = module
log_format = %(asctime)s.%(msecs)03d %(levelname)s> %(message)s
log_date_format = %H:%M:%S