Currently the test iterates over all servers and calls manager.api.disable_injection, but it doesn't await those calls. Use asyncio.gather to await all the calls in parallel.

Co-authored-by: Copilot CLI
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
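A minimal sketch of the pattern this change describes, using the servers list and manager.api client from the test below (not a drop-in patch):

    # Before: the coroutines are created but never awaited, so the test may
    # proceed while the injections are still enabled.
    for server in servers:
        manager.api.disable_injection(server.ip_addr, 'view_update_generator_consume_staging_sstable')

    # After: create all the coroutines and await them concurrently.
    await asyncio.gather(*[manager.api.disable_injection(server.ip_addr, 'view_update_generator_consume_staging_sstable')
                           for server in servers])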
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from collections import defaultdict
from typing import Optional, Type
from aiohttp.client_exceptions import ServerDisconnectedError

from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import HTTPError, read_barrier
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_info
from test.pylib.util import start_writes
from test.cluster.util import wait_for_cql_and_get_hosts, new_test_keyspace, reconnect_driver, wait_for
import time
import pytest
import logging
import asyncio
import os
import glob
import re

logger = logging.getLogger(__name__)


async def await_api_task(task, allowed_exception: Optional[Type[Exception]] = None, allowed_error: Optional[str] = None):
    try:
        await task
    except allowed_exception as e:
        if allowed_error and not re.search(allowed_error, str(e)):
            raise


@pytest.mark.parametrize("action", ['move', 'add_replica', 'del_replica'])
@pytest.mark.asyncio
async def test_tablet_transition_sanity(manager: ManagerClient, action):
    logger.info("Bootstrapping cluster")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    host_ids = []
    servers = []
    hosts_by_rack = defaultdict(list)

    async def make_server(rack: str):
        s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
        servers.append(s)
        host = await manager.get_host_id(s.server_id)
        hosts_by_rack[rack].append(host)
        host_ids.append(host)

    await make_server("r1")
    await make_server("r1")
    await make_server("r2")

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is on [{replicas}]")
        assert len(replicas) == 1 and len(replicas[0].replicas) == 2

        old_replica = None
        for r in replicas[0].replicas:
            if r[0] in hosts_by_rack['r1']:
                old_replica = r
                break

        for h in hosts_by_rack['r1']:
            if h != old_replica[0]:
                new_replica = (h, 0)
                break
        else:
            assert False, "Cannot find node without replica in rack r1"

        if action == 'move':
            logger.info(f"Move tablet {old_replica[0]} -> {new_replica[0]}")
            await manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
        if action == 'add_replica':
            logger.info(f"Adding replica to tablet, host {new_replica[0]}")
            await manager.api.add_tablet_replica(servers[0].ip_addr, ks, "test", new_replica[0], new_replica[1], 0)
        if action == 'del_replica':
            logger.info(f"Deleting replica from tablet, host {old_replica[0]}")
            await manager.api.del_tablet_replica(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], 0)

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is now on [{replicas}]")
        assert len(replicas) == 1
        replicas = [r[0] for r in replicas[0].replicas]
        if action == 'move':
            assert len(replicas) == 2
            assert new_replica[0] in replicas
            assert old_replica[0] not in replicas
        if action == 'add_replica':
            assert len(replicas) == 3
            assert old_replica[0] in replicas
            assert new_replica[0] in replicas
        if action == 'del_replica':
            assert len(replicas) == 1
            assert old_replica[0] not in replicas

        for h, s in zip(host_ids, servers):
            host = await wait_for_cql_and_get_hosts(cql, [s], time.time() + 30)
            if h != host_ids[0]:
                await read_barrier(manager.api, host[0].address)  # host-0 did the barrier in get_all_tablet_replicas above
            res = await cql.run_async(f"SELECT COUNT(*) FROM MUTATION_FRAGMENTS({ks}.test)", host=host[0])
            logger.info(f"Host {h} reports {res} as mutation fragments count")
            if h in replicas:
                assert res[0].count != 0
            else:
                assert res[0].count == 0


@pytest.mark.parametrize("fail_replica", ["source", "destination"])
@pytest.mark.parametrize("fail_stage", ["streaming", "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "cleanup", "cleanup_target", "end_migration", "revert_migration"])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage):
    if fail_stage == 'cleanup' and fail_replica == 'destination':
        pytest.skip('Failing destination during cleanup is pointless')
    if fail_stage == 'cleanup_target' and fail_replica == 'source':
        pytest.skip('Failing source during target cleanup is pointless')

    logger.info("Bootstrapping cluster")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': 2000}
    host_ids = []
    servers = []

    async def make_server(rack: str):
        s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
        servers.append(s)
        host_ids.append(await manager.get_host_id(s.server_id))

    await make_server("r1")
    await make_server("r2")
    await make_server("r3")

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
        await make_server("r2")

        if fail_stage in ["cleanup_target", "revert_migration"]:
            # We'll stop 2 servers, and the group0 quorum must survive, so we
            # need five nodes to keep three remaining.
            #
            # Also, one of the extra nodes will be used to call removenode on:
            # removing the 1st node will wait for the operation to go through
            # the raft log, and it will not finish before tablet migration. An
            # attempt to remove the 2nd node, to make the cleanup_target stage
            # go ahead, would step on the legacy API lock on storage_service,
            # so we need to ask some other node to do it.
            await make_server("r1")
            await make_server("r2")

        logger.info(f"Cluster is [{host_ids}]")

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is on [{replicas}]")
        assert len(replicas) == 1 and len(replicas[0].replicas) == 3

        last_token = replicas[0].last_token
        old_replica = None
        for r in replicas[0].replicas:
            assert r[0] != host_ids[3], "Tablet got migrated to node3"
            if r[0] == host_ids[1]:
                old_replica = r
        assert old_replica is not None
        new_replica = (host_ids[3], 0)
        logger.info(f"Moving tablet {old_replica} -> {new_replica}")

        # Helper used below: arms an error injection for the requested migration
        # stage on the chosen replica, waits for it to be reached, then stops and
        # removes that node (composite stages delegate to nested failers).
        class node_failer:
            def __init__(self, stage, replica, ks):
                self.stage = stage
                self.replica = replica
                self.fail_idx = 1 if self.replica == "source" else 3
                self.ks = ks

            async def setup(self):
                logger.info(f"Will fail {self.stage}")
                if self.stage == "streaming":
                    await manager.api.enable_injection(servers[3].ip_addr, "stream_mutation_fragments", one_shot=True)
                    self.log = await manager.server_open_log(servers[3].server_id)
                    self.mark = await self.log.mark()
                elif self.stage in ["allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "end_migration", "do_revert_migration"]:
                    await manager.api.enable_injection(servers[self.fail_idx].ip_addr, "raft_topology_barrier_and_drain_fail", one_shot=False,
                                                       parameters={'keyspace': self.ks, 'table': 'test', 'last_token': last_token, 'stage': self.stage.removeprefix('do_')})
                    self.log = await manager.server_open_log(servers[self.fail_idx].server_id)
                    self.mark = await self.log.mark()
                elif self.stage == "cleanup":
                    await manager.api.enable_injection(servers[self.fail_idx].ip_addr, "cleanup_tablet_crash", one_shot=True)
                    self.log = await manager.server_open_log(servers[self.fail_idx].server_id)
                    self.mark = await self.log.mark()
                elif self.stage == "cleanup_target":
                    assert self.fail_idx == 3
                    self.stream_fail = node_failer('streaming', 'source', ks)
                    await self.stream_fail.setup()
                    self.cleanup_fail = node_failer('cleanup', 'destination', ks)
                    await self.cleanup_fail.setup()
                elif self.stage == "revert_migration":
                    self.wbro_fail = node_failer('write_both_read_old', 'source' if self.replica == 'destination' else 'destination', ks)
                    await self.wbro_fail.setup()
                    self.revert_fail = node_failer('do_revert_migration', self.replica, ks)
                    await self.revert_fail.setup()
                else:
                    assert False, f"Unknown stage {self.stage}"

            async def wait(self):
                logger.info(f"Wait for {self.stage} to happen")
                if self.stage == "streaming":
                    await self.log.wait_for('stream_mutation_fragments: waiting', from_mark=self.mark)
                elif self.stage in ["allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "end_migration", "do_revert_migration"]:
                    await self.log.wait_for('raft_topology_cmd: barrier handler waits', from_mark=self.mark)
                elif self.stage == "cleanup":
                    await self.log.wait_for('Crashing tablet cleanup', from_mark=self.mark)
                elif self.stage == "cleanup_target":
                    await self.stream_fail.wait()
                    self.stream_stop_task = asyncio.create_task(self.stream_fail.stop())
                    await self.cleanup_fail.wait()
                elif self.stage == "revert_migration":
                    await self.wbro_fail.wait()
                    self.wbro_fail_task = asyncio.create_task(self.wbro_fail.stop())
                    await self.revert_fail.wait()
                else:
                    assert False

            async def stop(self, via=0):
                if self.stage == "cleanup_target":
                    await self.cleanup_fail.stop(via=4)  # removenode of source is happening via node0 already
                    await self.stream_stop_task
                    return
                if self.stage == "revert_migration":
                    await self.revert_fail.stop(via=4)
                    await self.wbro_fail_task
                    return

                logger.info(f"Stop {self.replica} {host_ids[self.fail_idx]}")
                await manager.server_stop(servers[self.fail_idx].server_id)
                logger.info(f"Remove {self.replica} {host_ids[self.fail_idx]} via {host_ids[via]}")
                await manager.remove_node(servers[via].server_id, servers[self.fail_idx].server_id)
                logger.info(f"Done with {self.replica} {host_ids[self.fail_idx]}")

        finish_writes = await start_writes(cql, ks, "test")

        failer = node_failer(fail_stage, fail_replica, ks)
        await failer.setup()
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0))
        await failer.wait()
        await failer.stop()

        logger.info("Done, waiting for migration to finish")
        await migration_task

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is now on [{replicas}]")
        assert len(replicas) == 1
        for r in replicas[0].replicas:
            assert r[0] != host_ids[failer.fail_idx]

        await finish_writes()

        # For dropping the keyspace after the node failure
        await reconnect_driver(manager)


@pytest.mark.asyncio
async def test_tablet_back_and_forth_migration(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    host_ids = []
    servers = []

    async def make_server():
        s = await manager.server_add(config=cfg)
        servers.append(s)
        host_ids.append(await manager.get_host_id(s.server_id))

    async def assert_rows(num):
        res = await cql.run_async(f"SELECT * FROM {ks}.test")
        assert len(res) == num

    await make_server()
    await manager.disable_tablet_balancing()
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await make_server()

        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({1}, {1});")
        await assert_rows(1)

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is on [{replicas}]")
        assert len(replicas) == 1 and len(replicas[0].replicas) == 1

        old_replica = replicas[0].replicas[0]
        assert old_replica[0] != host_ids[1]
        new_replica = (host_ids[1], 0)

        logger.info(f"Moving tablet {old_replica} -> {new_replica}")
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)

        await assert_rows(1)
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({2}, {2});")
        await assert_rows(2)

        logger.info(f"Moving tablet {new_replica} -> {old_replica}")
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", new_replica[0], new_replica[1], old_replica[0], old_replica[1], 0)

        await assert_rows(2)
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({3}, {3});")
        await assert_rows(3)


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_staging_backlog_is_preserved_with_file_based_streaming(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    # The error injection halts view updates from staging, allowing the migration to transfer the view update backlog.
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled',
           'error_injections_at_startup': ['view_update_generator_consume_staging_sstable']}
    servers = [await manager.server_add(config=cfg)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \
                SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \
                PRIMARY KEY (c, pk);")

        logger.info("Populating single tablet")
        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        # Check that all expected rows are readable
        async def check(expected):
            rows = await cql.run_async(f"SELECT pk from {ks}.test")
            assert len(list(rows)) == len(expected)
        await check(keys)

        logger.info("Adding new server")
        servers.append(await manager.server_add(config=cfg))

        async def get_table_dir(manager, server_id):
            node_workdir = await manager.server_get_workdir(server_id)
            return glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]

        s0_table_dir = await get_table_dir(manager, servers[0].server_id)
        logger.info(f"Table dir in server 0: {s0_table_dir}")

        s1_table_dir = await get_table_dir(manager, servers[1].server_id)
        logger.info(f"Table dir in server 1: {s1_table_dir}")

        # Explicitly close the driver to avoid reconnections if scylla fails to update gossiper state on shutdown.
        # It's a problem until https://github.com/scylladb/scylladb/issues/15356 is fixed.
        manager.driver_close()
        cql = None
        await manager.server_stop_gracefully(servers[0].server_id)

        def move_sstables_to_staging(table_dir: str):
            table_staging_dir = os.path.join(table_dir, "staging")
            logger.info(f"Moving sstables to staging dir: {table_staging_dir}")
            for sst in glob.glob(os.path.join(table_dir, "*-Data.db")):
                for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")):
                    dst_path = os.path.join(table_staging_dir, os.path.basename(src_path))
                    logger.info(f"Moving sstable file {src_path} to {dst_path}")
                    os.rename(src_path, dst_path)

        def sstable_count_in_staging(table_dir: str):
            table_staging_dir = os.path.join(table_dir, "staging")
            return len(glob.glob(os.path.join(table_staging_dir, "*-Data.db")))

        move_sstables_to_staging(s0_table_dir)
        s0_sstables_in_staging = sstable_count_in_staging(s0_table_dir)

        await manager.server_start(servers[0].server_id)
        cql = manager.get_cql()
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        tablet_token = 0  # Doesn't matter since there is one tablet
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        s1_host_id = await manager.get_host_id(servers[1].server_id)
        dst_shard = 0

        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))

        logger.info("Waiting for migration to finish")
        await migration_task
        logger.info("Migration done")

        # FIXME: After https://github.com/scylladb/scylladb/issues/19149 is fixed, we can check that view updates complete
        # after migration and then check for base-view consistency. For the time being, we only check that the backlog is
        # transferred by looking at the staging directory.

        s1_sstables_in_staging = sstable_count_in_staging(s1_table_dir)
        logger.info(f"SSTable count in staging dir of server 1: {s1_sstables_in_staging}")

        logger.info("Allowing view update generator to progress again")
        await asyncio.gather(*[manager.api.disable_injection(server.ip_addr, 'view_update_generator_consume_staging_sstable') for server in servers])

        assert s0_sstables_in_staging > 0
        assert s0_sstables_in_staging == s1_sstables_in_staging

        await check(keys)


@pytest.mark.asyncio
@pytest.mark.parametrize("migration_stage_and_injection", [("cleanup", "cleanup_tablet_wait"), ("end_migration", "handle_tablet_migration_end_migration")], ids=["cleanup", "end_migration"])
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_restart_leaving_replica_during_cleanup(manager: ManagerClient, migration_stage_and_injection):
    """
    Migrate a tablet from one node to another, and while in some migration
    cleanup stage, either before or after the tablet is cleaned, restart the
    leaving replica. Then trigger a tablet merge.

    This reproduces issue #23481: when the leaving replica is restarted in
    end_migration, after the SSTables are cleaned, it starts and allocates the
    state for the tablet in the storage group manager, even though it was
    cleaned already. The resulting state causes the following merge process to
    fail with an assert.
    """
    stage, injection = migration_stage_and_injection

    cfg = {'tablet_load_stats_refresh_interval_in_seconds': 1}
    servers = await manager.servers_add(2, config=cfg)

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 8}};")

        total_keys = 10
        for pk in range(total_keys):
            await cql.run_async(f"INSERT INTO {ks}.test(pk, c) VALUES({pk}, {pk+1})")
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        tablet_token = 0

        s0_host_id = await manager.get_host_id(servers[0].server_id)
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        # Find which server holds the tablet
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        if replica[0] == s0_host_id:
            src_server, dst_host_id = servers[0], s1_host_id
        else:
            src_server, dst_host_id = servers[1], s0_host_id

        # Injection that makes the migration wait in the parametrized stage
        await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, injection, one_shot=False) for s in servers])

        # Start migration - move the tablet to the other node
        move_task = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, 'test', replica[0], replica[1], dst_host_id, 0, tablet_token))

        # Wait for the tablet to reach the parametrized migration stage
        async def tablet_stage_is_cleanup():
            tinfo = await get_tablet_info(manager, servers[0], ks, 'test', tablet_token)
            if tinfo.stage == stage:
                return True
        await wait_for(tablet_stage_is_cleanup, time.time() + 60)

        # Restart the leaving replica (src_server)
        await manager.server_restart(src_server.server_id)

        cql = await reconnect_driver(manager)
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, injection) for s in servers])

        await await_api_task(move_task, allowed_exception=HTTPError, allowed_error="abort_requested_exception")

        await manager.enable_tablet_balancing()

        # Trigger a tablet merge to reproduce #23481
        table_id = await manager.get_table_id(ks, 'test')
        async def get_tablet_count():
            rows = await manager.cql.run_async(f"SELECT tablet_count FROM system.tablets where table_id = {table_id}")
            return rows[0].tablet_count

        old_tablet_count = await get_tablet_count()

        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': 1}}")

        async def tablets_merged():
            new_tablet_count = await get_tablet_count()
            if new_tablet_count < old_tablet_count:
                return True
        await wait_for(tablets_merged, time.time() + 60)


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_restart_in_cleanup_stage_after_cleanup(manager: ManagerClient):
    """
    Migrate a tablet from one node to another, and restart the leaving replica during
    the tablet cleanup stage, after tablet cleanup is completed.
    Reproduces issue #24857
    """
    cfg = {'tablet_load_stats_refresh_interval_in_seconds': 1}
    servers = await manager.servers_add(2, config=cfg)

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 8}};")

        total_keys = 10
        for pk in range(total_keys):
            await cql.run_async(f"INSERT INTO {ks}.test(pk, c) VALUES({pk}, {pk+1})")
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        tablet_token = 0

        s0_host_id = await manager.get_host_id(servers[0].server_id)
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        # Find which server holds the tablet
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        if replica[0] == s0_host_id:
            src_server, dst_host_id = servers[0], s1_host_id
        else:
            src_server, dst_host_id = servers[1], s0_host_id

        await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "wait_after_tablet_cleanup", one_shot=False) for s in servers])

        log = await manager.server_open_log(servers[0].server_id)
        mark = await log.mark()

        # Start migration - move the tablet to the other node
        move_task = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, 'test', replica[0], replica[1], dst_host_id, 0, tablet_token))

        await log.wait_for("Waiting after tablet cleanup", from_mark=mark, timeout=60)

        # Restart the leaving replica (src_server)
        await manager.server_stop(src_server.server_id)
        await manager.server_start(src_server.server_id)
        await wait_for_cql_and_get_hosts(manager.get_cql(), servers, time.time() + 30)

        await asyncio.gather(*[manager.api.message_injection(s.ip_addr, "wait_after_tablet_cleanup") for s in servers])

        await await_api_task(move_task, allowed_exception=ServerDisconnectedError)

        await manager.api.quiesce_topology(servers[0].ip_addr)