Replace direct usage of the SCYLLA environment variable with the build_mode pytest fixture and the path_to helper function. This makes tests more flexible and consistent with the test framework. It also allows tests to run with xdist, where an environment variable set in the master process is not propagated to the workers. Additionally, use the fixture to get the Scylla binary from the suite, which aligns with obtaining a relocatable Scylla executable.
392 lines
18 KiB
Python
392 lines
18 KiB
Python
#
|
|
# Copyright (C) 2022-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
# This file configures pytest for all tests in this directory, and also
|
|
# defines common test fixtures for all of them to use
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import ssl
|
|
import tempfile
|
|
import platform
|
|
import urllib.parse
|
|
import warnings
|
|
from concurrent.futures.thread import ThreadPoolExecutor
|
|
from multiprocessing import Event
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
from test import TOP_SRC_DIR, path_to
|
|
from test.pylib.runner import testpy_test_fixture_scope
|
|
from test.pylib.random_tables import RandomTables
|
|
from test.pylib.util import unique_name
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.async_cql import run_async
|
|
from test.pylib.scylla_cluster import ScyllaClusterManager, ScyllaVersionDescription, get_scylla_2025_1_description
|
|
from test.pylib.suite.base import get_testpy_test
|
|
from test.pylib.suite.python import add_cql_connection_options
|
|
from test.pylib.encryption_provider import KeyProvider, make_key_provider_factory
|
|
import logging
|
|
import pytest
|
|
from cassandra.auth import PlainTextAuthProvider # type: ignore # pylint: disable=no-name-in-module
|
|
from cassandra.cluster import Session # type: ignore # pylint: disable=no-name-in-module
|
|
from cassandra.cluster import Cluster, ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module
|
|
from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT # type: ignore # pylint: disable=no-name-in-module
|
|
from cassandra.policies import ExponentialReconnectionPolicy # type: ignore
|
|
from cassandra.policies import RoundRobinPolicy # type: ignore
|
|
from cassandra.policies import TokenAwarePolicy # type: ignore
|
|
from cassandra.policies import WhiteListRoundRobinPolicy # type: ignore
|
|
from cassandra.connection import DRIVER_NAME # type: ignore # pylint: disable=no-name-in-module
|
|
from cassandra.connection import DRIVER_VERSION # type: ignore # pylint: disable=no-name-in-module
|
|
from collections.abc import AsyncIterator
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import AsyncGenerator
|
|
from typing import Callable
|
|
|
|
from cassandra.connection import EndPoint
|
|
|
|
from test.pylib.internal_types import IPAddress
|
|
from test.pylib.suite.base import Test
|
|
|
|
|
|
# Monkey-patch the driver's Session class so tests can conveniently run
# queries asynchronously via `session.run_async(...)`.
Session.run_async = run_async # patch Session for convenience


logger = logging.getLogger(__name__)

