Files
scylladb/test/cluster/conftest.py
Andrei Chekun 99234f0a83 test.py: replace SCYLLA env var with build_mode fixture
Replace direct usage of SCYLLA environment variable with the build_mode
pytest fixture and path_to helper function. This makes tests more
flexible and consistent with the test framework. It also allows tests to
run under xdist, where an environment variable set in the master process
is not propagated to the workers.
Also use the fixture to get the Scylla binary from the suite; this aligns
with how the relocatable Scylla executable is obtained.
2026-02-24 09:48:38 +01:00

392 lines
18 KiB
Python

#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
# This file configures pytest for all tests in this directory, and also
# defines common test fixtures for all of them to use
from __future__ import annotations
import asyncio
import ssl
import tempfile
import platform
import urllib.parse
import warnings
from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing import Event
from pathlib import Path
from typing import TYPE_CHECKING
from test import TOP_SRC_DIR, path_to
from test.pylib.runner import testpy_test_fixture_scope
from test.pylib.random_tables import RandomTables
from test.pylib.util import unique_name
from test.pylib.manager_client import ManagerClient
from test.pylib.async_cql import run_async
from test.pylib.scylla_cluster import ScyllaClusterManager, ScyllaVersionDescription, get_scylla_2025_1_description
from test.pylib.suite.base import get_testpy_test
from test.pylib.suite.python import add_cql_connection_options
from test.pylib.encryption_provider import KeyProvider, make_key_provider_factory
import logging
import pytest
from cassandra.auth import PlainTextAuthProvider # type: ignore # pylint: disable=no-name-in-module
from cassandra.cluster import Session # type: ignore # pylint: disable=no-name-in-module
from cassandra.cluster import Cluster, ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module
from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT # type: ignore # pylint: disable=no-name-in-module
from cassandra.policies import ExponentialReconnectionPolicy # type: ignore
from cassandra.policies import RoundRobinPolicy # type: ignore
from cassandra.policies import TokenAwarePolicy # type: ignore
from cassandra.policies import WhiteListRoundRobinPolicy # type: ignore
from cassandra.connection import DRIVER_NAME # type: ignore # pylint: disable=no-name-in-module
from cassandra.connection import DRIVER_VERSION # type: ignore # pylint: disable=no-name-in-module
from collections.abc import AsyncIterator
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
from typing import Callable
from cassandra.connection import EndPoint
from test.pylib.internal_types import IPAddress
from test.pylib.suite.base import Test
# Monkey-patch the driver's Session so tests can conveniently await queries
# via `session.run_async(...)`.
Session.run_async = run_async # patch Session for convenience
# Module-level logger for this conftest.
logger = logging.getLogger(__name__)
# Print the driver identity once at import time to aid debugging driver issues.
print(f"Driver name {DRIVER_NAME}, version {DRIVER_VERSION}")
async def decode_backtrace(build_mode: str, input: str):
    """Symbolize a raw Scylla backtrace with seastar-addr2line.

    Runs the seastar-addr2line script against the scylla executable for the
    given build mode, feeding *input* on stdin, and returns the decoder's
    stdout and stderr joined by a newline.
    """
    addr2line_script = (TOP_SRC_DIR / "seastar" / "scripts" / "seastar-addr2line").absolute()
    scylla_exe = Path(path_to(build_mode, "scylla")).absolute()
    decoder = await asyncio.create_subprocess_exec(
        addr2line_script, "-e", scylla_exe,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, err = await decoder.communicate(input=input.encode())
    return f"{out.decode()}\n{err.decode()}"
def pytest_addoption(parser):
    """Register command-line options used by the cluster test suite."""
    parser.addoption(
        '--manager-api', action='store',
        help='Manager unix socket path')
    # CQL connection options (host/port/ssl/auth...) are shared with the
    # python suite runner.
    add_cql_connection_options(parser)
    parser.addoption(
        '--skip-internet-dependent-tests', action='store_true', default=False,
        help='Skip tests which depend on artifacts from the internet')
    parser.addoption(
        '--artifacts_dir_url', action='store', type=str, default=None, dest='artifacts_dir_url',
        help='Provide the URL to artifacts directory to generate the link to failed tests directory '
             'with logs')
# Stash key under which `pytest_runtest_makereport` (below) stores the full
# test report; fixtures such as `manager` read it back to detect failure and
# to print the stacktrace for a failed test.
PHASE_REPORT_KEY = pytest.StashKey[dict[str, pytest.CollectReport]]()
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """Post-test pytest hook that stashes each phase's report on the item.

    Fixtures (e.g. `manager`) later read the stashed report to learn whether
    the test failed; `item.stash` is the same object as `request.node.stash`
    inside a fixture.
    """
    outcome = yield
    item.stash[PHASE_REPORT_KEY] = outcome.get_result()
# Dedicated logger for the raw CQL connection traffic traced by
# CustomConnection below; messages are emitted at DEBUG, so with the INFO
# level set here they are suppressed unless the level is lowered explicitly.
conn_logger = logging.getLogger("conn_messages")
conn_logger.setLevel(logging.INFO)
class CustomConnection(Cluster.connection_class):
    """Driver connection subclass that traces every CQL message.

    Installed by `cluster_con` (via `connection_class=`) so the raw
    request/response traffic can be inspected when debugging tests.
    """

    def send_msg(self, *args, **kwargs):
        # Lazy %-style arguments: the (potentially large) message is only
        # rendered when DEBUG is actually enabled, instead of formatting an
        # f-string on every frame sent (conn_logger defaults to INFO).
        conn_logger.debug("send_msg: (%s): %s %s", id(self), args, kwargs)
        return super().send_msg(*args, **kwargs)

    def process_msg(self, msg, protocol_version):
        conn_logger.debug("process_msg: (%s): %s", id(self), msg)
        return super().process_msg(msg, protocol_version)
# cluster_con helper: set up client object for communicating with the CQL API.
def cluster_con(hosts: list[IPAddress | EndPoint], port: int = 9042, use_ssl: bool = False, auth_provider=None,
                load_balancing_policy=None):
    """Create a CQL Cluster connection object according to configuration.
    It does not .connect() yet.

    Args:
        hosts: non-empty list of contact points.
        port: CQL native transport port.
        use_ssl: use TLSv1.2 for connections (the only TLS version Scylla accepts).
        auth_provider: optional driver authentication provider.
        load_balancing_policy: policy for the default execution profile;
            a fresh RoundRobinPolicy is created per call when not given.
    """
    assert len(hosts) > 0, "python driver connection needs at least one host to connect to"
    # Fix for a mutable default argument: `load_balancing_policy=RoundRobinPolicy()`
    # was evaluated once at import time, sharing a single stateful policy object
    # (it tracks live hosts) between every Cluster built by this helper.
    if load_balancing_policy is None:
        load_balancing_policy = RoundRobinPolicy()
    profile = ExecutionProfile(
        load_balancing_policy=load_balancing_policy,
        consistency_level=ConsistencyLevel.LOCAL_QUORUM,
        serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL,
        # The default timeouts should have been more than enough, but in some
        # extreme cases with a very slow debug build running on a slow or very busy
        # machine, they may not be. Observed tests reach 160 seconds. So it's
        # incremented to 200 seconds.
        # See issue #11289.
        # NOTE: request_timeout is the main cause of timeouts, even if logs say heartbeat
        request_timeout=200)
    whitelist_profile = ExecutionProfile(
        load_balancing_policy=TokenAwarePolicy(WhiteListRoundRobinPolicy(hosts)),
        consistency_level=ConsistencyLevel.LOCAL_QUORUM,
        serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL,
        request_timeout=200)
    if use_ssl:
        # Scylla does not support any earlier TLS protocol. If you try,
        # you will get mysterious EOF errors (see issue #6971) :-(
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    else:
        ssl_context = None
    return Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile, 'whitelist': whitelist_profile},
                   contact_points=hosts,
                   port=port,
                   # TODO: make the protocol version an option, to allow testing with
                   # different versions. If we drop this setting completely, it will
                   # mean pick the latest version supported by the client and the server.
                   protocol_version=4,
                   # NOTE: No auth provider as auth keyspace has RF=1 and topology will take
                   # down nodes, causing errors. If auth is needed in the future for topology
                   # tests, they should bump up auth RF and run repair.
                   ssl_context=ssl_context,
                   # The default timeouts should have been more than enough, but in some
                   # extreme cases with a very slow debug build running on a slow or very busy
                   # machine, they may not be. Observed tests reach 160 seconds. So it's
                   # incremented to 200 seconds.
                   # See issue #11289.
                   connect_timeout=200,
                   control_connection_timeout=200,
                   # NOTE: max_schema_agreement_wait must be 2x or 3x smaller than request_timeout
                   # else the driver can't handle a server being down
                   max_schema_agreement_wait=20,
                   idle_heartbeat_timeout=200,
                   # The default reconnection policy has a large maximum interval
                   # between retries (600 seconds). In tests that restart/replace nodes,
                   # where a node can be unavailable for an extended period of time,
                   # this can cause the reconnection retry interval to get very large,
                   # longer than a test timeout.
                   reconnection_policy=ExponentialReconnectionPolicy(1.0, 4.0),
                   auth_provider=auth_provider,
                   # Capture messages for debugging purposes.
                   connection_class=CustomConnection
                   )
