#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#

# This file configures pytest for all tests in this directory, and also
# defines common test fixtures for all of them to use

from __future__ import annotations

import asyncio
import ssl
import tempfile
import platform
import urllib.parse
import warnings
from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing import Event
from pathlib import Path
from typing import TYPE_CHECKING

from test import TOP_SRC_DIR, path_to
from test.pylib.runner import testpy_test_fixture_scope
from test.pylib.random_tables import RandomTables
from test.pylib.util import unique_name
from test.pylib.manager_client import ManagerClient
from test.pylib.async_cql import run_async
from test.pylib.scylla_cluster import ScyllaClusterManager, ScyllaVersionDescription, get_scylla_2025_1_description
from test.pylib.suite.base import get_testpy_test
from test.pylib.suite.python import add_cql_connection_options
from test.pylib.encryption_provider import KeyProvider, make_key_provider_factory

import logging
import pytest

from cassandra.auth import PlainTextAuthProvider  # type: ignore # pylint: disable=no-name-in-module
from cassandra.cluster import Session  # type: ignore # pylint: disable=no-name-in-module
from cassandra.cluster import Cluster, ConsistencyLevel  # type: ignore # pylint: disable=no-name-in-module
from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT  # type: ignore # pylint: disable=no-name-in-module
from cassandra.policies import ExponentialReconnectionPolicy  # type: ignore
from cassandra.policies import RoundRobinPolicy  # type: ignore
from cassandra.policies import TokenAwarePolicy  # type: ignore
from cassandra.policies import WhiteListRoundRobinPolicy  # type: ignore
from cassandra.connection import DRIVER_NAME  # type: ignore # pylint: disable=no-name-in-module
from cassandra.connection import DRIVER_VERSION  # type: ignore # pylint: disable=no-name-in-module
from collections.abc import AsyncIterator

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator
    from typing import Callable

    from cassandra.connection import EndPoint

    from test.pylib.internal_types import IPAddress
    from test.pylib.suite.base import Test


Session.run_async = run_async  # patch Session for convenience
logger = logging.getLogger(__name__)

print(f"Driver name {DRIVER_NAME}, version {DRIVER_VERSION}")


