Function skip_mode works only on functions and only in cluster tests. This is OK when we need to skip one test, but it is not possible to use it with pytestmark to automatically mark all tests in a file. The goal of this PR is to migrate skip_mode to a dynamic pytest mark that can be used as an ordinary mark. Closes scylladb/scylladb#27853 [avi: apply to test/cluster/test_tablets.py::test_table_creation_wakes_up_balancer]
179 lines
7.4 KiB
Python
179 lines
7.4 KiB
Python
#
|
|
# Copyright (C) 2024-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
from cassandra.cluster import Session, ConsistencyLevel
|
|
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.util import wait_for_cql_and_get_hosts, start_writes
|
|
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas
|
|
from test.cluster.conftest import skip_mode
|
|
from test.cluster.util import new_test_keyspace
|
|
|
|
|
|
import pytest
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
import os
|
|
import threading
|
|
|
|
|
|
# Module-level logger, named after this test module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_intranode_migration(manager: ManagerClient):
    """Move a single tablet between two shards of the same node while writes
    are in flight, then verify no acknowledged write was lost or corrupted.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=trace',
        '--logger-log-level', 'stream_session=trace',
        '--logger-log-level', 'tablets=trace',
        '--logger-log-level', 'database=trace',
    ]
    servers = [await manager.server_add(cmdline=cmdline)]

    # Keep the tablet where we put it; this test drives migration manually.
    await manager.api.disable_tablet_balancing(servers[0].ip_addr)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Background writer; finish_writes() stops it and returns the key count.
        finish_writes = await start_writes(cql, ks, "test")

        tablet_token = 0  # Doesn't matter since there is one tablet
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)

        # replica is (host_id, shard); migrate to the sibling shard on the same host.
        src_shard = replica[1]
        dst_shard = src_shard ^ 1

        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token)

        key_count = await finish_writes()

        # Every ACKed write must survive the intranode migration intact.
        rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
        assert len(rows) == key_count
        for r in rows:
            assert r.c == r.pk
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_crash_during_intranode_migration(manager: ManagerClient):
    """Crash the node in the write_both_read_new stage of an intranode tablet
    migration, restart it, let topology quiesce, and verify that no
    acknowledged write was lost or corrupted across the crash.
    """
    cmdline = [
        '--logger-log-level', 'tablets=trace',
        '--logger-log-level', 'database=trace',
        '--commitlog-sync', 'batch',  # So that ACKed writes are not lost on crash
    ]
    servers = [await manager.server_add(cmdline=cmdline)]

    # Migration is driven manually below; keep the balancer out of the way.
    await manager.api.disable_tablet_balancing(servers[0].ip_addr)

    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # The node will crash mid-test, so write errors are expected and ignored.
        finish_writes = await start_writes(cql, ks, "test", ignore_errors=True)

        tablet_token = 0  # Choose one tablet, any of them
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)

        # replica is (host_id, shard); migrate to the sibling shard on the same host.
        src_shard = replica[1]
        dst_shard = src_shard ^ 1

        # One-shot injection: the node aborts once migration reaches write_both_read_new.
        await manager.api.enable_injection(servers[0].ip_addr, 'crash-in-tablet-write-both-read-new', one_shot=True)

        migration_task = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, "test",
                                                                     replica[0], src_shard, replica[0], dst_shard, tablet_token))

        s0_logs = await manager.server_open_log(servers[0].server_id)
        await s0_logs.wait_for('crash-in-tablet-write-both-read-new hit')
        await manager.server_stop(servers[0].server_id)
        await manager.server_start(servers[0].server_id)
        await wait_for_cql_and_get_hosts(manager.cql, servers, time.time() + 60)

        # Wait for the tablet migration to finish
        await manager.api.quiesce_topology(servers[0].ip_addr)

        try:
            await migration_task
        except Exception:
            # The original move_tablet request may legitimately fail because the
            # node crashed mid-migration; data correctness is verified below.
            # (Narrow except: a bare `except:` would also swallow CancelledError.)
            pass

        key_count = await finish_writes()

        # Every ACKed write must have survived the crash and recovery.
        rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
        assert len(rows) == key_count
        for r in rows:
            assert r.c == r.pk
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_cross_shard_migration(manager: ManagerClient):
    """
    Run client writes concurrently with tablet migrations so that some
    requests are coordinated by the tablet-owning host and some by a
    non-owning host.

    Reproduces the following sharder-logic problem:
      1) node A: tablet is in stage write_both_read_new (replicas: B:1 -> A:1)
      2) node B: tablet is in stage write_both_read_new
      3) node A: tablet is in stage use_new
      4) node B: coordinate write to node A (since stage is still "write both" here)
      5) node A: receive write request, sharder thinks no shard owns the tablet, fails the write

    At step 5 the sharder on node A should still return shard 1.
    """

    logger.info("Bootstrapping cluster")
    node_args = [
        '--logger-log-level', 'storage_service=trace',
        '--logger-log-level', 'tablets=trace',
        '--logger-log-level', 'database=trace',
    ]

    nodes = await manager.servers_add(2, cmdline=node_args)

    await manager.api.disable_tablet_balancing(nodes[0].ip_addr)

    session = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as keyspace:
        await session.run_async(f"CREATE TABLE {keyspace}.test (pk int PRIMARY KEY, c int);")

        stop_writes = await start_writes(session, keyspace, "test")

        # One token per tablet (two tablets total).
        token_a = -1
        token_b = 1
        replica_a = await get_tablet_replica(manager, nodes[0], keyspace, 'test', token_a)
        replica_b = await get_tablet_replica(manager, nodes[0], keyspace, 'test', token_b)

        host_a = await manager.get_host_id(nodes[0].server_id)
        host_b = await manager.get_host_id(nodes[1].server_id)

        # Pin both tablets to a non-zero shard so the defaulted shard (0) is
        # never the right answer: if the sharder wrongly concludes the current
        # host has no replica and falls back to shard 0 in shard_for_read(),
        # the request fails instead of accidentally succeeding.
        await manager.api.move_tablet(nodes[0].ip_addr, keyspace, "test", replica_a[0], replica_a[1], host_a, 1, token_a)
        await manager.api.move_tablet(nodes[0].ip_addr, keyspace, "test", replica_b[0], replica_b[1], host_b, 1, token_b)

        # Migrate both tablets at once so the whole token ring is in migration
        # and every request hits the migration path — roughly half coordinated
        # by the owning host and half by the non-owning host.
        task_a = asyncio.create_task(manager.api.move_tablet(nodes[0].ip_addr, keyspace, "test",
                                                             host_a, 1, host_b, 1, token_a))
        task_b = asyncio.create_task(manager.api.move_tablet(nodes[0].ip_addr, keyspace, "test",
                                                             host_b, 1, host_a, 1, token_b))

        await task_a
        await task_b

        written = await stop_writes()

        # No acknowledged write may have been lost or corrupted.
        rows = await session.run_async(f"SELECT * FROM {keyspace}.test;")
        assert len(rows) == written
        for row in rows:
            assert row.c == row.pk
|