@pytest.fixture(scope=testpy_test_fixture_scope)
async def manager_api_sock_path(request: pytest.FixtureRequest, testpy_test: Test | None) -> AsyncGenerator[str]:
    """Yield the unix-socket path on which the cluster manager API listens.

    When not running under the test.py harness (testpy_test is None) the
    --manager-api option value is forwarded unchanged.  Under the harness, a
    ScyllaClusterManager is started in a dedicated worker thread running its
    own asyncio event loop, and torn down when the fixture finalizes.
    """
    if testpy_test is None:
        yield request.config.getoption("--manager-api")
    else:
        test_uname = testpy_test.uname
        clusters = testpy_test.suite.clusters
        base_dir = str(testpy_test.suite.log_dir)
        # Socket lives in a fresh private tmpdir so concurrent runs don't clash.
        sock_path = f"{tempfile.mkdtemp(prefix='manager-', dir='/tmp')}/api"
        # Events are used to hand-shake between this (fixture) thread and the
        # worker thread created below: start_event signals the manager is up,
        # stop_event asks it to shut down.
        start_event = Event()
        stop_event = Event()
        async def run_manager() -> None:
            # Runs inside the worker thread's own event loop: start the
            # manager, signal readiness, then block until asked to stop;
            # the `finally` guarantees the manager is stopped even on error.
            mgr = ScyllaClusterManager(test_uname=test_uname, clusters=clusters, base_dir=base_dir, sock_path=sock_path)
            await mgr.start()
            start_event.set()
            try:
                # Park the coroutine until stop_event fires; the blocking
                # wait() is pushed to a default executor so the worker loop
                # stays responsive.
                await asyncio.get_running_loop().run_in_executor(None, stop_event.wait)
            finally:
                await mgr.stop()
        with ThreadPoolExecutor(max_workers=1) as executor:
            # asyncio.run() drives run_manager() to completion on the worker
            # thread with a loop independent of the fixture's loop.
            future = executor.submit(asyncio.run, run_manager())
            # Don't yield the socket path until the manager is listening.
            start_event.wait()
            yield sock_path
            stop_event.set()
            # Propagate any exception raised inside run_manager().
            future.result()