async def decode_backtrace(build_mode: str, input: str) -> str:
    """Symbolize a raw Scylla backtrace.

    Feeds *input* (the backtrace text) on stdin to the
    seastar-addr2line script, pointing it at the scylla executable
    built for *build_mode*, and returns the script's stdout and stderr
    concatenated with a newline in between.
    """
    executable = Path(path_to(build_mode, "scylla"))
    proc = await asyncio.create_subprocess_exec(
        (TOP_SRC_DIR / "seastar" / "scripts" / "seastar-addr2line").absolute(),
        "-e", executable.absolute(),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate(input=input.encode())
    return f"{stdout.decode()}\n{stderr.decode()}"


def pytest_addoption(parser):
    """Register the command-line options this test directory understands."""
    parser.addoption('--manager-api', action='store',
                     help='Manager unix socket path')
    # Shared CQL connection options (host/port/ssl/auth), defined once
    # in test.pylib.suite.python and reused here.
    add_cql_connection_options(parser)
    parser.addoption('--skip-internet-dependent-tests', action='store_true', default=False,
                     help='Skip tests which depend on artifacts from the internet')
    parser.addoption('--artifacts_dir_url', action='store', type=str, default=None,
                     dest='artifacts_dir_url',
                     help='Provide the URL to artifacts directory to generate the link to failed tests directory '
                          'with logs')


# This is a constant used in `pytest_runtest_makereport` below to store the full report for the test case
# in a stash which can then be accessed from fixtures to print the stacktrace for the failed test
PHASE_REPORT_KEY = pytest.StashKey[dict[str, pytest.CollectReport]]()


@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """This is a post-test hook executed by the pytest library.

    Use it to access the test result and store a flag indicating failure
    so we can later retrieve it in our fixtures like `manager`.

    `item.stash` is the same stash as `request.node.stash` (in the
    `request` fixture provided by pytest).
    """
    outcome = yield
    report = outcome.get_result()
    item.stash[PHASE_REPORT_KEY] = report


# Dedicated logger for raw driver message traffic; messages are emitted at
# DEBUG level by CustomConnection below.
conn_logger = logging.getLogger("conn_messages")
conn_logger.setLevel(logging.INFO)


class CustomConnection(Cluster.connection_class):
    """Driver connection subclass that logs every message sent to and
    received from the server on the `conn_messages` logger, tagged with
    the connection's id() for correlation. Used for debugging driver
    traffic; behavior is otherwise identical to the base connection."""

    def send_msg(self, *args, **argv):
        conn_logger.debug(f"send_msg: ({id(self)}): {args} {argv}")
        return super(CustomConnection, self).send_msg(*args, **argv)

    def process_msg(self, msg, protocol_version):
        conn_logger.debug(f"process_msg: ({id(self)}): {msg}")
        return super(CustomConnection, self).process_msg(msg, protocol_version)


# cluster_con helper: set up client object for communicating with the CQL API.
def cluster_con(hosts: list[IPAddress | EndPoint], port: int = 9042, use_ssl: bool = False,
                auth_provider=None, load_balancing_policy=RoundRobinPolicy()):
    """Create a CQL Cluster connection object according to configuration.
       It does not .connect() yet."""
    assert len(hosts) > 0, "python driver connection needs at least one host to connect to"
    # Default profile: round-robin (or caller-supplied) load balancing.
    profile = ExecutionProfile(
        load_balancing_policy=load_balancing_policy,
        consistency_level=ConsistencyLevel.LOCAL_QUORUM,
        serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL,
        # The default timeouts should have been more than enough, but in some
        # extreme cases with a very slow debug build running on a slow or very busy
        # machine, they may not be. Observed tests reach 160 seconds. So it's
        # incremented to 200 seconds.
        # See issue #11289.
        # NOTE: request_timeout is the main cause of timeouts, even if logs say heartbeat
        request_timeout=200)
    # Alternative profile restricted to exactly the given hosts, for tests
    # that must talk to specific nodes.
    whitelist_profile = ExecutionProfile(
        load_balancing_policy=TokenAwarePolicy(WhiteListRoundRobinPolicy(hosts)),
        consistency_level=ConsistencyLevel.LOCAL_QUORUM,
        serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL,
        request_timeout=200)
    if use_ssl:
        # Scylla does not support any earlier TLS protocol. If you try,
        # you will get mysterious EOF errors (see issue #6971) :-(
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    else:
        ssl_context = None
    return Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile, 'whitelist': whitelist_profile},
                   contact_points=hosts,
                   port=port,
                   # TODO: make the protocol version an option, to allow testing with
                   # different versions. If we drop this setting completely, it will
                   # mean pick the latest version supported by the client and the server.
                   protocol_version=4,
                   # NOTE: No auth provider as auth keysppace has RF=1 and topology will take
                   # down nodes, causing errors. If auth is needed in the future for topology
                   # tests, they should bump up auth RF and run repair.
                   ssl_context=ssl_context,
                   # The default timeouts should have been more than enough, but in some
                   # extreme cases with a very slow debug build running on a slow or very busy
                   # machine, they may not be. Observed tests reach 160 seconds. So it's
                   # incremented to 200 seconds.
                   # See issue #11289.
                   connect_timeout = 200,
                   control_connection_timeout = 200,
                   # NOTE: max_schema_agreement_wait must be 2x or 3x smaller than request_timeout
                   # else the driver can't handle a server being down
                   max_schema_agreement_wait=20,
                   idle_heartbeat_timeout=200,
                   # The default reconnection policy has a large maximum interval
                   # between retries (600 seconds). In tests that restart/replace nodes,
                   # where a node can be unavailable for an extended period of time,
                   # this can cause the reconnection retry interval to get very large,
                   # longer than a test timeout.
                   reconnection_policy = ExponentialReconnectionPolicy(1.0, 4.0),
                   auth_provider=auth_provider,
                   # Capture messages for debugging purposes.
                   connection_class=CustomConnection
                   )


@pytest.fixture(scope=testpy_test_fixture_scope)
async def manager_api_sock_path(request: pytest.FixtureRequest, testpy_test: Test | None) -> AsyncGenerator[str]:
    """Yield the Unix socket path of the Manager server API.

    When run under test.py (testpy_test is None), the externally started
    manager's socket path is taken from the --manager-api option.
    Otherwise a ScyllaClusterManager is started here, in a dedicated
    thread running its own asyncio event loop; two multiprocessing
    Events hand-shake startup and shutdown with that thread.
    """
    if testpy_test is None:
        yield request.config.getoption("--manager-api")
    else:
        test_uname = testpy_test.uname
        clusters = testpy_test.suite.clusters
        base_dir = str(testpy_test.suite.log_dir)
        # Unique per-test socket path under /tmp.
        sock_path = f"{tempfile.mkdtemp(prefix='manager-', dir='/tmp')}/api"

        start_event = Event()  # set by the thread once the manager is up
        stop_event = Event()   # set by us to ask the thread to shut down

        async def run_manager() -> None:
            # Runs inside the worker thread's own event loop (asyncio.run).
            mgr = ScyllaClusterManager(test_uname=test_uname, clusters=clusters, base_dir=base_dir, sock_path=sock_path)
            await mgr.start()
            start_event.set()
            try:
                # Block (in an executor, so the loop stays responsive)
                # until the fixture finalizer signals shutdown.
                await asyncio.get_running_loop().run_in_executor(None, stop_event.wait)
            finally:
                await mgr.stop()

        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(asyncio.run, run_manager())
            start_event.wait()

            yield sock_path

            stop_event.set()
            # Propagate any exception raised inside run_manager().
            future.result()