# Emitted once at import/collection time so CI logs record which driver
# build is actually in use.
print(f"Driver name {DRIVER_NAME}, version {DRIVER_VERSION}")
|
|
|
|
|
|
async def decode_backtrace(build_mode: str, input: str):
    """Decode a raw Scylla backtrace using seastar-addr2line.

    Runs the seastar-addr2line script against the scylla binary for the
    given build mode, feeding it *input* on stdin, and returns the tool's
    stdout and stderr joined with a newline.
    """
    scylla_exe = Path(path_to(build_mode, "scylla"))
    addr2line_script = (TOP_SRC_DIR / "seastar" / "scripts" / "seastar-addr2line").absolute()
    proc = await asyncio.create_subprocess_exec(
        addr2line_script,
        "-e",
        scylla_exe.absolute(),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, err = await proc.communicate(input=input.encode())
    return f"{out.decode()}\n{err.decode()}"
|
|
|
|
|
|
def pytest_addoption(parser):
    """Register the command-line options used by tests in this directory."""
    parser.addoption(
        '--manager-api', action='store',
        help='Manager unix socket path')
    # CQL connection options (host/port/ssl/auth) are shared with other suites.
    add_cql_connection_options(parser)
    parser.addoption(
        '--skip-internet-dependent-tests', action='store_true', default=False,
        help='Skip tests which depend on artifacts from the internet')
    parser.addoption(
        '--artifacts_dir_url', action='store', type=str, default=None, dest='artifacts_dir_url',
        help='Provide the URL to artifacts directory to generate the link to failed tests directory '
             'with logs')
|
|
|
|
|
|
# This is a constant used in `pytest_runtest_makereport` below to store the full report for the test case
# in a stash which can then be accessed from fixtures to print the stacktrace for the failed test
# (see the `manager` and `random_tables` fixtures, which read it during teardown).
PHASE_REPORT_KEY = pytest.StashKey[dict[str, pytest.CollectReport]]()
|
|
|
|
|
|
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """Post-test hook executed by the pytest library.

    Stores the test report in the item's stash so fixtures such as
    `manager` can later detect a failure and e.g. print the stacktrace.

    `item.stash` is the same stash as `request.node.stash` (in the `request`
    fixture provided by pytest).
    """
    hook_outcome = yield
    item.stash[PHASE_REPORT_KEY] = hook_outcome.get_result()
|
|
|
|
|
|
# Dedicated logger for low-level CQL connection traffic, used by
# CustomConnection below for debugging driver/server message exchanges.
conn_logger = logging.getLogger("conn_messages")
conn_logger.setLevel(logging.INFO)


class CustomConnection(Cluster.connection_class):
    """Driver connection subclass that logs every sent/received message.

    Installed via the `connection_class` argument in `cluster_con` so the
    raw CQL message flow can be inspected when debugging test failures.
    """

    def send_msg(self, *args, **kwargs):
        # Lazy %-style args: the message is only rendered when DEBUG is
        # actually enabled, instead of eagerly formatting an f-string.
        conn_logger.debug("send_msg: (%s): %s %s", id(self), args, kwargs)
        return super().send_msg(*args, **kwargs)

    def process_msg(self, msg, protocol_version):
        conn_logger.debug("process_msg: (%s): %s", id(self), msg)
        return super().process_msg(msg, protocol_version)
|
|
|
|
|
|
# cluster_con helper: set up client object for communicating with the CQL API.
def cluster_con(hosts: list[IPAddress | EndPoint], port: int = 9042, use_ssl: bool = False, auth_provider=None,
                load_balancing_policy=None):
    """Create a CQL Cluster connection object according to configuration.
    It does not .connect() yet.

    Args:
        hosts: contact points; must contain at least one address.
        port: CQL native transport port.
        use_ssl: wrap connections in TLSv1.2 when True.
        auth_provider: optional driver authentication provider.
        load_balancing_policy: optional driver policy; when omitted a fresh
            RoundRobinPolicy is created per call.
    """
    assert len(hosts) > 0, "python driver connection needs at least one host to connect to"
    if load_balancing_policy is None:
        # NOTE: create a fresh policy per call. The previous default argument
        # (`load_balancing_policy=RoundRobinPolicy()`) was evaluated once at
        # import time, so one stateful policy instance was silently shared by
        # every Cluster created through this helper.
        load_balancing_policy = RoundRobinPolicy()
    profile = ExecutionProfile(
        load_balancing_policy=load_balancing_policy,
        consistency_level=ConsistencyLevel.LOCAL_QUORUM,
        serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL,
        # The default timeouts should have been more than enough, but in some
        # extreme cases with a very slow debug build running on a slow or very busy
        # machine, they may not be. Observed tests reach 160 seconds. So it's
        # incremented to 200 seconds.
        # See issue #11289.
        # NOTE: request_timeout is the main cause of timeouts, even if logs say heartbeat
        request_timeout=200)
    whitelist_profile = ExecutionProfile(
        load_balancing_policy=TokenAwarePolicy(WhiteListRoundRobinPolicy(hosts)),
        consistency_level=ConsistencyLevel.LOCAL_QUORUM,
        serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL,
        request_timeout=200)
    if use_ssl:
        # Scylla does not support any earlier TLS protocol. If you try,
        # you will get mysterious EOF errors (see issue #6971) :-(
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    else:
        ssl_context = None

    return Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile, 'whitelist': whitelist_profile},
                   contact_points=hosts,
                   port=port,
                   # TODO: make the protocol version an option, to allow testing with
                   # different versions. If we drop this setting completely, it will
                   # mean pick the latest version supported by the client and the server.
                   protocol_version=4,
                   # NOTE: No auth provider as auth keysppace has RF=1 and topology will take
                   # down nodes, causing errors. If auth is needed in the future for topology
                   # tests, they should bump up auth RF and run repair.
                   ssl_context=ssl_context,
                   # The default timeouts should have been more than enough, but in some
                   # extreme cases with a very slow debug build running on a slow or very busy
                   # machine, they may not be. Observed tests reach 160 seconds. So it's
                   # incremented to 200 seconds.
                   # See issue #11289.
                   connect_timeout=200,
                   control_connection_timeout=200,
                   # NOTE: max_schema_agreement_wait must be 2x or 3x smaller than request_timeout
                   # else the driver can't handle a server being down
                   max_schema_agreement_wait=20,
                   idle_heartbeat_timeout=200,
                   # The default reconnection policy has a large maximum interval
                   # between retries (600 seconds). In tests that restart/replace nodes,
                   # where a node can be unavailable for an extended period of time,
                   # this can cause the reconnection retry interval to get very large,
                   # longer than a test timeout.
                   reconnection_policy=ExponentialReconnectionPolicy(1.0, 4.0),
                   auth_provider=auth_provider,
                   # Capture messages for debugging purposes.
                   connection_class=CustomConnection
                   )
