Files
scylladb/test/pylib/suite/python.py
Artsiom Mishuta f23d19e248 test.py: fix dumping big logs to output
1. Remove dumping cluster logs and print only the link to the log.
2. Fail the test (to fail CI and not ignore the problem) and mark the cluster as dirty (to avoid affecting subsequent tests) in case setup/teardown fails.
3. Add 2 cqlpy tests that fail after applying step 2 to the dirties_cluster list so the cluster is discarded afterward.

Closes scylladb/scylladb#26183
2025-09-25 22:36:46 +03:00

304 lines
13 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from __future__ import annotations
import collections
import logging
import os
import pathlib
import shlex
from contextlib import asynccontextmanager
from functools import cache
from typing import TYPE_CHECKING
from scripts import coverage
from test import path_to
from test.pylib.pool import Pool
from test.pylib.scylla_cluster import ScyllaCluster, ScyllaServer, merge_cmdline_options, get_current_version_description
from test.pylib.suite.base import Test, TestSuite, read_log, run_test
from test.pylib.util import LogPrefixAdapter
if TYPE_CHECKING:
import argparse
from collections.abc import Callable, Awaitable, AsyncGenerator
from typing import Optional, Union
from pytest import Parser
class PythonTestSuite(TestSuite):
"""A collection of Python pytests against a single Scylla instance"""
test_file_ext = ".py"
def __init__(self, path, cfg: dict, options: argparse.Namespace, mode: str) -> None:
super().__init__(path, cfg, options, mode)
self.scylla_exe = path_to(self.mode, "scylla")
self.scylla_env = dict(self.base_env)
if self.mode == "coverage":
self.scylla_env.update(coverage.env(self.scylla_exe, distinct_id=self.name))
self.scylla_env['SCYLLA'] = self.scylla_exe
cluster_cfg = self.cfg.get("cluster", {"initial_size": 1})
cluster_size = cluster_cfg["initial_size"]
env_pool_size = os.getenv("CLUSTER_POOL_SIZE")
if options.cluster_pool_size is not None:
pool_size = options.cluster_pool_size
elif env_pool_size is not None:
pool_size = int(env_pool_size)
else:
pool_size = cfg.get("pool_size", 2)
self.dirties_cluster = set(cfg.get("dirties_cluster", []))
self.create_cluster = self.get_cluster_factory(cluster_size, options)
async def recycle_cluster(cluster: ScyllaCluster) -> None:
"""When a dirty cluster is returned to the cluster pool,
stop it and release the used IPs. We don't necessarily uninstall() it yet,
which would delete the log file and directory - we might want to preserve
these if it came from a failed test.
"""
for srv in cluster.servers.values():
if srv.log_file is not None:
srv.log_file.close()
srv.maintenance_socket_dir.cleanup()
await cluster.stop()
await cluster.release_ips()
self.clusters = Pool(pool_size, self.create_cluster, recycle_cluster)
def get_cluster_factory(self, cluster_size: int, options: argparse.Namespace) -> Callable[..., Awaitable]:
def create_server(create_cfg: ScyllaCluster.CreateServerParams):
cmdline_options = self.cfg.get("extra_scylla_cmdline_options", [])
if type(cmdline_options) == str:
cmdline_options = [cmdline_options]
cmdline_options = merge_cmdline_options(cmdline_options, create_cfg.cmdline_from_test)
cmdline_options = merge_cmdline_options(cmdline_options, options.extra_scylla_cmdline_options)
# There are multiple sources of config options, with increasing priority
# (if two sources provide the same config option, the higher priority one wins):
# 1. the defaults
# 2. suite-specific config options (in "extra_scylla_config_options")
# 3. config options from tests (when servers are added during a test)
default_config_options = \
{"authenticator": "PasswordAuthenticator",
"authorizer": "CassandraAuthorizer"}
default_config_options["tablets_initial_scale_factor"] = 4 if self.mode == "release" else 2
config_options = default_config_options | \
self.cfg.get("extra_scylla_config_options", {}) | \
create_cfg.config_from_test
server = ScyllaServer(
mode=self.mode,
version=(create_cfg.version or get_current_version_description(self.scylla_exe)),
vardir=self.log_dir,
logger=create_cfg.logger,
cluster_name=create_cfg.cluster_name,
ip_addr=create_cfg.ip_addr,
seeds=create_cfg.seeds,
cmdline_options=cmdline_options,
config_options=config_options,
property_file=create_cfg.property_file,
append_env=self.base_env,
server_encryption=create_cfg.server_encryption)
return server
async def create_cluster(logger: Union[logging.Logger, logging.LoggerAdapter]) -> ScyllaCluster:
cluster = ScyllaCluster(logger, self.hosts, cluster_size, create_server)
async def stop() -> None:
await cluster.stop()
# Suite artifacts are removed when
# the entire suite ends successfully.
self.artifacts.add_suite_artifact(self, stop)
if not self.options.save_log_on_success:
# If a test fails, we might want to keep the data dirs.
async def uninstall() -> None:
await cluster.uninstall()
self.artifacts.add_suite_artifact(self, uninstall)
self.artifacts.add_exit_artifact(self, stop)
await cluster.install_and_start()
return cluster
return create_cluster
@property
def pattern(self) -> str | list[str]:
return ["*_test.py", "*_tests.py", "test_*.py"]
async def add_test(self, shortname, casename) -> None:
test = PythonTest(self.next_id((shortname, self.suite_key)), shortname, casename, self)
self.tests.append(test)
async def run(self, test: 'Test', options: argparse.Namespace):
if not os.access(self.scylla_exe, os.F_OK):
raise FileNotFoundError(f"{self.scylla_exe} does not exist.")
if not os.access(self.scylla_exe, os.X_OK):
raise PermissionError(f"{self.scylla_exe} is not executable.")
return await super().run(test, options)
class PythonTest(Test):
"""Run a pytest collection of cases against a standalone Scylla"""
def __init__(self, test_no: int, shortname: str, casename: str, suite) -> None:
super().__init__(test_no, shortname, suite)
self.path = "python"
self.core_args = ["-m", "pytest"]
self.casename = casename
self.xmlout = self.suite.log_dir / "xml" / f"{self.uname}.xunit.xml"
self.server_address: str | None = None
self.server_log: Optional[str] = None
self.server_log_filename: Optional[pathlib.Path] = None
self.is_before_test_ok = False
self.is_after_test_ok = False
def _prepare_pytest_params(self, options: argparse.Namespace):
self.args = [
"-s", # don't capture print() output inside pytest
"--log-level=DEBUG", # Capture logs
"-vv",
"-o",
"junit_family=xunit2",
"-o",
"junit_suite_name={}".format(self.suite.name),
"--junit-xml={}".format(self.xmlout),
"-rs",
"--run_id={}".format(self.id),
"--mode={}".format(self.mode),
"--tmpdir={}".format(options.tmpdir),
]
if options.gather_metrics:
self.args.append("--gather-metrics")
self.args.append(f"--alluredir={self.allure_dir}")
if not options.save_log_on_success:
self.args.append("--allure-no-capture")
else:
self.args.append('--save-log-on-success')
if options.markers:
self.args.append(f"-m={options.markers}")
# https://docs.pytest.org/en/7.1.x/reference/exit-codes.html
no_tests_selected_exit_code = 5
self.valid_exit_codes = [0, no_tests_selected_exit_code]
if pytest_arg := getattr(options, "pytest_arg", ""):
self.args += shlex.split(pytest_arg)
arg = str(self.suite.suite_path / f"{self.shortname}{self.suite.test_file_ext}")
if self.casename is not None:
arg += '::' + self.casename
self.args.append(arg)
def reset(self) -> None:
"""Reset the test before a retry, if it is retried as flaky"""
super().reset()
self.server_log = None
self.server_log_filename = None
self.is_before_test_ok = False
self.is_after_test_ok = False
def print_summary(self) -> None:
print("Output of {} {}:".format(self.path, " ".join(self.args)))
print(read_log(self.log_filename))
if self.server_log is not None:
print("Server log of the first server:")
print(self.server_log)
@asynccontextmanager
async def run_ctx(self, options: argparse.Namespace) -> AsyncGenerator[None]:
"""A test's setup/teardown context manager.
Important part of this code is getting a ScyllaDB node from the pool and providing an address to the host
as a `--host` argument. This node returned to the pool after test is finished. If the test was failed then
the node will be marked as dirty.
"""
self._prepare_pytest_params(options)
loggerPrefix = self.mode + '/' + self.uname
logger = LogPrefixAdapter(logging.getLogger(loggerPrefix), {'prefix': loggerPrefix})
cluster = await self.suite.clusters.get(logger)
try:
cluster.before_test(self.uname)
prepare_cql = self.suite.cfg.get("prepare_cql", None)
if prepare_cql and not hasattr(cluster, 'prepare_cql_executed'):
cc = next(iter(cluster.running.values())).control_connection
if not isinstance(prepare_cql, collections.abc.Iterable):
prepare_cql = [prepare_cql]
for stmt in prepare_cql:
cc.execute(stmt)
cluster.prepare_cql_executed = True
logger.info("Leasing Scylla cluster %s for test %s", cluster, self.uname)
self.server_address = cluster.endpoint()
self.args.insert(0, f"--host={self.server_address}")
self.server_log_filename = cluster.server_log_filename()
self.args.insert(0, f"--scylla-log-filename={self.server_log_filename}")
self.is_before_test_ok = True
cluster.take_log_savepoint()
yield
if self.shortname in self.suite.dirties_cluster:
cluster.is_dirty = True
cluster.after_test(self.uname, self.success)
self.is_after_test_ok = True
except Exception as e:
if not self.is_before_test_ok:
print(f"Test {self.name} pre-check failed: {str(e)}\ncheck server logs: {self.server_log_filename}")
logger.info(f"Discarding cluster after failed start for test %s...", self.name)
elif not self.is_after_test_ok:
print(f"Test {self.name} post-check failed: {str(e)}\ncheck server logs: {self.server_log_filename}")
logger.info(f"Discarding cluster after failed test %s...", self.name)
self.success = False
cluster.is_dirty = True
await self.suite.clusters.put(cluster, is_dirty=cluster.is_dirty)
logger.info("Test %s %s", self.uname, "succeeded" if self.success else "failed ")
async def run(self, options: argparse.Namespace) -> Test:
async with self.run_ctx(options=options):
self.success = await run_test(test=self, options=options, env=self.suite.scylla_env)
return self
# Use cache to execute this function once per pytest session.
@cache
def add_host_option(parser: Parser) -> None:
parser.addoption("--host", default="localhost",
help="a DB server host to connect to")
# Use cache to execute this function once per pytest session.
@cache
def add_cql_connection_options(parser: Parser) -> None:
"""Add pytest options for a CQL connection."""
cql_options = parser.getgroup("CQL connection options")
cql_options.addoption("--port", default="9042",
help="CQL port to connect to")
cql_options.addoption("--ssl", action="store_true",
help="Connect to CQL via an encrypted TLSv1.2 connection")
cql_options.addoption("--auth_username",
help="username for authentication")
cql_options.addoption("--auth_password",
help="password for authentication")
# Use cache to execute this function once per pytest session.
@cache
def add_s3_options(parser: Parser) -> None:
"""Options for tests which use S3 server (i.e., cluster/object_store and cqlpy/test_tools.py)"""
s3_options = parser.getgroup("S3 server settings")
s3_options.addoption('--s3-server-address')
s3_options.addoption('--s3-server-port', type=int)
s3_options.addoption('--aws-access-key')
s3_options.addoption('--aws-secret-key')
s3_options.addoption('--aws-region')
s3_options.addoption('--s3-server-bucket')