@pytest.fixture(scope=testpy_test_fixture_scope)
async def manager_internal(request: pytest.FixtureRequest, manager_api_sock_path: str) -> Callable[[], ManagerClient]:
    """Suite-scoped fixture returning a factory of ManagerClient objects.

    The factory binds the manager API unix-socket path, the configured CQL
    port/SSL settings, optional plain-text auth credentials, and the
    driver-connection builder.  Test cases (functions) should not use this
    fixture directly; the per-test `manager` fixture calls the factory.
    """
    config = request.config
    port = int(config.getoption('port'))
    use_ssl = bool(config.getoption('ssl'))
    username = config.getoption('auth_username', default=None)
    password = config.getoption('auth_password', default=None)
    # Credentials are only honored when both halves are present.
    auth = (PlainTextAuthProvider(username=username, password=password)
            if username is not None and password is not None else None)

    def make_client() -> ManagerClient:
        return ManagerClient(
            sock_path=manager_api_sock_path,
            port=port,
            use_ssl=use_ssl,
            auth_provider=auth,
            con_gen=cluster_con,
        )

    return make_client
@pytest.fixture(scope="function")
async def manager(request: pytest.FixtureRequest,
                  manager_internal: Callable[[], ManagerClient],
                  record_property: Callable[[str, object], None],
                  build_mode: str) -> AsyncGenerator[ManagerClient]:
    """
    Per test fixture to notify Manager client object when tests begin so it can perform checks for cluster state.

    On teardown it: reads the stashed test report to detect failure, scans
    server logs for errors, archives logs/stacktraces for failed tests,
    notifies the manager that the test finished, stops the client, and fails
    the test if the server was left broken or errors were found.
    """
    testpy_test = await get_testpy_test(path=request.path, options=request.config.option, mode=build_mode)
    test_case_name = request.node.name
    suite_testpy_log = testpy_test.log_filename
    # Per-test-case log path derived from the suite log name plus the case name.
    test_log = suite_testpy_log.parent / f"{Path(suite_testpy_log.stem).stem}.{test_case_name}.log"
    # this should be consistent with scylla_cluster.py handler name in _before_test method
    test_py_log_test = suite_testpy_log.parent / f"{test_log.stem}_cluster.log"
    manager_client = manager_internal() # set up client object in fixture with scope function
    await manager_client.before_test(test_case_name, test_log)
    yield manager_client
    # --- teardown: everything below runs after the test body finishes ---
    # `request.node.stash` contains a report stored in `pytest_runtest_makereport` from where we can retrieve
    # test failure.
    report = request.node.stash[PHASE_REPORT_KEY]
    failed = report.when == "call" and report.failed
    # Check if the test has the check_nodes_for_errors marker
    found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
    failed_test_dir_path = None
    if failed or found_errors:
        # Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
        # all related logs in one dir.
        # Then add property to the XML report with the path to the directory, so it can be visible in Jenkins
        # Brackets are translated to parentheses so the directory name is
        # shell/URL friendly for parametrized test ids like name[param].
        failed_test_dir_path = testpy_test.suite.log_dir / "failed_test" / test_case_name.translate(
            str.maketrans('[]', '()'))
        failed_test_dir_path.mkdir(parents=True, exist_ok=True)
        if failed:
            await manager_client.gather_related_logs(
                failed_test_dir_path,
                {'pytest.log': test_log, 'test_py.log': test_py_log_test}
            )
            with open(failed_test_dir_path / "stacktrace.txt", "w") as f:
                f.write(report.longreprtext)
        if request.config.getoption('artifacts_dir_url') is not None:
            # get the relative path to the tmpdir for the failed directory
            dir_path_relative = f"{failed_test_dir_path.as_posix()[failed_test_dir_path.as_posix().find('testlog'):]}"
            full_url = urllib.parse.urljoin(request.config.getoption('artifacts_dir_url') + '/',
                                            urllib.parse.quote(dir_path_relative))
            record_property("TEST_LOGS", full_url)
    # Tell the manager the test ended (and whether it passed) before stopping
    # the client; after_test reports whether the server was left broken.
    cluster_status = await manager_client.after_test(test_case_name, not failed)
    await manager_client.stop() # Stop client session and close driver after each test
    if cluster_status["server_broken"] and not failed:
        failed = True
        pytest.fail(
            f"test case {test_case_name} left unfinished tasks on Scylla server. Server marked as broken,"
            f" server_broken_reason: {cluster_status["message"]}"
        )
    if found_errors:
        # Build a human-readable summary per server and persist it; backtraces
        # are additionally decoded with seastar-addr2line into a side file.
        full_message = []
        for server, data in found_errors.items():
            summary = []
            detailed = []
            if criticals := data.get("critical", []):
                summary.append(f"{len(criticals)} critical error(s)")
                detailed.extend(map(str.rstrip, criticals))
            if backtraces := data.get("backtraces", []):
                summary.append(f"{len(backtraces)} backtrace(s)")
                with open(failed_test_dir_path / f"scylla-{server.server_id}-backtraces.txt", "w") as bt_file:
                    for backtrace in backtraces:
                        # Write the raw backtrace followed by its symbolized form.
                        bt_file.write(backtrace + "\n\n")
                        decoded_bt = await decode_backtrace(build_mode, backtrace)
                        bt_file.write(decoded_bt + "\n\n")
                detailed.append(f"{len(backtraces)} backtrace(s) saved in {Path(bt_file.name).name}")
            if errors := data.get("error", []):
                summary.append(f"{len(errors)} error(s)")
                detailed.extend(map(str.rstrip, errors))
            if cores := data.get("cores", []):
                summary.append(f"{len(cores)} core(s): {', '.join(cores)}")
            if summary:
                summary_line = f"Server {server.server_id}: found {', '.join(summary)} (log: { data['log']})"
                detailed = [f" {line}" for line in detailed]
                full_message.append(summary_line)
                full_message.extend(detailed)
        with open(failed_test_dir_path / "found_errors.txt", "w") as f:
            f.write("\n".join(full_message))
        if not failed:
            # Fail the test on found errors only if it hasn't already failed.
            pytest.fail(f"\n{'\n'.join(full_message)}")
