Files
scylladb/test/cluster/test_streaming_deadlock.py
Aleksandra Martyniuk 5235e3cf67 test: limit test_streaming_deadlock_removenode concurrency
test_streaming_deadlock_removenode starts 10240 inserts at once,
overloading a node. Due to that test fails with timeout.

Limit inserts concurrency.

Fixes: #25945.

Closes scylladb/scylladb#26102
2025-09-19 12:50:20 +03:00

53 lines
2.0 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import logging
import pytest
import time
from cassandra.cluster import ConsistencyLevel
from test.cluster.dtest.alternator_utils import random_string
from test.cluster.util import new_test_keyspace
from test.pylib.manager_client import ManagerClient
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_streaming_deadlock_removenode(request, manager: ManagerClient):
# Force removenode to exercise range_streamer and not repair.
# The bug is in the streaming, and when senders are on different nodes,
# and receivers are cross-located (B->C, C->B).
cfg = {
'rf_rack_valid_keyspaces': False,
'tablets_mode_for_new_keyspaces': 'disabled',
'maintenance_reader_concurrency_semaphore_count_limit': 1,
'enable_repair_based_node_ops': False,
'enable_cache': False, # Force IO
}
cmdline = [
'--logger-log-level', 'stream_session=trace',
'--logger-log-level', 'query_processor=trace'
]
servers = await manager.servers_add(3, config=cfg, cmdline=cmdline)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int, v text);")
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv AS SELECT * FROM {ks}.test "
"WHERE c IS NOT NULL and pk IS NOT NULL PRIMARY KEY (c, pk)")
keys = range(1024)
val = random_string(10240)
stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c, v) VALUES (?, ?, '{val}')")
for _ in range(10):
await asyncio.gather(*[cql.run_async(stmt, [k, k]) for k in keys])
await manager.server_stop_gracefully(servers[0].server_id)
await manager.remove_node(servers[1].server_id, servers[0].server_id)