|
|
|
|
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
async def manager_api_sock_path(request: pytest.FixtureRequest, testpy_test: Test | None) -> AsyncGenerator[str]:
    """Yield the unix socket path of a running ScyllaClusterManager API.

    When no testpy test is available, fall back to the socket path given on
    the command line via --manager-api. Otherwise start a manager in a
    dedicated thread (with its own asyncio event loop) and tear it down when
    the fixture scope ends.
    """
    if testpy_test is None:
        # External manager: its socket path was supplied on the command line.
        yield request.config.getoption("--manager-api")
    else:
        test_uname = testpy_test.uname
        clusters = testpy_test.suite.clusters
        base_dir = str(testpy_test.suite.log_dir)
        sock_path = f"{tempfile.mkdtemp(prefix='manager-', dir='/tmp')}/api"

        # Cross-thread signalling: start_event tells this thread the manager
        # is up; stop_event tells the manager thread to shut down.
        start_event = Event()
        stop_event = Event()

        async def run_manager() -> None:
            # Runs in a separate thread under its own event loop (asyncio.run).
            mgr = ScyllaClusterManager(test_uname=test_uname, clusters=clusters, base_dir=base_dir, sock_path=sock_path)
            await mgr.start()
            start_event.set()
            try:
                # Block (in an executor, so the loop stays responsive) until
                # the fixture teardown sets stop_event.
                await asyncio.get_running_loop().run_in_executor(None, stop_event.wait)
            finally:
                await mgr.stop()
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(asyncio.run, run_manager())
            # Wait until the manager API is actually listening before
            # handing the socket path to dependent fixtures.
            start_event.wait()

            yield sock_path

            stop_event.set()
            # Propagate any exception raised inside the manager thread.
            future.result()
|
|
|
|
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
async def manager_internal(request: pytest.FixtureRequest, manager_api_sock_path: str) -> Callable[[], ManagerClient]:
    """Session fixture to prepare client object for communicating with the Cluster API.
    Pass the Unix socket path where the Manager server API is listening.
    Pass a function to create driver connections.
    Test cases (functions) should not use this fixture.
    """
    get_opt = request.config.getoption
    username = get_opt('auth_username', default=None)
    password = get_opt('auth_password', default=None)
    if username is None or password is None:
        auth = None
    else:
        auth = PlainTextAuthProvider(username=username, password=password)
    return lambda: ManagerClient(
        sock_path=manager_api_sock_path,
        port=int(get_opt('port')),
        use_ssl=bool(get_opt('ssl')),
        auth_provider=auth,
        con_gen=cluster_con,
    )
