1. Remove dumping cluster logs and print only the link to the log. 2. Fail the test (to fail CI and not ignore the problem) and mark the cluster as dirty (to avoid affecting subsequent tests) in case setup/teardown fails. 3. Add 2 cqlpy tests that fail after applying step 2 to the dirties_cluster list so the cluster is discarded afterward. Closes scylladb/scylladb#26183
304 lines
13 KiB
Python
304 lines
13 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
from __future__ import annotations
|
|
|
|
import collections
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import shlex
|
|
from contextlib import asynccontextmanager
|
|
from functools import cache
|
|
from typing import TYPE_CHECKING
|
|
|
|
from scripts import coverage
|
|
from test import path_to
|
|
from test.pylib.pool import Pool
|
|
from test.pylib.scylla_cluster import ScyllaCluster, ScyllaServer, merge_cmdline_options, get_current_version_description
|
|
from test.pylib.suite.base import Test, TestSuite, read_log, run_test
|
|
from test.pylib.util import LogPrefixAdapter
|
|
|
|
if TYPE_CHECKING:
|
|
import argparse
|
|
from collections.abc import Callable, Awaitable, AsyncGenerator
|
|
from typing import Optional, Union
|
|
|
|
from pytest import Parser
|
|
|
|
|
|
class PythonTestSuite(TestSuite):
    """A collection of Python pytests against a single Scylla instance"""

    test_file_ext = ".py"

    def __init__(self, path, cfg: dict, options: argparse.Namespace, mode: str) -> None:
        """Prepare the Scylla environment and the pool of clusters used by the suite.

        The pool size is taken, in order of priority, from
        ``options.cluster_pool_size``, the ``CLUSTER_POOL_SIZE`` environment
        variable, and the suite config key ``pool_size`` (default 2).
        """
        super().__init__(path, cfg, options, mode)
        self.scylla_exe = path_to(self.mode, "scylla")
        self.scylla_env = dict(self.base_env)
        if self.mode == "coverage":
            self.scylla_env.update(coverage.env(self.scylla_exe, distinct_id=self.name))
        self.scylla_env['SCYLLA'] = self.scylla_exe

        cluster_cfg = self.cfg.get("cluster", {"initial_size": 1})
        cluster_size = cluster_cfg["initial_size"]
        env_pool_size = os.getenv("CLUSTER_POOL_SIZE")
        if options.cluster_pool_size is not None:
            pool_size = options.cluster_pool_size
        elif env_pool_size is not None:
            pool_size = int(env_pool_size)
        else:
            pool_size = cfg.get("pool_size", 2)
        # Short names of tests after which the cluster must not be reused.
        self.dirties_cluster = set(cfg.get("dirties_cluster", []))

        self.create_cluster = self.get_cluster_factory(cluster_size, options)

        async def recycle_cluster(cluster: ScyllaCluster) -> None:
            """When a dirty cluster is returned to the cluster pool,
               stop it and release the used IPs. We don't necessarily uninstall() it yet,
               which would delete the log file and directory - we might want to preserve
               these if it came from a failed test.
            """
            for srv in cluster.servers.values():
                if srv.log_file is not None:
                    srv.log_file.close()
                srv.maintenance_socket_dir.cleanup()
            await cluster.stop()
            await cluster.release_ips()

        self.clusters = Pool(pool_size, self.create_cluster, recycle_cluster)

    def get_cluster_factory(self, cluster_size: int, options: argparse.Namespace) -> Callable[..., Awaitable]:
        """Return an async factory that builds, installs and starts a ScyllaCluster.

        The returned coroutine function takes a logger and returns a started
        cluster of ``cluster_size`` nodes whose servers are created by the
        nested ``create_server`` callback.
        """
        def create_server(create_cfg: ScyllaCluster.CreateServerParams):
            cmdline_options = self.cfg.get("extra_scylla_cmdline_options", [])
            # The suite config may give a single option as a bare string.
            if isinstance(cmdline_options, str):
                cmdline_options = [cmdline_options]
            cmdline_options = merge_cmdline_options(cmdline_options, create_cfg.cmdline_from_test)
            cmdline_options = merge_cmdline_options(cmdline_options, options.extra_scylla_cmdline_options)
            # There are multiple sources of config options, with increasing priority
            # (if two sources provide the same config option, the higher priority one wins):
            # 1. the defaults
            # 2. suite-specific config options (in "extra_scylla_config_options")
            # 3. config options from tests (when servers are added during a test)
            default_config_options = {
                "authenticator": "PasswordAuthenticator",
                "authorizer": "CassandraAuthorizer",
                "tablets_initial_scale_factor": 4 if self.mode == "release" else 2,
            }
            config_options = (
                default_config_options
                | self.cfg.get("extra_scylla_config_options", {})
                | create_cfg.config_from_test
            )

            server = ScyllaServer(
                mode=self.mode,
                version=(create_cfg.version or get_current_version_description(self.scylla_exe)),
                vardir=self.log_dir,
                logger=create_cfg.logger,
                cluster_name=create_cfg.cluster_name,
                ip_addr=create_cfg.ip_addr,
                seeds=create_cfg.seeds,
                cmdline_options=cmdline_options,
                config_options=config_options,
                property_file=create_cfg.property_file,
                append_env=self.base_env,
                server_encryption=create_cfg.server_encryption)

            return server

        async def create_cluster(logger: Union[logging.Logger, logging.LoggerAdapter]) -> ScyllaCluster:
            cluster = ScyllaCluster(logger, self.hosts, cluster_size, create_server)

            async def stop() -> None:
                await cluster.stop()

            # Suite artifacts are removed when
            # the entire suite ends successfully.
            self.artifacts.add_suite_artifact(self, stop)
            if not self.options.save_log_on_success:
                # If a test fails, we might want to keep the data dirs.
                async def uninstall() -> None:
                    await cluster.uninstall()

                self.artifacts.add_suite_artifact(self, uninstall)
            self.artifacts.add_exit_artifact(self, stop)

            await cluster.install_and_start()
            return cluster

        return create_cluster

    @property
    def pattern(self) -> str | list[str]:
        """File name patterns that identify test files of this suite."""
        return ["*_test.py", "*_tests.py", "test_*.py"]

    async def add_test(self, shortname, casename) -> None:
        """Create a PythonTest for ``shortname``/``casename`` and append it to ``self.tests``."""
        test = PythonTest(self.next_id((shortname, self.suite_key)), shortname, casename, self)
        self.tests.append(test)

    async def run(self, test: 'Test', options: argparse.Namespace):
        """Run ``test`` via the base class after checking the Scylla executable.

        Raises:
            FileNotFoundError: if the Scylla executable does not exist.
            PermissionError: if the Scylla executable is not executable.
        """
        if not os.access(self.scylla_exe, os.F_OK):
            raise FileNotFoundError(f"{self.scylla_exe} does not exist.")
        if not os.access(self.scylla_exe, os.X_OK):
            raise PermissionError(f"{self.scylla_exe} is not executable.")
        return await super().run(test, options)
