diff --git a/test/topology/util.py b/test/topology/util.py index e7a289bb70..763cee70de 100644 --- a/test/topology/util.py +++ b/test/topology/util.py @@ -257,6 +257,49 @@ async def check_system_topology_and_cdc_generations_v3_consistency(manager: Mana # Check that the contents fetched from the current host are the same as for other nodes assert topo_results[0] == topo_res +async def check_node_log_for_failed_mutations(manager: ManagerClient, server: ServerInfo): + logging.info(f"Checking that node {server} had no failed mutations") + log = await manager.server_open_log(server.server_id) + occurrences = await log.grep(expr="Failed to apply mutation from") + assert len(occurrences) == 0 + + +async def start_writes(cql: Session, rf: int, cl: ConsistencyLevel, concurrency: int = 3): + logging.info(f"Starting to asynchronously write, concurrency = {concurrency}") + + stop_event = asyncio.Event() + + ks_name = unique_name() + await cql.run_async(f"CREATE KEYSPACE {ks_name} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}}") + await cql.run_async(f"USE {ks_name}") + await cql.run_async(f"CREATE TABLE tbl (pk int PRIMARY KEY, v int)") + + # In the test we only care about whether operations report success or not + # and whether they trigger errors in the nodes' logs. Inserting the same + # value repeatedly is enough for our purposes. + stmt = SimpleStatement("INSERT INTO tbl (pk, v) VALUES (0, 0)", consistency_level=cl) + + async def do_writes(worker_id: int): + write_count = 0 + while not stop_event.is_set(): + start_time = time.time() + try: + await cql.run_async(stmt) + write_count += 1 + except Exception as e: + logging.error(f"Write started {time.time() - start_time}s ago failed: {e}") + raise + logging.info(f"Worker #{worker_id} did {write_count} successful writes") + + tasks = [asyncio.create_task(do_writes(worker_id)) for worker_id in range(concurrency)] + + async def finish(): + logging.info("Stopping write workers") + stop_event.set() + await asyncio.gather(*tasks) + + return finish + async def start_writes_to_cdc_table(cql: Session, concurrency: int = 3): logger.info(f"Starting to asynchronously write, concurrency = {concurrency}") diff --git a/test/topology_experimental_raft/test_topology_ops.py b/test/topology_experimental_raft/test_topology_ops.py index 0d2ff4a45e..9b739b6770 100644 --- a/test/topology_experimental_raft/test_topology_ops.py +++ b/test/topology_experimental_raft/test_topology_ops.py @@ -5,14 +5,12 @@ # from test.pylib.scylla_cluster import ReplaceConfig from test.pylib.manager_client import ManagerClient -from test.pylib.internal_types import ServerInfo -from test.pylib.util import unique_name, wait_for_cql_and_get_hosts -from test.topology.util import check_token_ring_and_group0_consistency, reconnect_driver +from test.pylib.util import wait_for_cql_and_get_hosts +from test.topology.util import check_token_ring_and_group0_consistency, reconnect_driver, \ + check_node_log_for_failed_mutations, start_writes -from cassandra.cluster import Session, ConsistencyLevel -from cassandra.query import SimpleStatement +from cassandra.cluster import ConsistencyLevel -import asyncio import time import pytest import logging @@ -42,7 +40,7 @@ async def test_topology_ops(request, manager: ManagerClient, tablets_enabled: bo await wait_for_cql_and_get_hosts(manager.cql, servers, time.time() + 60) cql = await reconnect_driver(manager) - finish_writes = await start_writes(cql, rf) + finish_writes = await start_writes(cql, rf, ConsistencyLevel.ONE) logger.info(f"Decommissioning node {servers[0]}") await manager.decommission_node(servers[0].server_id) @@ -76,47 +74,3 @@ async def test_topology_ops(request, manager: ManagerClient, tablets_enabled: bo for server in servers: await check_node_log_for_failed_mutations(manager, server) - - -async def check_node_log_for_failed_mutations(manager: ManagerClient, server: ServerInfo): - logger.info(f"Checking that node {server} had no failed mutations") - log = await manager.server_open_log(server.server_id) - occurrences = await log.grep(expr="Failed to apply mutation from") - assert len(occurrences) == 0 - - -async def start_writes(cql: Session, rf: int, concurrency: int = 3): - logger.info(f"Starting to asynchronously write, concurrency = {concurrency}") - - stop_event = asyncio.Event() - - ks_name = unique_name() - await cql.run_async(f"CREATE KEYSPACE {ks_name} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}}") - await cql.run_async(f"USE {ks_name}") - await cql.run_async(f"CREATE TABLE tbl (pk int PRIMARY KEY, v int)") - - # In the test we only care about whether operations report success or not - # and whether they trigger errors in the nodes' logs. Inserting the same - # value repeatedly is enough for our purposes. - stmt = SimpleStatement("INSERT INTO tbl (pk, v) VALUES (0, 0)", consistency_level=ConsistencyLevel.ONE) - - async def do_writes(worker_id: int): - write_count = 0 - while not stop_event.is_set(): - start_time = time.time() - try: - await cql.run_async(stmt) - write_count += 1 - except Exception as e: - logger.error(f"Write started {time.time() - start_time}s ago failed: {e}") - raise - logger.info(f"Worker #{worker_id} did {write_count} successful writes") - - tasks = [asyncio.create_task(do_writes(worker_id)) for worker_id in range(concurrency)] - - async def finish(): - logger.info("Stopping write workers") - stop_event.set() - await asyncio.gather(*tasks) - - return finish