Files
scylladb/test/cluster/test_long_query_timeout_erm.py
Andrei Chekun c950c2e582 test.py: convert skip_mode function to pytest.mark
Function skip_mode works only on functions, and only in cluster tests. This is OK
when we need to skip one test, but it's not possible to use it with pytestmark
to automatically mark all tests in a file. The goal of this PR is to migrate
skip_mode to a dynamic pytest.mark that can be used as an ordinary mark.

Closes scylladb/scylladb#27853

[avi: apply to test/cluster/test_tablets.py::test_table_creation_wakes_up_balancer]
2026-01-08 21:55:16 +02:00

192 lines
8.6 KiB
Python

# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
# NOTE(review): `WhiteListRoundRobinPolicy` appears unused in this file — confirm before removing.
from cassandra.policies import WhiteListRoundRobinPolicy
# NOTE(review): `skip_mode` appears unused now that tests use the
# @pytest.mark.skip_mode marker — confirm before removing the import.
from test.cluster.conftest import skip_mode
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables, Column, IntType
from test.pylib.rest_client import inject_error_one_shot
from test.pylib.util import wait_for_first_completed
import asyncio
from datetime import datetime, timedelta
import pytest
import logging
# NOTE(review): `time` appears unused in this file — confirm before removing.
import time
# Module-level logger, named after this test module.
logger = logging.getLogger(__name__)
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@pytest.mark.asyncio
@pytest.mark.parametrize("query_type,should_wait_for_timeout,shutdown_nodes", [
    ("SELECT", True, True),
    ("SELECT_COUNT", False, False),
    ("SELECT_COUNT_WHERE", True, False),
])
async def test_long_query_timeout_erm(request, manager: ManagerClient, query_type, should_wait_for_timeout, shutdown_nodes):
    """
    Test verifies that a query with a long timeout doesn't block ERM on failure.

    The test is parametrized ("SELECT * ..." and "SELECT COUNT(*) ...") to verify both
    regular query execution and an aggregate query that goes through the mapreduce service.
    After an initial preparation, the test kills a node during the execution of
    a query with a long timeout. The expectation is that:
    1) Due to scylladb#21831 fix, ERM will not be blocked for the timeout period.
    2) For non-mapreduce queries, due to scylladb#3699 fix, the query will not finish
       before the timeout passes.
    3) For a mapreduce query, the query will fail promptly with "Operation failed for ..." error,
       because waiting with a hope for driver-side speculative retry (scylladb#3699 fix)
       is not needed for an aggregate query.
    Please note that "SELECT COUNT(*) FROM {} WHERE key = 0" is not a mapreduce query.
    One of the test scenarios sets shutdown_nodes=True, to verify that we are able to
    quickly shut down nodes, even if the query timeout is extremely long.
    """
    if query_type == "SELECT":
        query = "SELECT * FROM {}"
    elif query_type == "SELECT_COUNT":
        query = "SELECT COUNT(*) FROM {}"
    elif query_type == "SELECT_COUNT_WHERE":
        query = "SELECT COUNT(*) FROM {} WHERE key = 0"
    else:
        # pytest.fail instead of `assert False`: asserts are stripped under `python -O`,
        # and this also reports the unexpected parameter value.
        pytest.fail(f"Invalid query type: {query_type}")

    logger.info("Start four nodes cluster")
    # Override config:
    # - rf_rack_valid_keyspaces=False to keep the number of nodes small, as in-rack redundancy is required
    # - request_timeout_on_shutdown_in_seconds=10 to shorten waiting after scylladb#24499 was implemented
    servers = await manager.servers_add(4, config={"rf_rack_valid_keyspaces": False, "request_timeout_on_shutdown_in_seconds": 10})
    selected_server = servers[0]
    logger.info(f"Creating a client with selected_server: {selected_server}")
    cql = await manager.get_cql_exclusive(selected_server)

    logger.info("Create a table")
    random_tables = RandomTables(request.node.name, manager, "ks", replication_factor=3, enable_tablets=True)
    table = await random_tables.add_table(pks=1, columns=[
        Column(name="key", ctype=IntType),
        Column(name="value", ctype=IntType)
    ])
    if "WHERE" in query:
        # For non-ranged queries, Scylladb-side speculative retries are disabled to enforce
        # conditions of scylladb#3699 fix.
        logger.info("Disabling speculative retries")
        cql.execute(f"ALTER TABLE {table} WITH speculative_retry = 'NONE'")
    logger.info(f"Created {table}, write some rows")
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {table} (key, value) VALUES ({i}, {i})") for i in range(256)])

    logger.info("Enable injected errors that stop reads")
    injected_handlers = {}
    for server in servers:
        injected_handlers[server.ip_addr] = await inject_error_one_shot(
            manager.api, server.ip_addr, 'storage_proxy::handle_read', parameters={'cf_name': table.name})

    query_timeout = 30
    if shutdown_nodes:
        query_timeout *= 100  # Will end test by shutting down nodes instead of finishing the query
    before_query = datetime.now()
    final_query = query.format(table)
    # BUG FIX: this log call was missing the f-prefix, so the placeholders were
    # logged literally instead of being interpolated.
    logger.info(f"Starting to execute query={final_query} with query_timeout={query_timeout}")
    query_future = cql.run_async(f"{final_query} USING TIMEOUT {query_timeout}s", timeout=2*query_timeout)

    logger.info("Confirm reads are waiting on the injected error")
    server_to_kill = None

    async def wait_for_log_on_any_node(server):
        # Returns the server once its log confirms the injection was hit.
        server_log = await manager.server_open_log(server.server_id)
        await server_log.wait_for("storage_proxy::handle_read injection hit")
        return server

    # Watch all nodes except the coordinator; kill the first one whose log
    # confirms a blocked read. Cancel the remaining watchers.
    async with asyncio.TaskGroup() as tg:
        log_watch_tasks = [tg.create_task(wait_for_log_on_any_node(server)) for server in servers if server != selected_server]
        done, pending = await asyncio.wait(log_watch_tasks, return_when=asyncio.FIRST_COMPLETED)
        for t in pending:
            t.cancel()
        server_to_kill = done.pop().result()

    logger.info(f"Kill a node: {server_to_kill.ip_addr}")
    await manager.server_stop(server_to_kill.server_id)

    logger.info("Unblock reads")
    for server in servers:
        if server != server_to_kill:
            await injected_handlers[server.ip_addr].message()

    logger.info("Remove the killed node, add a new one - ERM should not be blocked")
    await manager.server_not_sees_other_server(selected_server.ip_addr, server_to_kill.ip_addr)
    await manager.remove_node(selected_server.server_id, server_to_kill.server_id)

    expected_timeout_time = before_query + timedelta(seconds=query_timeout)
    if datetime.now() > expected_timeout_time:
        logger.warning("Too much time passed and query already might be timed-out")
    if shutdown_nodes:
        # End the test by shutting down all nodes; verifies shutdown is quick
        # even with an extremely long query timeout still pending.
        for server in servers:
            await manager.server_stop_gracefully(server.server_id)
    elif should_wait_for_timeout:
        # Non-mapreduce path: the query must only fail once the timeout elapses (scylladb#3699).
        with pytest.raises(Exception, match="Operation time"):
            await query_future
        assert datetime.now() > expected_timeout_time
    else:
        # Mapreduce path: the query fails promptly, before the timeout.
        with pytest.raises(Exception, match="Operation failed for"):
            await query_future
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@pytest.mark.asyncio
@pytest.mark.parametrize("enable_tablets", [True, False])
async def test_long_query_timeout_without_failure_erm(request, manager: ManagerClient, enable_tablets):
    """
    Test verifies that a long mapreduce query does not block ERM.

    The query is long because it is blocked by an injected error
    that pauses reads during dispatching. When enable_tablets is False,
    the old mapreduce algorithm blocks ERM, so topology changes are also blocked.
    When enable_tablets is True, the ERM is not blocked.
    """
    servers = await manager.servers_add(3)
    # The hosts list returned alongside the CQL session is not needed here.
    cql, _hosts = await manager.get_ready_cql(servers)

    logger.info("Create a table with enable_tablets=%s", enable_tablets)
    random_tables = RandomTables(request.node.name, manager, "ks", replication_factor=1, enable_tablets=enable_tablets)
    table = await random_tables.add_table(pks=1, columns=[
        Column(name="key", ctype=IntType),
        Column(name="value", ctype=IntType)
    ])
    logger.info(f"Created {table}, write some rows")
    num_rows = 256
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {table} (key, value) VALUES ({i}, {i})") for i in range(num_rows)])

    logger.info("Enable injected errors that stop read before storage_proxy takes ERM")
    injected_handlers = {}
    for server in servers:
        injected_handlers[server.ip_addr] = await inject_error_one_shot(
            manager.api, server.ip_addr, 'mapreduce_pause_parallel_dispatch')

    query = "SELECT count(*) FROM {}".format(table)
    query_timeout = 300
    # BUG FIX: this log call was missing the f-prefix, so the placeholders were
    # logged literally instead of being interpolated.
    logger.info(f"Starting to execute query={query} with query_timeout={query_timeout}")
    query_future = cql.run_async(f"{query} USING TIMEOUT {query_timeout}s", timeout=2*query_timeout)

    logger.info("Confirm reads are waiting on the injected error")

    async def wait_for_log_on_any_node(server):
        # Completes once the server's log shows the paused mapreduce dispatch.
        server_log = await manager.server_open_log(server.server_id)
        await server_log.wait_for("mapreduce_pause_parallel_dispatch: waiting for message")

    await wait_for_first_completed([wait_for_log_on_any_node(server) for server in servers])

    if enable_tablets:
        # With tablets, ERM is not blocked, so a topology change must succeed
        # while the query is still paused.
        logger.info("Add new node - ERM should not be blocked")
        await manager.server_add()

    logger.info("Unblock reads")
    for server in servers:
        await injected_handlers[server.ip_addr].message()

    # The query was paused, not failed - it must complete with the full row count.
    res = await query_future
    assert res[0][0] == num_rows