test: test_topology_ops: move helpers to topology/util.py

In one of the following patches, we reuse the helper functions from
`test_topology_ops` in a new test, so we move them to `util.py`.

Also, we add the `cl` parameter to `start_writes`, as the new test
will use `cl=2`.
This commit is contained in:
Patryk Jędrzejczak
2024-07-10 13:09:43 +02:00
parent 574c252391
commit d43d67c525
2 changed files with 48 additions and 51 deletions

View File

@@ -257,6 +257,49 @@ async def check_system_topology_and_cdc_generations_v3_consistency(manager: Mana
# Check that the contents fetched from the current host are the same as for other nodes
assert topo_results[0] == topo_res
async def check_node_log_for_failed_mutations(manager: ManagerClient, server: ServerInfo):
logging.info(f"Checking that node {server} had no failed mutations")
log = await manager.server_open_log(server.server_id)
occurrences = await log.grep(expr="Failed to apply mutation from")
assert len(occurrences) == 0
async def start_writes(cql: Session, rf: int, cl: ConsistencyLevel, concurrency: int = 3):
logging.info(f"Starting to asynchronously write, concurrency = {concurrency}")
stop_event = asyncio.Event()
ks_name = unique_name()
await cql.run_async(f"CREATE KEYSPACE {ks_name} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}}")
await cql.run_async(f"USE {ks_name}")
await cql.run_async(f"CREATE TABLE tbl (pk int PRIMARY KEY, v int)")
# In the test we only care about whether operations report success or not
# and whether they trigger errors in the nodes' logs. Inserting the same
# value repeatedly is enough for our purposes.
stmt = SimpleStatement("INSERT INTO tbl (pk, v) VALUES (0, 0)", consistency_level=cl)
async def do_writes(worker_id: int):
write_count = 0
while not stop_event.is_set():
start_time = time.time()
try:
await cql.run_async(stmt)
write_count += 1
except Exception as e:
logging.error(f"Write started {time.time() - start_time}s ago failed: {e}")
raise
logging.info(f"Worker #{worker_id} did {write_count} successful writes")
tasks = [asyncio.create_task(do_writes(worker_id)) for worker_id in range(concurrency)]
async def finish():
logging.info("Stopping write workers")
stop_event.set()
await asyncio.gather(*tasks)
return finish
async def start_writes_to_cdc_table(cql: Session, concurrency: int = 3):
logger.info(f"Starting to asynchronously write, concurrency = {concurrency}")

View File

@@ -5,14 +5,12 @@
#
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.manager_client import ManagerClient
from test.pylib.internal_types import ServerInfo
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
from test.topology.util import check_token_ring_and_group0_consistency, reconnect_driver
from test.pylib.util import wait_for_cql_and_get_hosts
from test.topology.util import check_token_ring_and_group0_consistency, reconnect_driver, \
check_node_log_for_failed_mutations, start_writes
from cassandra.cluster import Session, ConsistencyLevel
from cassandra.query import SimpleStatement
from cassandra.cluster import ConsistencyLevel
import asyncio
import time
import pytest
import logging
@@ -42,7 +40,7 @@ async def test_topology_ops(request, manager: ManagerClient, tablets_enabled: bo
await wait_for_cql_and_get_hosts(manager.cql, servers, time.time() + 60)
cql = await reconnect_driver(manager)
finish_writes = await start_writes(cql, rf)
finish_writes = await start_writes(cql, rf, ConsistencyLevel.ONE)
logger.info(f"Decommissioning node {servers[0]}")
await manager.decommission_node(servers[0].server_id)
@@ -76,47 +74,3 @@ async def test_topology_ops(request, manager: ManagerClient, tablets_enabled: bo
for server in servers:
await check_node_log_for_failed_mutations(manager, server)
async def check_node_log_for_failed_mutations(manager: ManagerClient, server: ServerInfo):
logger.info(f"Checking that node {server} had no failed mutations")
log = await manager.server_open_log(server.server_id)
occurrences = await log.grep(expr="Failed to apply mutation from")
assert len(occurrences) == 0
async def start_writes(cql: Session, rf: int, concurrency: int = 3):
logger.info(f"Starting to asynchronously write, concurrency = {concurrency}")
stop_event = asyncio.Event()
ks_name = unique_name()
await cql.run_async(f"CREATE KEYSPACE {ks_name} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}}")
await cql.run_async(f"USE {ks_name}")
await cql.run_async(f"CREATE TABLE tbl (pk int PRIMARY KEY, v int)")
# In the test we only care about whether operations report success or not
# and whether they trigger errors in the nodes' logs. Inserting the same
# value repeatedly is enough for our purposes.
stmt = SimpleStatement("INSERT INTO tbl (pk, v) VALUES (0, 0)", consistency_level=ConsistencyLevel.ONE)
async def do_writes(worker_id: int):
write_count = 0
while not stop_event.is_set():
start_time = time.time()
try:
await cql.run_async(stmt)
write_count += 1
except Exception as e:
logger.error(f"Write started {time.time() - start_time}s ago failed: {e}")
raise
logger.info(f"Worker #{worker_id} did {write_count} successful writes")
tasks = [asyncio.create_task(do_writes(worker_id)) for worker_id in range(concurrency)]
async def finish():
logger.info("Stopping write workers")
stop_event.set()
await asyncio.gather(*tasks)
return finish