Files
scylladb/test/cluster/test_replica_exceptions.py
Andrei Chekun c950c2e582 test.py: convert skip_mode function to pytest.mark
Function skip_mode works only on function and only in cluster test. This if OK
when we need to skip one test, but it's not possible to use it with pytestmark
to automatically mark all tests in the file. The goal of this PR is to migrate

skip_mode to be dynamic pytest.mark that can be used as ordinary mark.

Closes scylladb/scylladb#27853

[avi: apply to test/cluster/test_tablets.py::test_table_creation_wakes_up_balancer]
2026-01-08 21:55:16 +02:00

198 lines
6.9 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
import asyncio
from typing import Any, Callable, NamedTuple
import pytest
from test.cluster.conftest import skip_mode
from test.pylib.manager_client import ManagerClient
from test.pylib.repair import ServerInfo
from test.pylib.rest_client import ScyllaMetrics, inject_error
from .util import new_test_keyspace, new_test_table
CPP_EXCEPTIONS_METRIC_NAME = "scylla_reactor_cpp_exceptions"
class Measurement(NamedTuple):
run_count: int = 1000
cpp_exception_threshold: int = 20
metric_name: str = ""
metric_error_threshold: int = 1000
class DB(NamedTuple):
ks_opts: str = "WITH REPLICATION = { 'replication_factor' : '1' }"
tbl_schema: str = "p int, c int, PRIMARY KEY (p)"
tbl_opts: str = ""
prep_stmt_gen: Callable[[str], str] | None = None
stmt_gen: Callable[[str, int], str] | None = None
class Injection(NamedTuple):
name: str = ""
async def get_metrics(manager: ManagerClient, servers: list[ServerInfo]) -> list[ScyllaMetrics]:
return await asyncio.gather(*[manager.metrics.query(s.ip_addr) for s in servers])
def get_metric_count(metrics: list[ScyllaMetrics], metric_name: str) -> int:
return sum([m.get(metric_name) for m in metrics])
async def _test_impl(
manager: ManagerClient,
config: dict[str, Any],
measurement: Measurement,
db: DB,
injection: Injection):
servers = await manager.servers_add(1, config=config if config is not None else {})
cql, hosts = await manager.get_ready_cql(servers)
host_ip = servers[0].ip_addr
host = hosts[0]
metrics_before = await get_metrics(manager, servers)
async with new_test_keyspace(manager, db.ks_opts, host) as ks:
async with new_test_table(manager, ks, db.tbl_schema, db.tbl_opts, host) as tbl:
if db.prep_stmt_gen is not None:
prep_stmt = db.prep_stmt_gen(tbl)
if prep_stmt is not None and prep_stmt != "":
cql.execute(prep_stmt)
async with inject_error(manager.api, host_ip, injection.name):
futures = []
for i in range(measurement.run_count):
try:
query = db.stmt_gen(tbl, i)
f = cql.execute_async(query)
futures.append(f)
except Exception:
pass
for f in futures:
try:
f.result()
except Exception:
pass
metrics_after = await get_metrics(manager, servers)
metrics_errors = get_metric_count(metrics_after, measurement.metric_name) - get_metric_count(metrics_before, measurement.metric_name)
assert metrics_errors >= measurement.metric_error_threshold
cpp_exceptions = get_metric_count(metrics_after, CPP_EXCEPTIONS_METRIC_NAME) - get_metric_count(metrics_before, CPP_EXCEPTIONS_METRIC_NAME)
assert cpp_exceptions <= measurement.cpp_exception_threshold
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_replica_do_apply_rate_limit_no_cpp_exceptions(manager: ManagerClient):
measurement = (m := Measurement())._replace(
metric_name = "scylla_database_total_writes_rate_limited",
metric_error_threshold = m.run_count * 0.9
)
db = DB(
tbl_opts = "WITH per_partition_rate_limit = {'max_writes_per_second': 1}",
stmt_gen = lambda tbl, i: f"INSERT INTO {tbl} (p, c) VALUES (1, {2*i})"
)
injection = Injection(
name = "rate_limit_force_defer"
)
await _test_impl(
manager=manager,
config={},
measurement=measurement,
db=db,
injection=injection
)
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_replica_query_rate_limit_no_cpp_exceptions(manager: ManagerClient):
measurement = (m := Measurement())._replace(
metric_name = "scylla_database_total_reads_rate_limited",
metric_error_threshold = m.run_count * 0.9
)
db = DB(
tbl_opts = "WITH per_partition_rate_limit = {'max_reads_per_second': 1}",
stmt_gen = lambda tbl, i: f"SELECT * FROM {tbl} WHERE p = 1"
)
injection = Injection(
name = "rate_limit_force_defer"
)
await _test_impl(
manager=manager,
config={},
measurement=measurement,
db=db,
injection=injection
)
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_replica_writes_apply_counter_update_timeout(manager: ManagerClient):
measurement = (m := Measurement())._replace(
cpp_exception_threshold = 20 + m.run_count * 10,
metric_name = "scylla_database_total_writes_timedout",
metric_error_threshold = m.run_count
)
db = DB(
tbl_schema = "p int, c counter, PRIMARY KEY (p)",
stmt_gen = lambda tbl, _: f"UPDATE {tbl} SET c = c + 1 WHERE p = 1"
)
injection = Injection(
name = "database_apply_counter_update_force_timeout"
)
await _test_impl(
manager=manager,
config={},
measurement=measurement,
db=db,
injection=injection
)
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_replica_writes_do_apply_counter_update_timeout(manager: ManagerClient):
config = { "counter_write_request_timeout_in_ms": 50 }
measurement = (m := Measurement())._replace(
cpp_exception_threshold = 20 + m.run_count * 10,
metric_name = "scylla_database_total_writes_timedout",
metric_error_threshold = m.run_count * 0.9
)
db = DB(
tbl_schema = "p int, c counter, PRIMARY KEY (p)",
stmt_gen = lambda tbl, _: f"UPDATE {tbl} SET c = c + 1 WHERE p = 1"
)
injection = Injection(
name = "apply_counter_update_delay_100ms",
)
await _test_impl(
manager=manager,
config=config,
measurement=measurement,
db=db,
injection=injection
)
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_replica_database_apply_timeout(manager: ManagerClient):
measurement = (m := Measurement())._replace(
cpp_exception_threshold = 20 + m.run_count * 10,
metric_name = "scylla_database_total_writes_timedout",
metric_error_threshold = m.run_count * 0.9
)
db = DB(
stmt_gen = lambda tbl, i: f"INSERT INTO {tbl} (p, c) VALUES ({i}, {2*i})"
)
injection = Injection(
name = "database_apply_force_timeout"
)
await _test_impl(
manager=manager,
config={},
measurement=measurement,
db=db,
injection=injection
)