|
|
|
|
|
|
class PythonTest(Test):
    """Run a pytest collection of cases against a standalone Scylla"""

    def __init__(self, test_no: int, shortname: str, casename: str, suite) -> None:
        """Initialise the pytest invocation state for one test file or case."""
        super().__init__(test_no, shortname, suite)
        self.path = "python"
        self.core_args = ["-m", "pytest"]
        self.casename = casename
        self.xmlout = self.suite.log_dir / "xml" / f"{self.uname}.xunit.xml"
        self.server_address: str | None = None
        self.server_log: Optional[str] = None
        self.server_log_filename: Optional[pathlib.Path] = None
        # Progress flags used by run_ctx() to tell setup failures from
        # teardown failures when reporting an error.
        self.is_before_test_ok = False
        self.is_after_test_ok = False

    def _prepare_pytest_params(self, options: argparse.Namespace):
        """Build ``self.args`` (the pytest command line) and ``self.valid_exit_codes``."""
        self.args = [
            "-s",  # don't capture print() output inside pytest
            "--log-level=DEBUG",  # Capture logs
            "-vv",
            "-o",
            "junit_family=xunit2",
            "-o",
            f"junit_suite_name={self.suite.name}",
            f"--junit-xml={self.xmlout}",
            "-rs",
            f"--run_id={self.id}",
            f"--mode={self.mode}",
            f"--tmpdir={options.tmpdir}",
        ]
        if options.gather_metrics:
            self.args.append("--gather-metrics")
        self.args.append(f"--alluredir={self.allure_dir}")
        if not options.save_log_on_success:
            self.args.append("--allure-no-capture")
        else:
            self.args.append('--save-log-on-success')
        if options.markers:
            self.args.append(f"-m={options.markers}")

        # https://docs.pytest.org/en/7.1.x/reference/exit-codes.html
        no_tests_selected_exit_code = 5
        self.valid_exit_codes = [0, no_tests_selected_exit_code]

        # Extra user-supplied pytest arguments, split with shell-like rules.
        if pytest_arg := getattr(options, "pytest_arg", ""):
            self.args += shlex.split(pytest_arg)

        # The test file (optionally narrowed to one case) goes last.
        arg = str(self.suite.suite_path / f"{self.shortname}{self.suite.test_file_ext}")
        if self.casename is not None:
            arg += '::' + self.casename
        self.args.append(arg)

    def reset(self) -> None:
        """Reset the test before a retry, if it is retried as flaky"""
        super().reset()
        self.server_log = None
        self.server_log_filename = None
        self.is_before_test_ok = False
        self.is_after_test_ok = False

    def print_summary(self) -> None:
        """Print the pytest command line, its log and, if set, the server log."""
        print("Output of {} {}:".format(self.path, " ".join(self.args)))
        print(read_log(self.log_filename))
        if self.server_log is not None:
            print("Server log of the first server:")
            print(self.server_log)

    @asynccontextmanager
    async def run_ctx(self, options: argparse.Namespace) -> AsyncGenerator[None]:
        """A test's setup/teardown context manager.

        Important part of this code is getting a ScyllaDB node from the pool and providing an address to the host
        as a `--host` argument. This node is returned to the pool after the test is finished. If the setup, the
        test body or the teardown raised an exception, the test is marked as failed and the node is marked as dirty.

        Exceptions raised by the test body or by the teardown are reported and suppressed.
        An exception raised during setup (before the body could run) is re-raised once the cluster
        has been returned to the pool: a context manager cannot suppress it, and swallowing it
        would only replace it with an opaque ``RuntimeError("generator didn't yield")``.
        """
        self._prepare_pytest_params(options)

        loggerPrefix = self.mode + '/' + self.uname
        logger = LogPrefixAdapter(logging.getLogger(loggerPrefix), {'prefix': loggerPrefix})
        cluster = await self.suite.clusters.get(logger)
        body_entered = False
        setup_error: Exception | None = None
        try:
            cluster.before_test(self.uname)
            prepare_cql = self.suite.cfg.get("prepare_cql", None)
            if prepare_cql and not hasattr(cluster, 'prepare_cql_executed'):
                cc = next(iter(cluster.running.values())).control_connection
                # A str is itself Iterable, so it must be checked explicitly;
                # otherwise a single statement would be executed one character at a time.
                if isinstance(prepare_cql, str) or not isinstance(prepare_cql, collections.abc.Iterable):
                    prepare_cql = [prepare_cql]
                for stmt in prepare_cql:
                    cc.execute(stmt)
                cluster.prepare_cql_executed = True
            logger.info("Leasing Scylla cluster %s for test %s", cluster, self.uname)
            self.server_address = cluster.endpoint()
            self.args.insert(0, f"--host={self.server_address}")
            self.server_log_filename = cluster.server_log_filename()
            self.args.insert(0, f"--scylla-log-filename={self.server_log_filename}")
            self.is_before_test_ok = True
            cluster.take_log_savepoint()

            body_entered = True
            yield

            if self.shortname in self.suite.dirties_cluster:
                cluster.is_dirty = True
            cluster.after_test(self.uname, self.success)
            self.is_after_test_ok = True
        except Exception as e:
            if not self.is_before_test_ok:
                print(f"Test {self.name} pre-check failed: {str(e)}\ncheck server logs: {self.server_log_filename}")
                logger.info("Discarding cluster after failed start for test %s...", self.name)
            elif not self.is_after_test_ok:
                print(f"Test {self.name} post-check failed: {str(e)}\ncheck server logs: {self.server_log_filename}")
                logger.info("Discarding cluster after failed test %s...", self.name)
            self.success = False
            cluster.is_dirty = True
            if not body_entered:
                # Keep the error so it can be re-raised after the cluster is released.
                setup_error = e
        await self.suite.clusters.put(cluster, is_dirty=cluster.is_dirty)
        logger.info("Test %s %s", self.uname, "succeeded" if self.success else "failed ")
        if setup_error is not None:
            raise setup_error

    async def run(self, options: argparse.Namespace) -> Test:
        """Run the test inside run_ctx(), store the result in ``self.success`` and return ``self``."""
        async with self.run_ctx(options=options):
            self.success = await run_test(test=self, options=options, env=self.suite.scylla_env)
        return self