# "cql" fixture: expose the CQL session owned by the `manager` fixture.
# The connection lifecycle is fully managed by `manager`; we only hand it out.
@pytest.fixture(scope="function")
def cql(manager):
    """Yield the CQL driver session kept open by the manager for this test."""
    session = manager.cql
    yield session
# "random_tables" fixture: a temporary RandomTables object for schema-change
# tests; tables are dropped on teardown unless the test failed or the cluster
# is dirty.
@pytest.fixture(scope="function")
async def random_tables(request, manager):
    """Create and yield a RandomTables helper, cleaning it up when safe."""
    rf_mark = request.node.get_closest_marker("replication_factor")
    rf = 3 if rf_mark is None else rf_mark.args[0]  # default RF is 3
    tablets_mark = request.node.get_closest_marker("enable_tablets")
    tablets = None if tablets_mark is None else tablets_mark.args[0]
    tables = RandomTables(request.node.name, manager, unique_name(), rf, None, tablets)
    yield tables
    # Don't drop tables if the test failed or the cluster is dirty - dropping
    # may be impossible (e.g. the cluster is completely dead) and is pointless
    # since the cluster won't be reused.  The cluster is marked dirty on
    # failure by the `manager` fixture, but its teardown runs after ours, so
    # we must also consult the stashed report for failure here.
    report = request.node.stash[PHASE_REPORT_KEY]
    test_failed = report.when == "call" and report.failed
    if not test_failed and not await manager.is_dirty():
        tables.drop_all()
