Files
scylladb/test/cluster/test_counters_with_tablets.py
Andrei Chekun cc5ac75d73 test.py: remove deprecated skip_mode decorator
Finishing the deprecation of the skip_mode function in favor of
pytest.mark.skip_mode. This PR only cleans up and migrates leftover tests
that still use the old way of skip_mode.

Closes scylladb/scylladb#28299
2026-01-25 18:17:27 +02:00

94 lines
3.8 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from test.pylib.manager_client import ManagerClient
from test.cluster.util import new_test_keyspace
from test.pylib.tablets import get_tablet_replica
import asyncio
import logging
import pytest
logger = logging.getLogger(__name__)


@pytest.mark.asyncio
@pytest.mark.parametrize("migration_type", ["internode", "intranode"])
async def test_counter_updates_during_tablet_migration(manager: ManagerClient, migration_type: str):
    """
    Test that counter updates remain consistent during tablet migrations.

    This test performs concurrent counter increment operations on a single partition
    while simultaneously triggering a tablet migration, either between nodes or
    between shards on the same node.

    The test verifies that counter consistency is maintained throughout the migration
    process: no counter updates are lost, and the final counter value matches the total
    number of increments performed.
    """
    # Number of concurrent increments issued per round; each fully completed
    # round adds exactly this much to the expected total.
    batch_size = 100
    # Shards per node. Passed to '--smp' below and used to pick the destination
    # shard for an intranode migration, so the two cannot drift apart.
    smp = 2

    node_count = 1 if migration_type == "intranode" else 3
    cmdline = ['--smp', str(smp), '--logger-log-level', 'raft_topology=debug', '--logger-log-level', 'storage_service=debug']
    servers = await manager.servers_add(node_count, cmdline=cmdline)
    cql = manager.get_cql()
    # Keep the balancer from moving the tablet on its own; only the explicit
    # move_tablet call below should migrate it.
    await manager.disable_tablet_balancing()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets={'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.counters (pk int PRIMARY KEY, c counter)")

        stop_event = asyncio.Event()
        pk = 1  # Single partition key for all updates

        async def do_counter_updates():
            """Continuously update a single counter during migration.

            Issues rounds of `batch_size` concurrent increments until `stop_event`
            is set, and returns the number of increments from completed rounds.
            An exception from any increment propagates out of the task.
            """
            update_count = 0
            while not stop_event.is_set():
                await asyncio.gather(*[cql.run_async(f"UPDATE {ks}.counters SET c = c + 1 WHERE pk = {pk}") for _ in range(batch_size)])
                update_count += batch_size
            return update_count

        async def do_migration():
            """Perform the specified type of tablet migration.

            internode: move the tablet from its current replica host to another node.
            intranode: move the tablet to the next shard on the same host.
            """
            tablet_token = 0  # the single tablet
            replica = await get_tablet_replica(manager, servers[0], ks, 'counters', tablet_token)
            src_host = replica[0]
            src_shard = replica[1]
            if migration_type == "internode":
                # Find a different node to migrate to
                all_host_ids = [await manager.get_host_id(server.server_id) for server in servers]
                dst_host = next(host_id for host_id in all_host_ids if host_id != src_host)
                dst_shard = 0
            else:  # migration_type == "intranode"
                # Move to a different shard on the same node
                dst_host = src_host
                dst_shard = (src_shard + 1) % smp
            await manager.api.move_tablet(servers[0].ip_addr, ks, "counters", src_host, src_shard, dst_host, dst_shard, tablet_token)

        # Run counter updates and migration concurrently
        update_task = asyncio.create_task(do_counter_updates())
        try:
            # Give the updater a head start so increments are in flight when
            # the migration begins.
            await asyncio.sleep(0.5)
            await do_migration()
        finally:
            # Always stop and reap the updater, even if the migration failed:
            # otherwise the task is left running against a keyspace that is about
            # to be dropped, and its exception would never be retrieved.
            stop_event.set()
            total_updates = await update_task
        logger.info("Completed %d counter updates during migration", total_updates)

        # Verify no increments were lost - counter value should equal number of updates
        rows = list(await cql.run_async(f"SELECT c FROM {ks}.counters WHERE pk = {pk}"))
        # A counter row that was never incremented does not exist; treat it as 0
        # so a missing row produces a readable assertion failure, not IndexError.
        actual_count = rows[0].c if rows else 0
        assert actual_count == total_updates, f"Counter value mismatch: expected {total_updates}, got {actual_count}"