Files
scylladb/test/cluster/random_failures/test_random_failures.py
Andrei Chekun cc5ac75d73 test.py: remove deprecated skip_mode decorator
Finishing the deprecation of the skip_mode function in favor of
pytest.mark.skip_mode. This PR only cleans up and migrates leftover tests
that still use the old way of skip_mode.

Closes scylladb/scylladb#28299
2026-01-25 18:17:27 +02:00

240 lines
10 KiB
Python

#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from __future__ import annotations
import os
import sys
import time
import random
import logging
import itertools
from typing import TYPE_CHECKING
from contextlib import suppress
import psutil
import pytest
from cassandra.cluster import NoHostAvailable
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
from test.cluster.util import wait_for_token_ring_and_group0_consistency, get_coordinator_host
from test.pylib.internal_types import ServerUpState
from test.cluster.random_failures.cluster_events import CLUSTER_EVENTS, TOPOLOGY_TIMEOUT, feed_rack_seed, get_random_rack
from test.cluster.random_failures.error_injections import ERROR_INJECTIONS, ERROR_INJECTIONS_NODE_MAY_HANG
if TYPE_CHECKING:
from test.pylib.random_tables import RandomTables
from test.pylib.manager_client import ManagerClient
from test.cluster.random_failures.cluster_events import ClusterEventType
# Number of tests from the whole matrix to run; None runs the full matrix
# (a `[:None]` slice keeps every generated combination).
TESTS_COUNT = 1 # number of tests from the whole matrix to run, None to run the full matrix.
# Following parameters can be adjusted to run same sequence of tests from a previous run. Look at logs for the values.
# Also see `pytest_generate_tests()` below for details.
# Seed for the tests order randomization.
# Taken from the environment when set, so a failing CI run can be reproduced locally.
TESTS_SHUFFLE_SEED = int(os.environ.get("TOPOLOGY_RANDOM_FAILURES_TEST_SHUFFLE_SEED", random.randrange(sys.maxsize)))
ERROR_INJECTIONS_COUNT = len(ERROR_INJECTIONS) # change it to limit number of error injections
CLUSTER_EVENTS_COUNT = len(CLUSTER_EVENTS) # change it to limit number of cluster events
# Module-level logger, following the standard `getLogger(__name__)` convention.
LOGGER = logging.getLogger(__name__)
def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
    """Parametrize test_random_failures with (error_injection, cluster_event) pairs.

    Builds the full cross product of the (possibly truncated) injection and
    event lists, shuffles it deterministically with TESTS_SHUFFLE_SEED, drops
    unsupported combinations, and keeps at most TESTS_COUNT of them.
    """
    injections = ERROR_INJECTIONS[:ERROR_INJECTIONS_COUNT]
    events = CLUSTER_EVENTS[:CLUSTER_EVENTS_COUNT]
    matrix = list(itertools.product(injections, events))
    random.Random(TESTS_SHUFFLE_SEED).shuffle(matrix)
    feed_rack_seed(TESTS_SHUFFLE_SEED)

    def _is_supported(injection: str, event) -> bool:
        # Removed injections and combinations explicitly deselected by the
        # event are skipped.
        if injection.startswith("REMOVED_"):
            return False
        return injection not in getattr(event, "deselected_random_failures", {})

    # Deselect unsupported combinations. Do it after the shuffle to have the stable order.
    matrix = [pair for pair in matrix if _is_supported(*pair)]
    metafunc.parametrize(["error_injection", "cluster_event"], matrix[:TESTS_COUNT])
@pytest.fixture
async def four_nodes_cluster(manager: ManagerClient) -> None:
    """Boot a 4-node single-DC cluster (racks 1/2/3/3) used as the test baseline.

    Every node gets the `raft_server_set_snapshot_thresholds` one-shot
    injection so Raft snapshots are taken very aggressively, and the fixture
    returns only once the token ring and group0 are consistent.
    """
    LOGGER.info("Booting initial 4-node cluster.")
    racks = ("rack1", "rack2", "rack3", "rack3")
    servers = await manager.servers_add(
        4,
        property_file=[{"dc": "dc1", "rack": rack} for rack in racks],
    )
    for srv in servers:
        # Lower the snapshot thresholds so snapshot-related code paths are
        # exercised early during the test.
        await manager.api.enable_injection(
            node_ip=srv.ip_addr,
            injection="raft_server_set_snapshot_thresholds",
            one_shot=True,
            parameters={"snapshot_threshold": "3", "snapshot_trailing": "1"},
        )
    await wait_for_token_ring_and_group0_consistency(manager=manager, deadline=time.time() + 30)