|
|
|
|
|
|
@pytest.fixture(scope="function")
async def manager(request: pytest.FixtureRequest,
                  manager_internal: Callable[[], ManagerClient],
                  record_property: Callable[[str, object], None],
                  build_mode: str) -> AsyncGenerator[ManagerClient]:
    """
    Per test fixture to notify Manager client object when tests begin so it can perform checks for cluster state.

    On teardown it also: detects failure via the report stashed by
    `pytest_runtest_makereport`, collects node error logs/backtraces for
    failed tests into a dedicated directory, records an artifacts URL in the
    XML report, and fails the test if the server was left broken or node
    logs contained errors.
    """
    testpy_test = await get_testpy_test(path=request.path, options=request.config.option, mode=build_mode)
    test_case_name = request.node.name
    suite_testpy_log = testpy_test.log_filename
    test_log = suite_testpy_log.parent / f"{Path(suite_testpy_log.stem).stem}.{test_case_name}.log"
    # this should be consistent with scylla_cluster.py handler name in _before_test method
    test_py_log_test = suite_testpy_log.parent / f"{test_log.stem}_cluster.log"

    manager_client = manager_internal() # set up client object in fixture with scope function
    await manager_client.before_test(test_case_name, test_log)
    yield manager_client
    # `request.node.stash` contains a report stored in `pytest_runtest_makereport` from where we can retrieve
    # test failure.
    report = request.node.stash[PHASE_REPORT_KEY]
    failed = report.when == "call" and report.failed

    # Check if the test has the check_nodes_for_errors marker
    found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))

    failed_test_dir_path = None
    if failed or found_errors:
        # Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
        # all related logs in one dir.
        # Then add property to the XML report with the path to the directory, so it can be visible in Jenkins
        failed_test_dir_path = testpy_test.suite.log_dir / "failed_test" / test_case_name.translate(
            str.maketrans('[]', '()'))
        failed_test_dir_path.mkdir(parents=True, exist_ok=True)

        if failed:
            await manager_client.gather_related_logs(
                failed_test_dir_path,
                {'pytest.log': test_log, 'test_py.log': test_py_log_test}
            )
            with open(failed_test_dir_path / "stacktrace.txt", "w") as f:
                f.write(report.longreprtext)
        if request.config.getoption('artifacts_dir_url') is not None:
            # get the relative path to the tmpdir for the failed directory
            dir_path_relative = f"{failed_test_dir_path.as_posix()[failed_test_dir_path.as_posix().find('testlog'):]}"
            full_url = urllib.parse.urljoin(request.config.getoption('artifacts_dir_url') + '/',
                                            urllib.parse.quote(dir_path_relative))
            record_property("TEST_LOGS", full_url)

    cluster_status = await manager_client.after_test(test_case_name, not failed)
    await manager_client.stop() # Stop client session and close driver after each test

    if cluster_status["server_broken"] and not failed:
        failed = True
        # NOTE: pytest.fail raises, so the found_errors handling below only
        # runs when the server was not marked broken.
        pytest.fail(
            f"test case {test_case_name} left unfinished tasks on Scylla server. Server marked as broken,"
            f" server_broken_reason: {cluster_status["message"]}"
        )
    if found_errors:
        # Build a human-readable summary per server: critical errors,
        # backtraces (decoded via seastar-addr2line), plain errors and cores.
        full_message = []
        for server, data in found_errors.items():
            summary = []
            detailed = []

            if criticals := data.get("critical", []):
                summary.append(f"{len(criticals)} critical error(s)")
                detailed.extend(map(str.rstrip, criticals))

            if backtraces := data.get("backtraces", []):
                summary.append(f"{len(backtraces)} backtrace(s)")
                with open(failed_test_dir_path / f"scylla-{server.server_id}-backtraces.txt", "w") as bt_file:
                    for backtrace in backtraces:
                        bt_file.write(backtrace + "\n\n")
                        decoded_bt = await decode_backtrace(build_mode, backtrace)
                        bt_file.write(decoded_bt + "\n\n")
                detailed.append(f"{len(backtraces)} backtrace(s) saved in {Path(bt_file.name).name}")

            if errors := data.get("error", []):
                summary.append(f"{len(errors)} error(s)")
                detailed.extend(map(str.rstrip, errors))

            if cores := data.get("cores", []):
                summary.append(f"{len(cores)} core(s): {', '.join(cores)}")

            if summary:
                summary_line = f"Server {server.server_id}: found {', '.join(summary)} (log: { data['log']})"
                detailed = [f" {line}" for line in detailed]
                full_message.append(summary_line)
                full_message.extend(detailed)

        with open(failed_test_dir_path / "found_errors.txt", "w") as f:
            f.write("\n".join(full_message))
        if not failed:
            # Errors in node logs fail the test even when its body passed.
            pytest.fail(f"\n{'\n'.join(full_message)}")
