# Finishing the deprecation of the skip_mode function in favor of
# pytest.mark.skip_mode: this file only uses the pytest.mark form.
# Closes scylladb/scylladb#28299
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import logging
import os
import threading
import time

import pytest

from cassandra.cluster import Session, ConsistencyLevel

from test.cluster.util import new_test_keyspace
from test.pylib.manager_client import ManagerClient
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas
from test.pylib.util import wait_for_cql_and_get_hosts, start_writes

logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_intranode_migration(manager: ManagerClient):
    """Move a tablet between shards of the same node while writes are in flight.

    Starts a single-node cluster with one tablet, runs a background writer,
    migrates the tablet to the sibling shard on the same host, and verifies
    that every ACKed write is still readable and consistent afterwards.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=trace',
        '--logger-log-level', 'stream_session=trace',
        '--logger-log-level', 'tablets=trace',
        '--logger-log-level', 'database=trace',
    ]
    servers = [await manager.server_add(cmdline=cmdline)]

    # Keep placement stable: this test must be the only source of migrations.
    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Background writer; finish_writes() stops it and returns the ACKed key count.
        finish_writes = await start_writes(cql, ks, "test")

        tablet_token = 0  # Doesn't matter since there is one tablet
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)

        # replica is (host_id, shard); move to the sibling shard on the same host.
        src_shard = replica[1]
        dst_shard = src_shard ^ 1

        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token)

        key_count = await finish_writes()

        rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
        assert len(rows) == key_count
        for r in rows:
            assert r.c == r.pk
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_crash_during_intranode_migration(manager: ManagerClient):
    """Crash the node mid intra-node tablet migration and verify recovery.

    Uses the 'crash-in-tablet-write-both-read-new' error injection to kill the
    server while a tablet is moving between shards, restarts it, waits for the
    topology to quiesce, and checks that no ACKed write was lost.
    """
    cmdline = [
        '--logger-log-level', 'tablets=trace',
        '--logger-log-level', 'database=trace',
        '--commitlog-sync', 'batch',  # So that ACKed writes are not lost on crash
    ]
    servers = [await manager.server_add(cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Writes are expected to fail while the node is down; the writer only
        # counts ACKed keys, so ignore_errors keeps it running through the crash.
        finish_writes = await start_writes(cql, ks, "test", ignore_errors=True)

        tablet_token = 0  # Choose one tablet, any of them
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)

        src_shard = replica[1]
        dst_shard = src_shard ^ 1  # sibling shard on the same host

        await manager.api.enable_injection(servers[0].ip_addr, 'crash-in-tablet-write-both-read-new', one_shot=True)

        migration_task = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, "test",
                                             replica[0], src_shard, replica[0], dst_shard, tablet_token))

        s0_logs = await manager.server_open_log(servers[0].server_id)
        await s0_logs.wait_for('crash-in-tablet-write-both-read-new hit')
        await manager.server_stop(servers[0].server_id)
        await manager.server_start(servers[0].server_id)
        await wait_for_cql_and_get_hosts(manager.cql, servers, time.time() + 60)

        # Wait for the tablet migration to finish
        await manager.api.quiesce_topology(servers[0].ip_addr)

        # The move request raced with the crash, so the API call may have
        # failed even though the migration itself completed after restart.
        # Narrow except (not bare): do not swallow CancelledError and friends.
        try:
            await migration_task
        except Exception as e:
            logger.info("Ignoring expected move_tablet failure after crash: %s", e)

        key_count = await finish_writes()

        rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
        assert len(rows) == key_count
        for r in rows:
            assert r.c == r.pk
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_cross_shard_migration(manager: ManagerClient):
    """
    Test scenario where writes are concurrently made with migration, where
    some of them are coordinated by the owning host and some by the non-owning host.

    This reproduces the following problem with sharder logic:
    1) node A: tablet is in stage write_both_read_new (replicas: B:1 -> A:1)
    2) node B: tablet is in stage write_both_read_new
    3) node A: tablet is in stage use_new
    4) node B: coordinate write to node A (since stage is still "write both" here)
    5) node A: receive write request, sharder thinks no shard owns the tablet, fails the write

    In this scenario, sharder on node A should still return shard 1.
    """

    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=trace',
        '--logger-log-level', 'tablets=trace',
        '--logger-log-level', 'database=trace',
    ]

    servers = await manager.servers_add(2, cmdline=cmdline)

    # Keep placement stable: this test must be the only source of migrations.
    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Background writer; finish_writes() stops it and returns the ACKed key count.
        finish_writes = await start_writes(cql, ks, "test")

        # One token on each side of the ring, hitting the two initial tablets.
        tablet0_token = -1
        tablet1_token = 1
        replica0 = await get_tablet_replica(manager, servers[0], ks, 'test', tablet0_token)
        replica1 = await get_tablet_replica(manager, servers[0], ks, 'test', tablet1_token)

        s0_host_id = await manager.get_host_id(servers[0].server_id)
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        # Place tablets on non-zero shards so that defaulted shard (0) is never the right shard.
        # This is to catch the problem when sharder (incorrectly) thinks that tablet does not have
        # any replica on the current host and assigns shard 0 to it in shard_for_read().
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica0[0], replica0[1], s0_host_id, 1, tablet0_token)
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica1[0], replica1[1], s1_host_id, 1, tablet1_token)

        # Put whole token ring into migration so that all requests hit the migration path. Half of them
        # will be coordinated by the owning host, half will be coordinated by the non-owning host.
        migration0 = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, "test",
                                         s0_host_id, 1, s1_host_id, 1, tablet0_token))
        migration1 = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, "test",
                                         s1_host_id, 1, s0_host_id, 1, tablet1_token))

        # gather() awaits both tasks even if one fails, so a failure in
        # migration0 cannot leave migration1 as a leaked, never-awaited task.
        await asyncio.gather(migration0, migration1)

        key_count = await finish_writes()

        rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
        assert len(rows) == key_count
        for r in rows:
            assert r.c == r.pk