Files
scylladb/test/cluster/test_replica_exceptions.py
Michael Litvak 1dbf53ca29 test: enable counters tests with tablets
Enable all counter-related tests that were disabled for tablets because
counters were not supported with tablets until now.

Some tests were parametrized to run with both vnodes and tablets, with
the tablets case skipped, so as not to lose coverage. We change them to
run with the default configuration: counters are now supported with both
vnodes and tablets, and the implementation is the same, so there is no
benefit in running them with both configurations.
2025-11-03 16:04:37 +01:00

196 lines
6.7 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
import asyncio
from typing import Any, Callable, NamedTuple
from test.cluster.conftest import skip_mode
from test.pylib.manager_client import ManagerClient
from test.pylib.repair import ServerInfo
from test.pylib.rest_client import ScyllaMetrics, inject_error
from .util import new_test_keyspace, new_test_table
# Metric sampled by _test_impl before and after the workload; its growth is
# compared against Measurement.cpp_exception_threshold.
CPP_EXCEPTIONS_METRIC_NAME = "scylla_reactor_cpp_exceptions"
class Measurement(NamedTuple):
    """Workload size and pass/fail thresholds for one ``_test_impl`` run."""

    # Number of statements issued while the error injection is enabled.
    run_count: int = 1000
    # Upper bound (inclusive) on the growth of CPP_EXCEPTIONS_METRIC_NAME.
    cpp_exception_threshold: int = 20
    # Name of the metric expected to grow as a result of the injected errors.
    metric_name: str = ""
    # Lower bound (inclusive) on the growth of ``metric_name``. Declared as
    # float because callers derive it as a fraction of ``run_count``
    # (e.g. ``run_count * 0.9``); plain ints remain valid values.
    metric_error_threshold: float = 1000
class DB(NamedTuple):
ks_opts: str = "WITH REPLICATION = { 'replication_factor' : '1' }"
tbl_schema: str = "p int, c int, PRIMARY KEY (p)"
tbl_opts: str = ""
prep_stmt_gen: Callable[[str], str] | None = None
stmt_gen: Callable[[str, int], str] | None = None
class Injection(NamedTuple):
    """Identifies the error injection that ``_test_impl`` enables."""

    # Injection name handed to inject_error().
    name: str = ""
async def get_metrics(manager: ManagerClient, servers: list[ServerInfo]) -> list[ScyllaMetrics]:
    """Query the metrics of every server in ``servers`` concurrently.

    One ``manager.metrics.query`` call is made per server IP address; the
    results are returned in the same order as ``servers``.
    """
    pending_queries = (manager.metrics.query(server.ip_addr) for server in servers)
    return await asyncio.gather(*pending_queries)
def get_metric_count(metrics: list[ScyllaMetrics], metric_name: str) -> int:
    """Return the value of ``metric_name`` summed over all entries of ``metrics``."""
    return sum(snapshot.get(metric_name) for snapshot in metrics)
