Consider the following scenario:

1. A table has RF=3 and writes use CL=QUORUM
2. One node is down
3. There is a pending tablet migration from the unavailable node that is reverted

During the revert, there can be a time window where the pending replica being cleaned up still accepts writes. This leads to write failures, as only two nodes (out of four) are able to acknowledge writes.

This patch fixes the issue by adding a barrier to the cleanup_target tablet transition state, ensuring that the coordinator switches back to the previous replica set before cleanup is triggered.

Fixes https://github.com/scylladb/scylladb/issues/26512
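For illustration only (this sketch is not part of the patch, and the exact coordinator bookkeeping is an assumption here): when a pending tablet replica is still counted as a write target, a CL=QUORUM write needs a quorum of the natural replicas plus an acknowledgement for the pending one, so in the scenario above, where only two of the four write targets can acknowledge, the write cannot collect the three acknowledgements it needs.

```python
# Illustrative sketch of the write-availability arithmetic in the revert window.
# Assumption (for illustration, not taken from the ScyllaDB sources): the
# coordinator requires quorum(RF) plus one ack per pending replica.

def acks_required(rf: int, pending: int) -> int:
    quorum = rf // 2 + 1
    return quorum + pending

rf, pending = 3, 1                    # RF=3 table, one pending replica being cleaned up
write_targets = rf + pending          # the write fans out to 4 nodes
acks_available = 2                    # per the scenario, only 2 nodes can acknowledge

needed = acks_required(rf, pending)   # 2 + 1 = 3
print(f"{write_targets} targets, need {needed} acks, only {acks_available} available")
assert acks_available < needed        # so the CL=QUORUM write fails in this window
```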
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from collections import defaultdict

from cassandra.query import SimpleStatement, ConsistencyLevel
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import HTTPError, read_barrier
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_info
from test.pylib.util import start_writes
from test.cluster.conftest import skip_mode
from test.cluster.util import wait_for_cql_and_get_hosts, new_test_keyspace, reconnect_driver, wait_for

import time
import pytest
import logging
import asyncio
import os
import glob

logger = logging.getLogger(__name__)


@pytest.mark.parametrize("action", ['move', 'add_replica', 'del_replica'])
|
|
@pytest.mark.asyncio
|
|
async def test_tablet_transition_sanity(manager: ManagerClient, action):
|
|
logger.info("Bootstrapping cluster")
|
|
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
|
|
host_ids = []
|
|
servers = []
|
|
hosts_by_rack = defaultdict(list)
|
|
|
|
async def make_server(rack: str):
|
|
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
|
|
servers.append(s)
|
|
host = await manager.get_host_id(s.server_id)
|
|
hosts_by_rack[rack].append(host)
|
|
host_ids.append(host)
|
|
await manager.api.disable_tablet_balancing(s.ip_addr)
|
|
|
|
await make_server("r1")
|
|
await make_server("r1")
|
|
await make_server("r2")
|
|
|
|
cql = manager.get_cql()
|
|
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
keys = range(256)
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
|
|
|
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
|
|
logger.info(f"Tablet is on [{replicas}]")
|
|
assert len(replicas) == 1 and len(replicas[0].replicas) == 2
|
|
|
|
old_replica = None
|
|
for r in replicas[0].replicas:
|
|
if r[0] in hosts_by_rack['r1']:
|
|
old_replica = r
|
|
break
|
|
|
|
for h in hosts_by_rack['r1']:
|
|
if h != old_replica[0]:
|
|
new_replica = (h, 0)
|
|
break
|
|
else:
|
|
assert False, "Cannot find node without replica in rack r1"
|
|
|
|
if action == 'move':
|
|
logger.info(f"Move tablet {old_replica[0]} -> {new_replica[0]}")
|
|
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
|
|
if action == 'add_replica':
|
|
logger.info(f"Adding replica to tablet, host {new_replica[0]}")
|
|
await manager.api.add_tablet_replica(servers[0].ip_addr, ks, "test", new_replica[0], new_replica[1], 0)
|
|
if action == 'del_replica':
|
|
logger.info(f"Deleting replica from tablet, host {old_replica[0]}")
|
|
await manager.api.del_tablet_replica(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], 0)
|
|
|
|
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
|
|
logger.info(f"Tablet is now on [{replicas}]")
|
|
assert len(replicas) == 1
|
|
replicas = [ r[0] for r in replicas[0].replicas ]
|
|
if action == 'move':
|
|
assert len(replicas) == 2
|
|
assert new_replica[0] in replicas
|
|
assert old_replica[0] not in replicas
|
|
if action == 'add_replica':
|
|
assert len(replicas) == 3
|
|
assert old_replica[0] in replicas
|
|
assert new_replica[0] in replicas
|
|
if action == 'del_replica':
|
|
assert len(replicas) == 1
|
|
assert old_replica[0] not in replicas
|
|
|
|
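        # Verify placement directly on every node: replicas of the tablet must hold
        # mutation fragments for the table, while non-replicas must hold none.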
        for h, s in zip(host_ids, servers):
            host = await wait_for_cql_and_get_hosts(cql, [s], time.time() + 30)
            if h != host_ids[0]:
                await read_barrier(manager.api, host[0].address)  # host-0 did the barrier in get_all_tablet_replicas above
            res = await cql.run_async(f"SELECT COUNT(*) FROM MUTATION_FRAGMENTS({ks}.test)", host=host[0])
            logger.info(f"Host {h} reports {res} as mutation fragments count")
            if h in replicas:
                assert res[0].count != 0
            else:
                assert res[0].count == 0


@pytest.mark.parametrize("fail_replica", ["source", "destination"])
|
|
@pytest.mark.parametrize("fail_stage", ["streaming", "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "cleanup", "cleanup_target", "end_migration", "revert_migration"])
|
|
@pytest.mark.asyncio
|
|
@skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage):
|
|
if fail_stage == 'cleanup' and fail_replica == 'destination':
|
|
pytest.skip('Failing destination during cleanup is pointless')
|
|
if fail_stage == 'cleanup_target' and fail_replica == 'source':
|
|
pytest.skip('Failing source during target cleanup is pointless')
|
|
|
|
logger.info("Bootstrapping cluster")
|
|
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': 2000}
|
|
host_ids = []
|
|
servers = []
|
|
|
|
async def make_server(rack: str):
|
|
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
|
|
servers.append(s)
|
|
host_ids.append(await manager.get_host_id(s.server_id))
|
|
await manager.api.disable_tablet_balancing(s.ip_addr)
|
|
|
|
await make_server("r1")
|
|
await make_server("r2")
|
|
await make_server("r3")
|
|
cql = manager.get_cql()
|
|
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
|
|
keys = range(256)
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
|
await make_server("r2")
|
|
|
|
if fail_stage in ["cleanup_target", "revert_migration"]:
|
|
# we'll stop 2 servers, group0 quorum should be there - we need five
|
|
# nodes to have three remaining
|
|
#
|
|
# also one of the extra nodes will be used to call removenode on
|
|
# removing the 1st node will wait for the operation to go through
|
|
# raft log, and it will not finish before tablet migration. An
|
|
# attempt to remove the 2nd node, to make cleanup_target stage
|
|
# go ahead, will step on the legacy API lock on storage_service,
|
|
# so we need to ask some other node to do it
|
|
await make_server("r1")
|
|
await make_server("r2")
|
|
|
|
logger.info(f"Cluster is [{host_ids}]")
|
|
|
|
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
|
|
logger.info(f"Tablet is on [{replicas}]")
|
|
assert len(replicas) == 1 and len(replicas[0].replicas) == 3
|
|
|
|
last_token = replicas[0].last_token
|
|
old_replica = None
|
|
for r in replicas[0].replicas:
|
|
assert r[0] != host_ids[3], "Tablet got migrated to node3"
|
|
if r[0] == host_ids[1]:
|
|
old_replica = r
|
|
assert old_replica is not None
|
|
new_replica = (host_ids[3], 0)
|
|
logger.info(f"Moving tablet {old_replica} -> {new_replica}")
|
|
|
|
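        # Helper that injects a failure for the requested migration stage on the source
        # or destination replica, waits for the injection to trigger, and then stops the
        # failed node and removes it from the cluster via another node. The compound
        # stages (cleanup_target, revert_migration) build on two nested failers.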
        class node_failer:
            def __init__(self, stage, replica, ks):
                self.stage = stage
                self.replica = replica
                self.fail_idx = 1 if self.replica == "source" else 3
                self.ks = ks

            async def setup(self):
                logger.info(f"Will fail {self.stage}")
                if self.stage == "streaming":
                    await manager.api.enable_injection(servers[3].ip_addr, "stream_mutation_fragments", one_shot=True)
                    self.log = await manager.server_open_log(servers[3].server_id)
                    self.mark = await self.log.mark()
                elif self.stage in ["allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "end_migration", "do_revert_migration"]:
                    await manager.api.enable_injection(servers[self.fail_idx].ip_addr, "raft_topology_barrier_and_drain_fail", one_shot=False,
                                                       parameters={'keyspace': self.ks, 'table': 'test', 'last_token': last_token, 'stage': self.stage.removeprefix('do_')})
                    self.log = await manager.server_open_log(servers[self.fail_idx].server_id)
                    self.mark = await self.log.mark()
                elif self.stage == "cleanup":
                    await manager.api.enable_injection(servers[self.fail_idx].ip_addr, "cleanup_tablet_crash", one_shot=True)
                    self.log = await manager.server_open_log(servers[self.fail_idx].server_id)
                    self.mark = await self.log.mark()
                elif self.stage == "cleanup_target":
                    assert self.fail_idx == 3
                    self.stream_fail = node_failer('streaming', 'source', ks)
                    await self.stream_fail.setup()
                    self.cleanup_fail = node_failer('cleanup', 'destination', ks)
                    await self.cleanup_fail.setup()
                elif self.stage == "revert_migration":
                    self.wbro_fail = node_failer('write_both_read_old', 'source' if self.replica == 'destination' else 'destination', ks)
                    await self.wbro_fail.setup()
                    self.revert_fail = node_failer('do_revert_migration', self.replica, ks)
                    await self.revert_fail.setup()
                else:
                    assert False, f"Unknown stage {self.stage}"

            async def wait(self):
                logger.info(f"Wait for {self.stage} to happen")
                if self.stage == "streaming":
                    await self.log.wait_for('stream_mutation_fragments: waiting', from_mark=self.mark)
                elif self.stage in ["allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "end_migration", "do_revert_migration"]:
                    await self.log.wait_for('raft_topology_cmd: barrier handler waits', from_mark=self.mark)
                elif self.stage == "cleanup":
                    await self.log.wait_for('Crashing tablet cleanup', from_mark=self.mark)
                elif self.stage == "cleanup_target":
                    await self.stream_fail.wait()
                    self.stream_stop_task = asyncio.create_task(self.stream_fail.stop())
                    await self.cleanup_fail.wait()
                elif self.stage == "revert_migration":
                    await self.wbro_fail.wait()
                    self.wbro_fail_task = asyncio.create_task(self.wbro_fail.stop())
                    await self.revert_fail.wait()
                else:
                    assert False

            async def stop(self, via=0):
                if self.stage == "cleanup_target":
                    await self.cleanup_fail.stop(via=4)  # removenode of source is happening via node0 already
                    await self.stream_stop_task
                    return
                if self.stage == "revert_migration":
                    await self.revert_fail.stop(via=4)
                    await self.wbro_fail_task
                    return

                logger.info(f"Stop {self.replica} {host_ids[self.fail_idx]}")
                await manager.server_stop(servers[self.fail_idx].server_id)
                logger.info(f"Remove {self.replica} {host_ids[self.fail_idx]} via {host_ids[via]}")
                await manager.remove_node(servers[via].server_id, servers[self.fail_idx].server_id)
                logger.info(f"Done with {self.replica} {host_ids[self.fail_idx]}")

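        # Keep writes running in the background while the node fails and the tablet migrates.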
        finish_writes = await start_writes(cql, ks, "test")

        failer = node_failer(fail_stage, fail_replica, ks)
        await failer.setup()
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0))
        await failer.wait()
        await failer.stop()

        logger.info("Done, waiting for migration to finish")
        await migration_task

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is now on [{replicas}]")
        assert len(replicas) == 1
        for r in replicas[0].replicas:
            assert r[0] != host_ids[failer.fail_idx]

        await finish_writes()

        # For dropping the keyspace after the node failure
        await reconnect_driver(manager)


@pytest.mark.asyncio
async def test_tablet_back_and_forth_migration(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    host_ids = []
    servers = []

    async def make_server():
        s = await manager.server_add(config=cfg)
        servers.append(s)
        host_ids.append(await manager.get_host_id(s.server_id))
        await manager.api.disable_tablet_balancing(s.ip_addr)

    async def assert_rows(num):
        res = await cql.run_async(f"SELECT * FROM {ks}.test")
        assert len(res) == num

    await make_server()
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await make_server()

        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({1}, {1});")
        await assert_rows(1)

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is on [{replicas}]")
        assert len(replicas) == 1 and len(replicas[0].replicas) == 1

        old_replica = replicas[0].replicas[0]
        assert old_replica[0] != host_ids[1]
        new_replica = (host_ids[1], 0)

        logger.info(f"Moving tablet {old_replica} -> {new_replica}")
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)

        await assert_rows(1)
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({2}, {2});")
        await assert_rows(2)

        logger.info(f"Moving tablet {new_replica} -> {old_replica}")
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", new_replica[0], new_replica[1], old_replica[0], old_replica[1], 0)

        await assert_rows(2)
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({3}, {3});")
        await assert_rows(3)


@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_staging_backlog_is_preserved_with_file_based_streaming(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    # the error injection will halt view updates from staging, allowing migration to transfer the view update backlog.
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled',
           'error_injections_at_startup': ['view_update_generator_consume_staging_sstable']}
    servers = [await manager.server_add(config=cfg)]

    await manager.api.disable_tablet_balancing(servers[0].ip_addr)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \
                SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \
                PRIMARY KEY (c, pk);")

        logger.info("Populating single tablet")
        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        # check
        async def check(expected):
            rows = await cql.run_async(f"SELECT pk from {ks}.test")
            assert len(list(rows)) == len(expected)
        await check(keys)

        logger.info("Adding new server")
        servers.append(await manager.server_add(config=cfg))

        async def get_table_dir(manager, server_id):
            node_workdir = await manager.server_get_workdir(server_id)
            return glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]

        s0_table_dir = await get_table_dir(manager, servers[0].server_id)
        logger.info(f"Table dir in server 0: {s0_table_dir}")

        s1_table_dir = await get_table_dir(manager, servers[1].server_id)
        logger.info(f"Table dir in server 1: {s1_table_dir}")

        # Explicitly close the driver to avoid reconnections if scylla fails to update gossiper state on shutdown.
        # It's a problem until https://github.com/scylladb/scylladb/issues/15356 is fixed.
        manager.driver_close()
        cql = None
        await manager.server_stop_gracefully(servers[0].server_id)

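        # Simulate a pending view-update backlog on the stopped node by moving its
        # sstables into the table's staging directory.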
        def move_sstables_to_staging(table_dir: str):
            table_staging_dir = os.path.join(table_dir, "staging")
            logger.info(f"Moving sstables to staging dir: {table_staging_dir}")
            for sst in glob.glob(os.path.join(table_dir, "*-Data.db")):
                for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")):
                    dst_path = os.path.join(table_staging_dir, os.path.basename(src_path))
                    logger.info(f"Moving sstable file {src_path} to {dst_path}")
                    os.rename(src_path, dst_path)

        def sstable_count_in_staging(table_dir: str):
            table_staging_dir = os.path.join(table_dir, "staging")
            return len(glob.glob(os.path.join(table_staging_dir, "*-Data.db")))

        move_sstables_to_staging(s0_table_dir)
        s0_sstables_in_staging = sstable_count_in_staging(s0_table_dir)

        await manager.server_start(servers[0].server_id)
        cql = manager.get_cql()
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        tablet_token = 0  # Doesn't matter since there is one tablet
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        s1_host_id = await manager.get_host_id(servers[1].server_id)
        dst_shard = 0

        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))

        logger.info("Waiting for migration to finish")
        await migration_task
        logger.info("Migration done")

        # FIXME: After https://github.com/scylladb/scylladb/issues/19149 is fixed, we can check that view updates complete
        # after migration and then check for base-view consistency. For the time being, we only check that the backlog is
        # transferred by looking at the staging directory.

        s1_sstables_in_staging = sstable_count_in_staging(s1_table_dir)
        logger.info(f"SSTable count in staging dir of server 1: {s1_sstables_in_staging}")

        logger.info("Allowing view update generator to progress again")
        for server in servers:
            await manager.api.disable_injection(server.ip_addr, 'view_update_generator_consume_staging_sstable')

        assert s0_sstables_in_staging > 0
        assert s0_sstables_in_staging == s1_sstables_in_staging

        await check(keys)


@pytest.mark.asyncio
@pytest.mark.parametrize("migration_stage_and_injection", [("cleanup", "cleanup_tablet_wait"), ("end_migration", "handle_tablet_migration_end_migration")], ids=["cleanup", "end_migration"])
@skip_mode('release', 'error injections are not supported in release mode')
async def test_restart_leaving_replica_during_cleanup(manager: ManagerClient, migration_stage_and_injection):
    """
    Migrate a tablet from one node to another, and while in some migration
    cleanup stage, either before or after the tablet is cleaned, restart the
    leaving replica. Then trigger a tablet merge.

    This reproduces issue #23481: when the leaving replica is restarted in
    end_migration, after the SSTables are cleaned, it starts and allocates the
    state for the tablet in the storage group manager, even though it was
    cleaned already. The resulting state causes the following merge process to
    fail with an assert.
    """
    stage, injection = migration_stage_and_injection

    cfg = {'tablet_load_stats_refresh_interval_in_seconds': 1}
    servers = await manager.servers_add(2, config=cfg)

    await manager.api.disable_tablet_balancing(servers[0].ip_addr)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 8}};")

        total_keys = 10
        for pk in range(total_keys):
            await cql.run_async(f"INSERT INTO {ks}.test(pk, c) VALUES({pk}, {pk+1})")
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        tablet_token = 0

        s0_host_id = await manager.get_host_id(servers[0].server_id)
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        # Find which server holds the tablet
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        if replica[0] == s0_host_id:
            src_server, dst_host_id = servers[0], s1_host_id
        else:
            src_server, dst_host_id = servers[1], s0_host_id

        # Injection for waiting in the requested migration stage
        await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, injection, one_shot=False) for s in servers])

        # Start migration - move tablet to other node
        move_task = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, 'test', replica[0], replica[1], dst_host_id, 0, tablet_token))

        # Wait for the tablet to reach the requested migration stage
        async def tablet_stage_is_cleanup():
            tinfo = await get_tablet_info(manager, servers[0], ks, 'test', tablet_token)
            if tinfo.stage == stage:
                return True
        await wait_for(tablet_stage_is_cleanup, time.time() + 60)

        # Restart the leaving replica (src_server)
        await manager.server_restart(src_server.server_id)

        await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, injection) for s in servers])

        await manager.api.enable_tablet_balancing(servers[0].ip_addr)

        # Trigger tablet merge to reproduce #23481
        table_id = await manager.get_table_id(ks, 'test')
        async def get_tablet_count():
            rows = await manager.cql.run_async(f"SELECT tablet_count FROM system.tablets where table_id = {table_id}")
            return rows[0].tablet_count

        old_tablet_count = await get_tablet_count()

        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': 1}}")

        async def tablets_merged():
            new_tablet_count = await get_tablet_count()
            if new_tablet_count < old_tablet_count:
                return True
        await wait_for(tablets_merged, time.time() + 60)

        # Workaround for https://github.com/scylladb/scylladb/issues/21779. We don't want the keyspace drop at the end
        # of new_test_keyspace to fail because of concurrent tablet migrations.
        await manager.api.disable_tablet_balancing(servers[0].ip_addr)


@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_restart_in_cleanup_stage_after_cleanup(manager: ManagerClient):
    """
    Migrate a tablet from one node to another, and restart the leaving replica during
    the tablet cleanup stage, after tablet cleanup is completed.
    Reproduces issue #24857
    """
    cfg = {'tablet_load_stats_refresh_interval_in_seconds': 1}
    servers = await manager.servers_add(2, config=cfg)

    await manager.api.disable_tablet_balancing(servers[0].ip_addr)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 8}};")

        total_keys = 10
        for pk in range(total_keys):
            await cql.run_async(f"INSERT INTO {ks}.test(pk, c) VALUES({pk}, {pk+1})")
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        tablet_token = 0

        s0_host_id = await manager.get_host_id(servers[0].server_id)
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        # Find which server holds the tablet
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        if replica[0] == s0_host_id:
            src_server, dst_host_id = servers[0], s1_host_id
        else:
            src_server, dst_host_id = servers[1], s0_host_id

        await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "wait_after_tablet_cleanup", one_shot=False) for s in servers])

        log = await manager.server_open_log(servers[0].server_id)
        mark = await log.mark()

        # Start migration - move tablet to other node
        move_task = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, 'test', replica[0], replica[1], dst_host_id, 0, tablet_token))

        await log.wait_for("Waiting after tablet cleanup", from_mark=mark, timeout=60)

        # Restart the leaving replica (src_server)
        await manager.server_stop(src_server.server_id)
        await manager.server_start(src_server.server_id)
        await wait_for_cql_and_get_hosts(manager.get_cql(), servers, time.time() + 30)

        await asyncio.gather(*[manager.api.message_injection(s.ip_addr, "wait_after_tablet_cleanup") for s in servers])

        await manager.api.quiesce_topology(servers[0].ip_addr)