@pytest.fixture(scope=testpy_test_fixture_scope)
async def manager_internal(request: pytest.FixtureRequest, manager_api_sock_path: str) -> Callable[[], ManagerClient]:
    """Session fixture to prepare client object for communicating with the Cluster API.
       Pass the Unix socket path where the Manager server API is listening.
       Pass a function to create driver connections.
       Test cases (functions) should not use this fixture.
    """
    port = int(request.config.getoption('port'))
    use_ssl = bool(request.config.getoption('ssl'))
    auth_username = request.config.getoption('auth_username', default=None)
    auth_password = request.config.getoption('auth_password', default=None)
    # Authentication is only enabled when both credentials were given.
    if auth_username is not None and auth_password is not None:
        auth_provider = PlainTextAuthProvider(username=auth_username, password=auth_password)
    else:
        auth_provider = None
    # Return a factory rather than a client: the per-test `manager`
    # fixture creates a fresh ManagerClient for every test.
    return lambda: ManagerClient(
        sock_path=manager_api_sock_path,
        port=port,
        use_ssl=use_ssl,
        auth_provider=auth_provider,
        con_gen=cluster_con,
    )


@pytest.fixture(scope="function")
async def manager(request: pytest.FixtureRequest,
                  manager_internal: Callable[[], ManagerClient],
                  record_property: Callable[[str, object], None],
                  build_mode: str) -> AsyncGenerator[ManagerClient]:
    """
    Per test fixture to notify Manager client object when tests begin so it can
    perform checks for cluster state.

    After the test it also: detects failure via the stashed report,
    collects server errors/backtraces, saves logs of failed tests into a
    dedicated directory (optionally recording a link to it in the XML
    report), and fails the test if the server was left broken or errors
    were found.
    """
    testpy_test = await get_testpy_test(path=request.path, options=request.config.option, mode=build_mode)

    test_case_name = request.node.name
    suite_testpy_log = testpy_test.log_filename
    test_log = suite_testpy_log.parent / f"{Path(suite_testpy_log.stem).stem}.{test_case_name}.log"
    # this should be consistent with scylla_cluster.py handler name in _before_test method
    test_py_log_test = suite_testpy_log.parent / f"{test_log.stem}_cluster.log"

    manager_client = manager_internal()  # set up client object in fixture with scope function
    await manager_client.before_test(test_case_name, test_log)

    yield manager_client

    # `request.node.stash` contains a report stored in `pytest_runtest_makereport` from where we can retrieve
    # test failure.
    report = request.node.stash[PHASE_REPORT_KEY]
    failed = report.when == "call" and report.failed
    # Check if the test has the check_nodes_for_errors marker
    found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
    failed_test_dir_path = None
    if failed or found_errors:
        # Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
        # all related logs in one dir.
        # Then add property to the XML report with the path to the directory, so it can be visible in Jenkins
        failed_test_dir_path = testpy_test.suite.log_dir / "failed_test" / test_case_name.translate(
            str.maketrans('[]', '()'))
        failed_test_dir_path.mkdir(parents=True, exist_ok=True)
        if failed:
            await manager_client.gather_related_logs(
                failed_test_dir_path,
                {'pytest.log': test_log, 'test_py.log': test_py_log_test}
            )
            with open(failed_test_dir_path / "stacktrace.txt", "w") as f:
                f.write(report.longreprtext)
        if request.config.getoption('artifacts_dir_url') is not None:
            # get the relative path to the tmpdir for the failed directory
            dir_path_relative = f"{failed_test_dir_path.as_posix()[failed_test_dir_path.as_posix().find('testlog'):]}"
            full_url = urllib.parse.urljoin(request.config.getoption('artifacts_dir_url') + '/',
                                            urllib.parse.quote(dir_path_relative))
            record_property("TEST_LOGS", full_url)
    cluster_status = await manager_client.after_test(test_case_name, not failed)
    await manager_client.stop()  # Stop client session and close driver after each test
    if cluster_status["server_broken"] and not failed:
        failed = True
        # NOTE: nested double quotes inside the f-string require Python 3.12+
        # (PEP 701).
        pytest.fail(
            f"test case {test_case_name} left unfinished tasks on Scylla server. Server marked as broken,"
            f" server_broken_reason: {cluster_status["message"]}"
        )
    if found_errors:
        # Build one human-readable report covering every server:
        # a summary line per server followed by indented details.
        full_message = []
        for server, data in found_errors.items():
            summary = []
            detailed = []
            if criticals := data.get("critical", []):
                summary.append(f"{len(criticals)} critical error(s)")
                detailed.extend(map(str.rstrip, criticals))
            if backtraces := data.get("backtraces", []):
                summary.append(f"{len(backtraces)} backtrace(s)")
                # Save both the raw and the addr2line-decoded backtraces
                # to a per-server file in the failed-test directory.
                with open(failed_test_dir_path / f"scylla-{server.server_id}-backtraces.txt", "w") as bt_file:
                    for backtrace in backtraces:
                        bt_file.write(backtrace + "\n\n")
                        decoded_bt = await decode_backtrace(build_mode, backtrace)
                        bt_file.write(decoded_bt + "\n\n")
                detailed.append(f"{len(backtraces)} backtrace(s) saved in {Path(bt_file.name).name}")
            if errors := data.get("error", []):
                summary.append(f"{len(errors)} error(s)")
                detailed.extend(map(str.rstrip, errors))
            if cores := data.get("cores", []):
                summary.append(f"{len(cores)} core(s): {', '.join(cores)}")
            if summary:
                summary_line = f"Server {server.server_id}: found {', '.join(summary)} (log: { data['log']})"
                detailed = [f" {line}" for line in detailed]
                full_message.append(summary_line)
                full_message.extend(detailed)
        with open(failed_test_dir_path / "found_errors.txt", "w") as f:
            f.write("\n".join(full_message))
        # Only fail here if the test did not already fail on its own,
        # to avoid masking the original failure.
        if not failed:
            pytest.fail(f"\n{'\n'.join(full_message)}")