async def _test_impl(
        manager: ManagerClient,
        config: dict[str, Any] | None,
        measurement: Measurement,
        db: DB,
        injection: Injection):
    """Run a statement workload under an error injection and check metric deltas.

    Adds one server, creates a temporary keyspace and table, enables
    ``injection.name`` on that server, issues ``measurement.run_count``
    statements produced by ``db.stmt_gen`` and waits for all of them to
    complete. Metrics are sampled before and after the workload, and the
    run passes when:

    * ``measurement.metric_name`` grew by at least
      ``measurement.metric_error_threshold``, and
    * ``CPP_EXCEPTIONS_METRIC_NAME`` grew by at most
      ``measurement.cpp_exception_threshold``.

    Args:
        manager: Cluster manager used to add the server and query metrics.
        config: Server configuration overrides; ``None`` is treated as ``{}``.
        measurement: Workload size and thresholds.
        db: Keyspace/table definition and statement generators.
        injection: Error injection enabled for the duration of the workload.

    Raises:
        ValueError: If ``db.stmt_gen`` is not set.
        AssertionError: If either metric delta violates its threshold.
    """
    # Fail fast and explicitly: without a generator there is no workload, and
    # the problem would otherwise only surface as a confusing metric assertion.
    if db.stmt_gen is None:
        raise ValueError("db.stmt_gen must be set: there is no workload to run without it")

    servers = await manager.servers_add(1, config=config if config is not None else {})
    cql, hosts = await manager.get_ready_cql(servers)
    host_ip = servers[0].ip_addr
    host = hosts[0]

    metrics_before = await get_metrics(manager, servers)

    async with new_test_keyspace(manager, db.ks_opts, host) as ks:
        async with new_test_table(manager, ks, db.tbl_schema, db.tbl_opts, host) as tbl:
            if db.prep_stmt_gen is not None:
                prep_stmt = db.prep_stmt_gen(tbl)
                if prep_stmt:
                    # NOTE(review): not awaited -- assumes cql.execute is the
                    # driver's synchronous call; confirm.
                    cql.execute(prep_stmt)

            async with inject_error(manager.api, host_ip, injection.name):
                # Failures of individual requests are ignored on purpose: the
                # verdict comes from the metric deltas below, not from request
                # outcomes.
                futures = []
                for i in range(measurement.run_count):
                    # Generated outside the try block so that a broken
                    # generator is reported instead of being swallowed.
                    query = db.stmt_gen(tbl, i)
                    try:
                        futures.append(cql.execute_async(query))
                    except Exception:
                        pass

                # Wait for every request while the injection is still enabled.
                for f in futures:
                    try:
                        f.result()
                    except Exception:
                        pass

    metrics_after = await get_metrics(manager, servers)

    def metric_delta(name: str) -> int:
        # Growth of the summed metric across the workload.
        return get_metric_count(metrics_after, name) - get_metric_count(metrics_before, name)

    metrics_errors = metric_delta(measurement.metric_name)
    assert metrics_errors >= measurement.metric_error_threshold, (
        f"{measurement.metric_name} grew by {metrics_errors}, "
        f"expected at least {measurement.metric_error_threshold}")

    cpp_exceptions = metric_delta(CPP_EXCEPTIONS_METRIC_NAME)
    assert cpp_exceptions <= measurement.cpp_exception_threshold, (
        f"{CPP_EXCEPTIONS_METRIC_NAME} grew by {cpp_exceptions}, "
        f"expected at most {measurement.cpp_exception_threshold}")
@skip_mode("release", "error injections are not supported in release mode")
async def test_replica_do_apply_rate_limit_no_cpp_exceptions(manager: ManagerClient):
    """Rate-limited writes must show up in metrics without excessive C++ exceptions.

    Repeatedly inserts into partition ``p = 1`` of a table limited to one
    write per second while the ``rate_limit_force_defer`` injection is
    enabled. At least 90% of the statements must be counted in
    ``scylla_database_total_writes_rate_limited`` and the C++ exception
    metric must stay within the default ``Measurement`` threshold.
    """
    defaults = Measurement()
    measurement = defaults._replace(
        metric_name="scylla_database_total_writes_rate_limited",
        metric_error_threshold=defaults.run_count * 0.9,
    )

    def insert_stmt(tbl, i):
        return f"INSERT INTO {tbl} (p, c) VALUES (1, {2*i})"

    db = DB(
        tbl_opts="WITH per_partition_rate_limit = {'max_writes_per_second': 1}",
        stmt_gen=insert_stmt,
    )

    await _test_impl(
        manager=manager,
        config={},
        measurement=measurement,
        db=db,
        injection=Injection(name="rate_limit_force_defer"),
    )
@skip_mode("release", "error injections are not supported in release mode")
async def test_replica_query_rate_limit_no_cpp_exceptions(manager: ManagerClient):
    """Rate-limited reads must show up in metrics without excessive C++ exceptions.

    Repeatedly selects partition ``p = 1`` from a table limited to one read
    per second while the ``rate_limit_force_defer`` injection is enabled.
    At least 90% of the statements must be counted in
    ``scylla_database_total_reads_rate_limited`` and the C++ exception
    metric must stay within the default ``Measurement`` threshold.
    """
    defaults = Measurement()
    measurement = defaults._replace(
        metric_name="scylla_database_total_reads_rate_limited",
        metric_error_threshold=defaults.run_count * 0.9,
    )

    def select_stmt(tbl, i):
        # The iteration index is unused: every read targets the same partition.
        return f"SELECT * FROM {tbl} WHERE p = 1"

    db = DB(
        tbl_opts="WITH per_partition_rate_limit = {'max_reads_per_second': 1}",
        stmt_gen=select_stmt,
    )

    await _test_impl(
        manager=manager,
        config={},
        measurement=measurement,
        db=db,
        injection=Injection(name="rate_limit_force_defer"),
    )