|
|
|
|
@pytest.fixture(scope="function")
def cql(manager):
    """Hand out the CQL session object owned by the `manager` fixture.

    The connection lifecycle is fully managed by `manager`; this fixture
    merely exposes its `.cql` session for convenience.
    """
    yield manager.cql
|
|
|
|
# "random_tables" fixture: Creates and returns a temporary RandomTables object
# used in tests to make schema changes. Tables are dropped after test finishes
# unless the cluster is dirty or the test has failed.
@pytest.fixture(scope="function")
async def random_tables(request, manager):
    rf_marker = request.node.get_closest_marker("replication_factor")
    replication_factor = 3 if rf_marker is None else rf_marker.args[0]  # Default 3
    tablets_marker = request.node.get_closest_marker("enable_tablets")
    enable_tablets = None if tablets_marker is None else tablets_marker.args[0]
    tables = RandomTables(request.node.name, manager, unique_name(),
                          replication_factor, None, enable_tablets)
    yield tables

    # Don't drop tables at the end if we failed or the cluster is dirty - it may be impossible
    # (e.g. the cluster is completely dead) and it doesn't matter (we won't reuse the cluster
    # anyway).
    # The cluster will be marked as dirty if the test failed, but that happens
    # at the end of `manager` fixture which we depend on (so these steps will be
    # executed after us) - so at this point, we need to check for failure ourselves too.
    report = request.node.stash[PHASE_REPORT_KEY]
    test_failed = report.when == "call" and report.failed
    if not test_failed and not await manager.is_dirty():
        tables.drop_all()
|
|
|
|
@pytest.fixture(scope="function", autouse=True)
async def prepare_3_nodes_cluster(request, manager):
    """Autouse fixture: start a 3-node cluster for tests carrying the
    `prepare_3_nodes_cluster` marker; no-op otherwise."""
    if request.node.get_closest_marker("prepare_3_nodes_cluster") is None:
        return
    await manager.servers_add(3)
|
|
|
|
|
|
@pytest.fixture(scope="function", autouse=True)
async def prepare_3_racks_cluster(request, manager):
    """Autouse fixture: start a 3-node cluster spread over racks in "dc1" for
    tests carrying the `prepare_3_racks_cluster` marker; no-op otherwise."""
    if request.node.get_closest_marker("prepare_3_racks_cluster") is None:
        return
    await manager.servers_add(3, auto_rack_dc="dc1")
|
|
|
|
|
|
@pytest.fixture(scope="function")
def internet_dependency_enabled(request) -> None:
    """Skip the requesting test when --skip-internet-dependent-tests was given."""
    skip_requested = request.config.getoption('skip_internet_dependent_tests')
    if skip_requested:
        pytest.skip(reason="skip_internet_dependent_tests is set")
|
|
|
|
|
|
@pytest.fixture(scope="function")
async def scylla_2025_1(request, build_mode, internet_dependency_enabled) -> AsyncIterator[ScyllaVersionDescription]:
    """Yield the version description for the Scylla 2025.1 release binary.

    Depends on `internet_dependency_enabled` so the test is skipped when
    internet-dependent tests are disabled.
    """
    description = await get_scylla_2025_1_description(build_mode)
    yield description
|
|
|
|
@pytest.fixture(scope="function", params=list(KeyProvider))
async def key_provider(request, tmpdir, scylla_binary):
    """Encryption providers fixture, parameterized over every KeyProvider kind."""
    factory = make_key_provider_factory(request.param, tmpdir, scylla_binary)
    async with factory as provider:
        yield provider
|