# "cql" fixture: set up client object for communicating with the CQL API.
# Since connection is managed by manager just return that object
@pytest.fixture(scope="function")
def cql(manager):
    yield manager.cql


# "random_tables" fixture: Creates and returns a temporary RandomTables object
# used in tests to make schema changes. Tables are dropped after test finishes
# unless the cluster is dirty or the test has failed.
@pytest.fixture(scope="function")
async def random_tables(request, manager):
    # Markers let a test override replication factor / tablets setting.
    rf_marker = request.node.get_closest_marker("replication_factor")
    replication_factor = rf_marker.args[0] if rf_marker is not None else 3  # Default 3
    enable_tablets = request.node.get_closest_marker("enable_tablets")
    enable_tablets = enable_tablets.args[0] if enable_tablets is not None else None
    tables = RandomTables(request.node.name, manager, unique_name(), replication_factor, None, enable_tablets)
    yield tables

    # Don't drop tables at the end if we failed or the cluster is dirty - it may be impossible
    # (e.g. the cluster is completely dead) and it doesn't matter (we won't reuse the cluster
    # anyway).
    # The cluster will be marked as dirty if the test failed, but that happens
    # at the end of `manager` fixture which we depend on (so these steps will be
    # executed after us) - so at this point, we need to check for failure ourselves too.
    report = request.node.stash[PHASE_REPORT_KEY]
    failed = report.when == "call" and report.failed
    if not failed and not await manager.is_dirty():
        # NOTE(review): drop_all() is called without await in this async
        # fixture — confirm RandomTables.drop_all() is synchronous.
        tables.drop_all()


# Autouse fixture: when a test is marked `prepare_3_nodes_cluster`,
# start it with a 3-node cluster.
@pytest.fixture(scope="function", autouse=True)
async def prepare_3_nodes_cluster(request, manager):
    if request.node.get_closest_marker("prepare_3_nodes_cluster"):
        await manager.servers_add(3)


# Autouse fixture: when a test is marked `prepare_3_racks_cluster`,
# start it with 3 nodes auto-assigned to racks in dc1.
@pytest.fixture(scope="function", autouse=True)
async def prepare_3_racks_cluster(request, manager):
    if request.node.get_closest_marker("prepare_3_racks_cluster"):
        await manager.servers_add(3, auto_rack_dc="dc1")


@pytest.fixture(scope="function")
def internet_dependency_enabled(request) -> None:
    """Skip the requesting test when --skip-internet-dependent-tests is set."""
    if request.config.getoption('skip_internet_dependent_tests'):
        pytest.skip(reason="skip_internet_dependent_tests is set")


@pytest.fixture(scope="function")
async def scylla_2025_1(request, build_mode, internet_dependency_enabled) -> AsyncIterator[ScyllaVersionDescription]:
    """Yield the ScyllaVersionDescription for the 2025.1 release.

    Depends on internet_dependency_enabled, so it is skipped when
    internet-dependent tests are disabled.
    """
    yield await get_scylla_2025_1_description(build_mode)


# Parametrized over every KeyProvider value, so each test using this
# fixture runs once per encryption key provider.
@pytest.fixture(scope="function", params=list(KeyProvider))
async def key_provider(request, tmpdir, scylla_binary):
    """Encryption providers fixture"""
    async with make_key_provider_factory(request.param, tmpdir, scylla_binary) as res:
        yield res