@pytest.fixture(scope="function", autouse=True)
async def prepare_3_nodes_cluster(request, manager):
    """Autouse fixture: boot a 3-node cluster for tests bearing the marker."""
    marker = request.node.get_closest_marker("prepare_3_nodes_cluster")
    if marker is not None:
        await manager.servers_add(3)
@pytest.fixture(scope="function", autouse=True)
async def prepare_3_racks_cluster(request, manager):
    """Autouse fixture: boot 3 nodes auto-assigned to racks in dc1 when marked."""
    marker = request.node.get_closest_marker("prepare_3_racks_cluster")
    if marker is not None:
        await manager.servers_add(3, auto_rack_dc="dc1")
@pytest.fixture(scope="function")
def internet_dependency_enabled(request) -> None:
    """Skip the requesting test when internet-dependent tests are disabled."""
    skip_requested = request.config.getoption('skip_internet_dependent_tests')
    if skip_requested:
        pytest.skip(reason="skip_internet_dependent_tests is set")
@pytest.fixture(scope="function")
async def scylla_2025_1(request, build_mode, internet_dependency_enabled) -> AsyncIterator[ScyllaVersionDescription]:
    """Yield the version description for the Scylla 2025.1 release binary.

    Depends on `internet_dependency_enabled` so the test is skipped when
    internet-dependent tests are turned off.
    """
    description = await get_scylla_2025_1_description(build_mode)
    yield description
@pytest.fixture(scope="function", params=list(KeyProvider))
async def key_provider(request, tmpdir, scylla_binary):
    """Encryption providers fixture: one parametrized instance per KeyProvider."""
    factory = make_key_provider_factory(request.param, tmpdir, scylla_binary)
    async with factory as provider:
        yield provider