Files
scylladb/test/cluster/test_replica_exceptions.py
Andrei Chekun cc5ac75d73 test.py: remove deprecated skip_mode decorator
Finishing the deprecation of the skip_mode function in favor of
pytest.mark.skip_mode. This PR is only cleaning and migrating leftover tests
that are still used and old way of skip_mode.

Closes scylladb/scylladb#28299
2026-01-25 18:17:27 +02:00

197 lines
6.8 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
import asyncio
from typing import Any, Callable, NamedTuple
import pytest
from test.pylib.manager_client import ManagerClient
from test.pylib.repair import ServerInfo
from test.pylib.rest_client import ScyllaMetrics, inject_error
from .util import new_test_keyspace, new_test_table
CPP_EXCEPTIONS_METRIC_NAME = "scylla_reactor_cpp_exceptions"
class Measurement(NamedTuple):
run_count: int = 1000
cpp_exception_threshold: int = 20
metric_name: str = ""
metric_error_threshold: int = 1000
class DB(NamedTuple):
ks_opts: str = "WITH REPLICATION = { 'replication_factor' : '1' }"
tbl_schema: str = "p int, c int, PRIMARY KEY (p)"
tbl_opts: str = ""
prep_stmt_gen: Callable[[str], str] | None = None
stmt_gen: Callable[[str, int], str] | None = None
class Injection(NamedTuple):
name: str = ""
async def get_metrics(manager: ManagerClient, servers: list[ServerInfo]) -> list[ScyllaMetrics]:
return await asyncio.gather(*[manager.metrics.query(s.ip_addr) for s in servers])
def get_metric_count(metrics: list[ScyllaMetrics], metric_name: str) -> int:
return sum([m.get(metric_name) for m in metrics])
async def _test_impl(
manager: ManagerClient,
config: dict[str, Any],
measurement: Measurement,
db: DB,
injection: Injection):
servers = await manager.servers_add(1, config=config if config is not None else {})
cql, hosts = await manager.get_ready_cql(servers)
host_ip = servers[0].ip_addr
host = hosts[0]
metrics_before = await get_metrics(manager, servers)
async with new_test_keyspace(manager, db.ks_opts, host) as ks:
async with new_test_table(manager, ks, db.tbl_schema, db.tbl_opts, host) as tbl:
if db.prep_stmt_gen is not None:
prep_stmt = db.prep_stmt_gen(tbl)
if prep_stmt is not None and prep_stmt != "":
cql.execute(prep_stmt)
async with inject_error(manager.api, host_ip, injection.name):
futures = []
for i in range(measurement.run_count):
try:
query = db.stmt_gen(tbl, i)
f = cql.execute_async(query)
futures.append(f)
except Exception:
pass
for f in futures:
try:
f.result()
except Exception:
pass
metrics_after = await get_metrics(manager, servers)
metrics_errors = get_metric_count(metrics_after, measurement.metric_name) - get_metric_count(metrics_before, measurement.metric_name)
assert metrics_errors >= measurement.metric_error_threshold
cpp_exceptions = get_metric_count(metrics_after, CPP_EXCEPTIONS_METRIC_NAME) - get_metric_count(metrics_before, CPP_EXCEPTIONS_METRIC_NAME)
assert cpp_exceptions <= measurement.cpp_exception_threshold
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_replica_do_apply_rate_limit_no_cpp_exceptions(manager: ManagerClient):
measurement = (m := Measurement())._replace(
metric_name = "scylla_database_total_writes_rate_limited",
metric_error_threshold = m.run_count * 0.9
)
db = DB(
tbl_opts = "WITH per_partition_rate_limit = {'max_writes_per_second': 1}",
stmt_gen = lambda tbl, i: f"INSERT INTO {tbl} (p, c) VALUES (1, {2*i})"
)
injection = Injection(
name = "rate_limit_force_defer"
)
await _test_impl(
manager=manager,
config={},
measurement=measurement,
db=db,
injection=injection
)
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_replica_query_rate_limit_no_cpp_exceptions(manager: ManagerClient):
measurement = (m := Measurement())._replace(
metric_name = "scylla_database_total_reads_rate_limited",
metric_error_threshold = m.run_count * 0.9
)
db = DB(
tbl_opts = "WITH per_partition_rate_limit = {'max_reads_per_second': 1}",
stmt_gen = lambda tbl, i: f"SELECT * FROM {tbl} WHERE p = 1"
)
injection = Injection(
name = "rate_limit_force_defer"
)
await _test_impl(
manager=manager,
config={},
measurement=measurement,
db=db,
injection=injection
)
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_replica_writes_apply_counter_update_timeout(manager: ManagerClient):
measurement = (m := Measurement())._replace(
cpp_exception_threshold = 20 + m.run_count * 10,
metric_name = "scylla_database_total_writes_timedout",
metric_error_threshold = m.run_count
)
db = DB(
tbl_schema = "p int, c counter, PRIMARY KEY (p)",
stmt_gen = lambda tbl, _: f"UPDATE {tbl} SET c = c + 1 WHERE p = 1"
)
injection = Injection(
name = "database_apply_counter_update_force_timeout"
)
await _test_impl(
manager=manager,
config={},
measurement=measurement,
db=db,
injection=injection
)
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_replica_writes_do_apply_counter_update_timeout(manager: ManagerClient):
config = { "counter_write_request_timeout_in_ms": 50 }
measurement = (m := Measurement())._replace(
cpp_exception_threshold = 20 + m.run_count * 10,
metric_name = "scylla_database_total_writes_timedout",
metric_error_threshold = m.run_count * 0.9
)
db = DB(
tbl_schema = "p int, c counter, PRIMARY KEY (p)",
stmt_gen = lambda tbl, _: f"UPDATE {tbl} SET c = c + 1 WHERE p = 1"
)
injection = Injection(
name = "apply_counter_update_delay_100ms",
)
await _test_impl(
manager=manager,
config=config,
measurement=measurement,
db=db,
injection=injection
)
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_replica_database_apply_timeout(manager: ManagerClient):
measurement = (m := Measurement())._replace(
cpp_exception_threshold = 20 + m.run_count * 10,
metric_name = "scylla_database_total_writes_timedout",
metric_error_threshold = m.run_count * 0.9
)
db = DB(
stmt_gen = lambda tbl, i: f"INSERT INTO {tbl} (p, c) VALUES ({i}, {2*i})"
)
injection = Injection(
name = "database_apply_force_timeout"
)
await _test_impl(
manager=manager,
config={},
measurement=measurement,
db=db,
injection=injection
)