Files
scylladb/test/cluster/test_tablets_intranode.py
Andrei Chekun c950c2e582 test.py: convert skip_mode function to pytest.mark
Function skip_mode works only on functions and only in cluster tests. This is OK
when we need to skip one test, but it's not possible to use it with pytestmark
to automatically mark all tests in the file. The goal of this PR is to migrate
skip_mode to be a dynamic pytest.mark that can be used as an ordinary mark.

Closes scylladb/scylladb#27853

[avi: apply to test/cluster/test_tablets.py::test_table_creation_wakes_up_balancer]
2026-01-08 21:55:16 +02:00

179 lines
7.4 KiB
Python

#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from cassandra.cluster import Session, ConsistencyLevel
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for_cql_and_get_hosts, start_writes
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas
from test.cluster.conftest import skip_mode
from test.cluster.util import new_test_keyspace
import pytest
import asyncio
import logging
import time
import os
import threading
# Module-level logger used by the tests in this file.
logger = logging.getLogger(__name__)


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_intranode_migration(manager: ManagerClient):
    """Move a tablet between two shards of the same node while writes are running.

    A single-node cluster is started with tablet balancing disabled, and a
    table with exactly one tablet is created. While background writes are in
    progress (``start_writes``), the tablet is moved from its current shard to
    another shard on the same host. Afterwards the table must hold exactly as
    many rows as the writer reported via ``finish_writes()``, and every row
    must satisfy ``c == pk``.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=trace',
        '--logger-log-level', 'stream_session=trace',
        '--logger-log-level', 'tablets=trace',
        '--logger-log-level', 'database=trace',
    ]
    servers = [await manager.server_add(cmdline=cmdline)]
    # Presumably prevents the balancer from migrating the tablet on its own,
    # so the only migration is the explicit one below.
    await manager.api.disable_tablet_balancing(servers[0].ip_addr)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        finish_writes = await start_writes(cql, ks, "test")

        tablet_token = 0  # Doesn't matter since there is one tablet
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)

        # replica[0] is the host and replica[1] the shard. The host is passed
        # unchanged as both source and destination below, so only the shard
        # changes: this is an intra-node migration.
        src_shard = replica[1]
        # Flip the lowest bit to pick a neighbouring shard (0 <-> 1, 2 <-> 3, ...).
        # NOTE(review): assumes the node actually has shard src_shard ^ 1
        # (e.g. an even shard count) — confirm against the test's smp setting.
        dst_shard = src_shard ^ 1

        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token)

        key_count = await finish_writes()

        rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
        assert len(rows) == key_count
        for r in rows:
            assert r.c == r.pk


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_crash_during_intranode_migration(manager: ManagerClient):
    """Restart the node in the middle of an intra-node tablet migration.

    An error injection ('crash-in-tablet-write-both-read-new') is armed once.
    Then an intra-node tablet move is started in the background while writes
    are running. Once the node logs that the injection was hit, the server is
    stopped and started again. The test waits for CQL and for the topology to
    quiesce. It then checks that the table holds as many rows as the writer
    reported and that every row satisfies ``c == pk``.
    """
    cmdline = [
        '--logger-log-level', 'tablets=trace',
        '--logger-log-level', 'database=trace',
        '--commitlog-sync', 'batch',  # So that ACKed writes are not lost on crash
    ]
    servers = [await manager.server_add(cmdline=cmdline)]
    await manager.api.disable_tablet_balancing(servers[0].ip_addr)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # The node goes down mid-test, so individual writes are allowed to fail.
        finish_writes = await start_writes(cql, ks, "test", ignore_errors=True)

        tablet_token = 0  # Choose one tablet, any of them
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        src_shard = replica[1]
        # Neighbouring shard on the same host (see test_intranode_migration).
        dst_shard = src_shard ^ 1

        await manager.api.enable_injection(servers[0].ip_addr, 'crash-in-tablet-write-both-read-new', one_shot=True)
        # Run the move in the background: the request is expected to be
        # interrupted by the injection and the restart below.
        migration_task = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, "test",
                                                                     replica[0], src_shard, replica[0], dst_shard, tablet_token))

        s0_logs = await manager.server_open_log(servers[0].server_id)
        await s0_logs.wait_for('crash-in-tablet-write-both-read-new hit')
        await manager.server_stop(servers[0].server_id)
        await manager.server_start(servers[0].server_id)
        await wait_for_cql_and_get_hosts(manager.cql, servers, time.time() + 60)

        # Wait for the tablet migration to finish
        await manager.api.quiesce_topology(servers[0].ip_addr)

        # The move request was in flight while the node was stopped, so it may
        # have failed. Its outcome is deliberately ignored; completion is
        # awaited via quiesce_topology above. Catch Exception rather than
        # using a bare `except:` so that BaseException subclasses such as
        # asyncio.CancelledError and KeyboardInterrupt still propagate.
        try:
            await migration_task
        except Exception as e:
            logger.info("move_tablet request failed across node restart (ignored): %s", e)

        key_count = await finish_writes()

        rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
        assert len(rows) == key_count
        for r in rows:
            assert r.c == r.pk


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_cross_shard_migration(manager: ManagerClient):
    """
    Test scenario where writes are concurrently made with migration, where
    some of them are coordinated by the owning host and some by the non-owning host.
    This reproduces the following problem with sharder logic:
    1) node A: tablet is in stage write_both_read_new (replicas: B:1 -> A:1)
    2) node B: tablet is in stage write_both_read_new
    3) node A: tablet is in stage use_new
    4) node B: coordinate write to node A (since stage is still "write both" here)
    5) node A: receive write request, sharder thinks no shard owns the tablet, fails the write
    In this scenario, sharder on node A should still return shard 1.

    After both migrations complete, the table must hold as many rows as the
    writer reported, and every row must satisfy ``c == pk``.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=trace',
        '--logger-log-level', 'tablets=trace',
        '--logger-log-level', 'database=trace',
    ]
    servers = await manager.servers_add(2, cmdline=cmdline)
    await manager.api.disable_tablet_balancing(servers[0].ip_addr)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        finish_writes = await start_writes(cql, ks, "test")

        # NOTE(review): assumes that with 2 tablets the split point lies
        # between tokens -1 and 1, so these select two different tablets.
        tablet0_token = -1
        tablet1_token = 1
        replica0 = await get_tablet_replica(manager, servers[0], ks, 'test', tablet0_token)
        replica1 = await get_tablet_replica(manager, servers[0], ks, 'test', tablet1_token)

        s0_host_id = await manager.get_host_id(servers[0].server_id)
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        # Place tablets on non-zero shards so that defaulted shard (0) is never the right shard.
        # This is to catch the problem when sharder (incorrectly) thinks that tablet does not have
        # any replica on the current host and assigns shard 0 to it in shard_for_read().
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica0[0], replica0[1], s0_host_id, 1, tablet0_token)
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica1[0], replica1[1], s1_host_id, 1, tablet1_token)

        # Put whole token ring into migration so that all requests hit the migration path. Half of them
        # will be coordinated by the owning host, half will be coordinated by the non-owning host.
        migration0 = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, "test",
                                                                 s0_host_id, 1, s1_host_id, 1, tablet0_token))
        migration1 = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, "test",
                                                                 s1_host_id, 1, s0_host_id, 1, tablet1_token))
        # Wait for both migrations together. gather propagates the first error
        # but still retrieves the other task's outcome. Two sequential awaits
        # would leave migration1 un-awaited if migration0 raised.
        await asyncio.gather(migration0, migration1)

        key_count = await finish_writes()

        rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
        assert len(rows) == key_count
        for r in rows:
            assert r.c == r.pk