@skip_mode("release", "error injections are not supported in release mode")
async def test_replica_writes_apply_counter_update_timeout(manager: ManagerClient):
    """Counter updates under a forced-timeout injection must be counted as timed out.

    Repeatedly increments a counter in partition ``p = 1`` while the
    ``database_apply_counter_update_force_timeout`` injection is enabled.
    ``scylla_database_total_writes_timedout`` must grow by at least
    ``run_count``; the C++ exception budget is raised to
    ``20 + run_count * 10``.
    """
    defaults = Measurement()
    measurement = defaults._replace(
        cpp_exception_threshold=20 + defaults.run_count * 10,
        metric_name="scylla_database_total_writes_timedout",
        metric_error_threshold=defaults.run_count,
    )

    def counter_update_stmt(tbl, _):
        return f"UPDATE {tbl} SET c = c + 1 WHERE p = 1"

    db = DB(
        tbl_schema="p int, c counter, PRIMARY KEY (p)",
        stmt_gen=counter_update_stmt,
    )

    await _test_impl(
        manager=manager,
        config={},
        measurement=measurement,
        db=db,
        injection=Injection(name="database_apply_counter_update_force_timeout"),
    )
@skip_mode("release", "error injections are not supported in release mode")
async def test_replica_writes_do_apply_counter_update_timeout(manager: ManagerClient):
    """Delayed counter updates must be counted as timed-out writes.

    Starts the server with ``counter_write_request_timeout_in_ms`` set to 50
    and repeatedly increments a counter in partition ``p = 1`` while the
    ``apply_counter_update_delay_100ms`` injection is enabled (presumably a
    100 ms delay, judging by its name). At least 90% of the statements must
    be counted in ``scylla_database_total_writes_timedout``; the C++
    exception budget is raised to ``20 + run_count * 10``.
    """
    server_config = {"counter_write_request_timeout_in_ms": 50}

    defaults = Measurement()
    measurement = defaults._replace(
        cpp_exception_threshold=20 + defaults.run_count * 10,
        metric_name="scylla_database_total_writes_timedout",
        metric_error_threshold=defaults.run_count * 0.9,
    )

    def counter_update_stmt(tbl, _):
        return f"UPDATE {tbl} SET c = c + 1 WHERE p = 1"

    db = DB(
        tbl_schema="p int, c counter, PRIMARY KEY (p)",
        stmt_gen=counter_update_stmt,
    )

    await _test_impl(
        manager=manager,
        config=server_config,
        measurement=measurement,
        db=db,
        injection=Injection(name="apply_counter_update_delay_100ms"),
    )
@skip_mode("release", "error injections are not supported in release mode")
async def test_replica_database_apply_timeout(manager: ManagerClient):
    """Regular writes under a forced-timeout injection must be counted as timed out.

    Inserts one row per iteration (partition ``p = i``) into the default
    table while the ``database_apply_force_timeout`` injection is enabled.
    At least 90% of the statements must be counted in
    ``scylla_database_total_writes_timedout``; the C++ exception budget is
    raised to ``20 + run_count * 10``.
    """
    defaults = Measurement()
    measurement = defaults._replace(
        cpp_exception_threshold=20 + defaults.run_count * 10,
        metric_name="scylla_database_total_writes_timedout",
        metric_error_threshold=defaults.run_count * 0.9,
    )

    def insert_stmt(tbl, i):
        return f"INSERT INTO {tbl} (p, c) VALUES ({i}, {2*i})"

    db = DB(stmt_gen=insert_stmt)

    await _test_impl(
        manager=manager,
        config={},
        measurement=measurement,
        db=db,
        injection=Injection(name="database_apply_force_timeout"),
    )