|
|
|
|
|
|
# Cached so the option is registered only once per pytest session
# (repeated calls with the same parser are no-ops).
@cache
def add_host_option(parser: Parser) -> None:
    """Register the ``--host`` pytest option, defaulting to ``localhost``."""
    parser.addoption(
        "--host",
        default="localhost",
        help="a DB server host to connect to",
    )
|
|
|
|
|
|
# Cached so the options are registered only once per pytest session
# (repeated calls with the same parser are no-ops).
@cache
def add_cql_connection_options(parser: Parser) -> None:
    """Add pytest options for a CQL connection."""

    # (flag, keyword arguments) pairs, registered in this order.
    option_specs = (
        ("--port", {"default": "9042", "help": "CQL port to connect to"}),
        ("--ssl", {"action": "store_true",
                   "help": "Connect to CQL via an encrypted TLSv1.2 connection"}),
        ("--auth_username", {"help": "username for authentication"}),
        ("--auth_password", {"help": "password for authentication"}),
    )
    group = parser.getgroup("CQL connection options")
    for flag, kwargs in option_specs:
        group.addoption(flag, **kwargs)
|
|
|
|
|
|
# Cached so the options are registered only once per pytest session
# (repeated calls with the same parser are no-ops).
@cache
def add_s3_options(parser: Parser) -> None:
    """Options for tests which use S3 server (i.e., cluster/object_store and cqlpy/test_tools.py)"""

    # (flag, keyword arguments) pairs, registered in this order.
    option_specs = (
        ('--s3-server-address', {}),
        ('--s3-server-port', {"type": int}),
        ('--aws-access-key', {}),
        ('--aws-secret-key', {}),
        ('--aws-region', {}),
        ('--s3-server-bucket', {}),
    )
    group = parser.getgroup("S3 server settings")
    for flag, kwargs in option_specs:
        group.addoption(flag, **kwargs)
|