@pytest.mark.usefixtures("four_nodes_cluster")
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_random_failures(manager: ManagerClient,
                               random_tables: RandomTables,
                               error_injection: str,
                               cluster_event: ClusterEventType) -> None:
    """Bootstrap a node paused at a random error-injection point, run a cluster
    event against the cluster while the node is paused, then verify that the
    node either joins cleanly or is removed, and that the cluster stays healthy.

    Parameters come from ``pytest_generate_tests()``: one error injection name
    and one cluster event coroutine-generator per test instance.
    """
    # Log the reproduction knobs first so a failing run can be replayed.
    LOGGER.info(
        "To repeat this run set TESTS_COUNT to %s, TESTS_SHUFFLE_SEED to %s, ERROR_INJECTIONS_COUNT to %s,"
        " and CLUSTER_EVENTS_COUNT to %s",
        TESTS_COUNT, TESTS_SHUFFLE_SEED, ERROR_INJECTIONS_COUNT, CLUSTER_EVENTS_COUNT,
    )
    table = await random_tables.add_table(ncolumns=5)
    await table.insert_seq()

    # The event is an async generator driven in three steps: preparation,
    # main step (while the bootstrapping node is paused), and finalization.
    cluster_event_steps = cluster_event(manager, random_tables, error_injection)

    LOGGER.info("Run preparation step of the cluster event.")
    await anext(cluster_event_steps)

    if error_injection == "stop_after_updating_cdc_generation":
        # This error injection is a special one and should be handled on a coordinator node:
        # 1. Enable `topology_coordinator_pause_after_updating_cdc_generation` injection on a coordinator node
        # 2. Bootstrap a new node
        # 3. Wait till the injection handler will print the message to the log
        # 4. Pause the bootstrapping node
        # 5. Send the message to the injection handler to continue
        coordinator = await get_coordinator_host(manager=manager)
        await manager.api.enable_injection(
            node_ip=coordinator.ip_addr,
            injection="topology_coordinator_pause_after_updating_cdc_generation",
            one_shot=True,
        )
        coordinator_log = await manager.server_open_log(server_id=coordinator.server_id)
        # Mark the log so wait_for() below only matches lines written after this point.
        coordinator_log_mark = await coordinator_log.mark()
        rack = get_random_rack()
        s_info = await manager.server_add(expected_server_up_state=ServerUpState.PROCESS_STARTED,
                                          property_file={"dc": "dc1", "rack": rack})
        await coordinator_log.wait_for(
            "topology_coordinator_pause_after_updating_cdc_generation: waiting",
            from_mark=coordinator_log_mark,
        )
        # Pause the new node first, then let the coordinator continue (step 5).
        await manager.server_pause(server_id=s_info.server_id)
        await manager.api.message_injection(
            node_ip=coordinator.ip_addr,
            injection="topology_coordinator_pause_after_updating_cdc_generation",
        )
    else:
        # Common case: the injection fires on the bootstrapping node itself and
        # is expected to stop (SIGSTOP-like) the new node's process.
        rack = get_random_rack()
        s_info = await manager.server_add(
            config={"error_injections_at_startup": [{"name": error_injection, "one_shot": True}]},
            property_file={"dc": "dc1", "rack": rack},
            expected_server_up_state=ServerUpState.PROCESS_STARTED,
        )
        LOGGER.info("Wait till the bootstrapping node will be paused.")
        await manager.wait_for_scylla_process_status(
            server_id=s_info.server_id,
            expected_statuses=[
                psutil.STATUS_STOPPED,
            ],
            deadline=time.time() + 180,
        )

    LOGGER.info("Run the cluster event main step.")
    try:
        cluster_event_start = time.perf_counter()
        await anext(cluster_event_steps)
        cluster_event_duration = time.perf_counter() - cluster_event_start
        LOGGER.info("Cluster event `%s' took %.1fs", cluster_event.__name__, cluster_event_duration)
    finally:
        # Always unpause, even if the cluster event failed, so the paused
        # process does not outlive the test.
        LOGGER.info("Unpause the server and wait till it wake up and become connectable using CQL.")
        await manager.server_unpause(server_id=s_info.server_id)

    server_log = await manager.server_open_log(server_id=s_info.server_id)

    if error_injection in ERROR_INJECTIONS_NODE_MAY_HANG:
        LOGGER.info("Expecting the added node can hang and we'll have a message in the coordinator's log. See #18638.")
        coordinator = await get_coordinator_host(manager=manager)
        coordinator_log = await manager.server_open_log(server_id=coordinator.server_id)
        coordinator_log_pattern = r"The node may hang\. It's safe to shut it down manually now\."
        # NOTE(review): this greps the NEW server's log for its host id, but the
        # log message below says "coordinator's log" — the message text looks
        # misleading; confirm intent before changing it.
        if matches := await server_log.grep(r"init - Setting local host id to (?P<hostid>[0-9a-f-]+)"):
            line, match = matches[-1]
            LOGGER.info("Found following message in the coordinator's log:\n\t%s", line)
            # Also accept the rollback message for this specific host id as a
            # sign that the bootstrap was rolled back.
            coordinator_log_pattern += (
                rf"|updating topology state: rollback {match.group('hostid')} after bootstrapping failure, moving"
                rf" transition state to left token ring and setting cleanup flag"
            )
        if matches := await coordinator_log.grep(coordinator_log_pattern):
            LOGGER.info("Found following message in the coordinator's log:\n\t%s", matches[-1][0])
            # The coordinator said the node may hang — shut it down proactively.
            await manager.server_stop(server_id=s_info.server_id)

    BANNED_NOTIFICATION = "received notification of being banned from the cluster from"
    STARTUP_FAILED_PATTERN = f"init - Startup failed:|{BANNED_NOTIFICATION}"

    if s_info in await manager.running_servers():
        LOGGER.info("Wait until the new node initialization completes or fails.")
        await server_log.wait_for(f"init - (Startup failed:|Scylla version .* initialization completed)|{BANNED_NOTIFICATION}", timeout=120)
        if await server_log.grep(STARTUP_FAILED_PATTERN):
            LOGGER.info("Check that the new node is dead.")
            expected_statuses = [psutil.STATUS_DEAD]
        else:
            LOGGER.info("Check that the new node is running.")
            expected_statuses = [psutil.STATUS_RUNNING, psutil.STATUS_SLEEPING]
        scylla_process_status = await manager.wait_for_scylla_process_status(
            server_id=s_info.server_id,
            expected_statuses=expected_statuses,
        )
    else:
        # The node was already stopped above; treat it as dead.
        scylla_process_status = psutil.STATUS_DEAD

    if scylla_process_status in (psutil.STATUS_RUNNING, psutil.STATUS_SLEEPING):
        LOGGER.info("The new node is running. Check if we can connect to it.")

        async def driver_connect() -> bool | None:
            # wait_for() retries while this returns None; True means connected.
            try:
                await manager.driver_connect(server=s_info)
            except NoHostAvailable:
                LOGGER.info("Driver not connected to %s yet", s_info.ip_addr)
                return None
            return True

        LOGGER.info("Check for the new node's health.")
        await wait_for(pred=driver_connect, deadline=time.time() + 90)
        await wait_for_cql_and_get_hosts(cql=manager.cql, servers=[s_info], deadline=time.time() + 30)
    else:
        if s_info in await manager.running_servers():
            LOGGER.info("The new node is dead. Check if it failed to startup.")
            assert await server_log.grep(STARTUP_FAILED_PATTERN)
            await manager.server_stop(server_id=s_info.server_id)  # remove the node from the list of running servers
        LOGGER.info("Try to remove the dead new node from the cluster.")
        # Best effort: removal may legitimately fail depending on how far the
        # bootstrap got, hence the blanket suppress.
        with suppress(Exception):
            await manager.remove_node(
                initiator_id=(await manager.running_servers())[0].server_id,
                server_id=s_info.server_id,
                wait_removed_dead=False,
                timeout=TOPOLOGY_TIMEOUT,
            )

    LOGGER.info("Check for the cluster's health.")
    await manager.driver_connect()
    await wait_for_token_ring_and_group0_consistency(manager=manager, deadline=time.time() + 90)
    # Schema changes must still work after the failure scenario.
    await table.add_column()
    await random_tables.verify_schema()

    # Finalization step of the cluster event.
    await anext(cluster_event_steps, None)  # use default value to suppress StopIteration
    await random_tables.verify_schema()