Files
scylladb/test/cluster/test_tablets_migration.py
Łukasz Paszkowski 67f1c6d36c topology_coordinator: Add barrier to cleanup_target
Consider the following scenario:
1. A table has RF=3 and writes use CL=QUORUM
2. One node is down
3. There is a pending tablet migration from the unavailable node
   that is reverted

During the revert, there can be a time window where the pending replica
being cleaned up still accepts writes. This leads to write failures,
as only two nodes (out of four) are able to acknowledge writes.

This patch fixes the issue by adding a barrier to the cleanup_target
tablet transition state, ensuring that the coordinator switches back to
the previous replica set before cleanup is triggered.

Fixes https://github.com/scylladb/scylladb/issues/26512
2025-12-03 16:19:17 +01:00

544 lines
26 KiB
Python

#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from collections import defaultdict
from cassandra.query import SimpleStatement, ConsistencyLevel
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import HTTPError, read_barrier
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_info
from test.pylib.util import start_writes
from test.cluster.conftest import skip_mode
from test.cluster.util import wait_for_cql_and_get_hosts, new_test_keyspace, reconnect_driver, wait_for
import time
import pytest
import logging
import asyncio
import os
import glob
logger = logging.getLogger(__name__)
@pytest.mark.parametrize("action", ['move', 'add_replica', 'del_replica'])
@pytest.mark.asyncio
async def test_tablet_transition_sanity(manager: ManagerClient, action):
logger.info("Bootstrapping cluster")
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
host_ids = []
servers = []
hosts_by_rack = defaultdict(list)
async def make_server(rack: str):
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
servers.append(s)
host = await manager.get_host_id(s.server_id)
hosts_by_rack[rack].append(host)
host_ids.append(host)
await manager.api.disable_tablet_balancing(s.ip_addr)
await make_server("r1")
await make_server("r1")
await make_server("r2")
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
logger.info(f"Tablet is on [{replicas}]")
assert len(replicas) == 1 and len(replicas[0].replicas) == 2
old_replica = None
for r in replicas[0].replicas:
if r[0] in hosts_by_rack['r1']:
old_replica = r
break
for h in hosts_by_rack['r1']:
if h != old_replica[0]:
new_replica = (h, 0)
break
else:
assert False, "Cannot find node without replica in rack r1"
if action == 'move':
logger.info(f"Move tablet {old_replica[0]} -> {new_replica[0]}")
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
if action == 'add_replica':
logger.info(f"Adding replica to tablet, host {new_replica[0]}")
await manager.api.add_tablet_replica(servers[0].ip_addr, ks, "test", new_replica[0], new_replica[1], 0)
if action == 'del_replica':
logger.info(f"Deleting replica from tablet, host {old_replica[0]}")
await manager.api.del_tablet_replica(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], 0)
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
logger.info(f"Tablet is now on [{replicas}]")
assert len(replicas) == 1
replicas = [ r[0] for r in replicas[0].replicas ]
if action == 'move':
assert len(replicas) == 2
assert new_replica[0] in replicas
assert old_replica[0] not in replicas
if action == 'add_replica':
assert len(replicas) == 3
assert old_replica[0] in replicas
assert new_replica[0] in replicas
if action == 'del_replica':
assert len(replicas) == 1
assert old_replica[0] not in replicas
for h, s in zip(host_ids, servers):
host = await wait_for_cql_and_get_hosts(cql, [s], time.time() + 30)
if h != host_ids[0]:
await read_barrier(manager.api, host[0].address) # host-0 did the barrier in get_all_tablet_replicas above
res = await cql.run_async(f"SELECT COUNT(*) FROM MUTATION_FRAGMENTS({ks}.test)", host=host[0])
logger.info(f"Host {h} reports {res} as mutation fragments count")
if h in replicas:
assert res[0].count != 0
else:
assert res[0].count == 0
@pytest.mark.parametrize("fail_replica", ["source", "destination"])
@pytest.mark.parametrize("fail_stage", ["streaming", "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "cleanup", "cleanup_target", "end_migration", "revert_migration"])
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage):
if fail_stage == 'cleanup' and fail_replica == 'destination':
pytest.skip('Failing destination during cleanup is pointless')
if fail_stage == 'cleanup_target' and fail_replica == 'source':
pytest.skip('Failing source during target cleanup is pointless')
logger.info("Bootstrapping cluster")
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': 2000}
host_ids = []
servers = []
async def make_server(rack: str):
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
servers.append(s)
host_ids.append(await manager.get_host_id(s.server_id))
await manager.api.disable_tablet_balancing(s.ip_addr)
await make_server("r1")
await make_server("r2")
await make_server("r3")
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
await make_server("r2")
if fail_stage in ["cleanup_target", "revert_migration"]:
# we'll stop 2 servers, group0 quorum should be there - we need five
# nodes to have three remaining
#
# also one of the extra nodes will be used to call removenode on
# removing the 1st node will wait for the operation to go through
# raft log, and it will not finish before tablet migration. An
# attempt to remove the 2nd node, to make cleanup_target stage
# go ahead, will step on the legacy API lock on storage_service,
# so we need to ask some other node to do it
await make_server("r1")
await make_server("r2")
logger.info(f"Cluster is [{host_ids}]")
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
logger.info(f"Tablet is on [{replicas}]")
assert len(replicas) == 1 and len(replicas[0].replicas) == 3
last_token = replicas[0].last_token
old_replica = None
for r in replicas[0].replicas:
assert r[0] != host_ids[3], "Tablet got migrated to node3"
if r[0] == host_ids[1]:
old_replica = r
assert old_replica is not None
new_replica = (host_ids[3], 0)
logger.info(f"Moving tablet {old_replica} -> {new_replica}")
class node_failer:
def __init__(self, stage, replica, ks):
self.stage = stage
self.replica = replica
self.fail_idx = 1 if self.replica == "source" else 3
self.ks = ks
async def setup(self):
logger.info(f"Will fail {self.stage}")
if self.stage == "streaming":
await manager.api.enable_injection(servers[3].ip_addr, "stream_mutation_fragments", one_shot=True)
self.log = await manager.server_open_log(servers[3].server_id)
self.mark = await self.log.mark()
elif self.stage in [ "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "end_migration", "do_revert_migration" ]:
await manager.api.enable_injection(servers[self.fail_idx].ip_addr, "raft_topology_barrier_and_drain_fail", one_shot=False,
parameters={'keyspace': self.ks, 'table': 'test', 'last_token': last_token, 'stage': self.stage.removeprefix('do_')})
self.log = await manager.server_open_log(servers[self.fail_idx].server_id)
self.mark = await self.log.mark()
elif self.stage == "cleanup":
await manager.api.enable_injection(servers[self.fail_idx].ip_addr, "cleanup_tablet_crash", one_shot=True)
self.log = await manager.server_open_log(servers[self.fail_idx].server_id)
self.mark = await self.log.mark()
elif self.stage == "cleanup_target":
assert self.fail_idx == 3
self.stream_fail = node_failer('streaming', 'source', ks)
await self.stream_fail.setup()
self.cleanup_fail = node_failer('cleanup', 'destination', ks)
await self.cleanup_fail.setup()
elif self.stage == "revert_migration":
self.wbro_fail = node_failer('write_both_read_old', 'source' if self.replica == 'destination' else 'destination', ks)
await self.wbro_fail.setup()
self.revert_fail = node_failer('do_revert_migration', self.replica, ks)
await self.revert_fail.setup()
else:
assert False, f"Unknown stage {self.stage}"
async def wait(self):
logger.info(f"Wait for {self.stage} to happen")
if self.stage == "streaming":
await self.log.wait_for('stream_mutation_fragments: waiting', from_mark=self.mark)
elif self.stage in [ "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "end_migration", "do_revert_migration" ]:
await self.log.wait_for('raft_topology_cmd: barrier handler waits', from_mark=self.mark);
elif self.stage == "cleanup":
await self.log.wait_for('Crashing tablet cleanup', from_mark=self.mark)
elif self.stage == "cleanup_target":
await self.stream_fail.wait()
self.stream_stop_task = asyncio.create_task(self.stream_fail.stop())
await self.cleanup_fail.wait()
elif self.stage == "revert_migration":
await self.wbro_fail.wait()
self.wbro_fail_task = asyncio.create_task(self.wbro_fail.stop())
await self.revert_fail.wait()
else:
assert False
async def stop(self, via=0):
if self.stage == "cleanup_target":
await self.cleanup_fail.stop(via=4) # removenode of source is happening via node0 already
await self.stream_stop_task
return
if self.stage == "revert_migration":
await self.revert_fail.stop(via=4)
await self.wbro_fail_task
return
logger.info(f"Stop {self.replica} {host_ids[self.fail_idx]}")
await manager.server_stop(servers[self.fail_idx].server_id)
logger.info(f"Remove {self.replica} {host_ids[self.fail_idx]} via {host_ids[via]}")
await manager.remove_node(servers[via].server_id, servers[self.fail_idx].server_id)
logger.info(f"Done with {self.replica} {host_ids[self.fail_idx]}")
finish_writes = await start_writes(cql, ks, "test")
failer = node_failer(fail_stage, fail_replica, ks)
await failer.setup()
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0))
await failer.wait()
await failer.stop()
logger.info("Done, waiting for migration to finish")
await migration_task
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
logger.info(f"Tablet is now on [{replicas}]")
assert len(replicas) == 1
for r in replicas[0].replicas:
assert r[0] != host_ids[failer.fail_idx]
await finish_writes()
# For dropping the keyspace after the node failure
await reconnect_driver(manager)
@pytest.mark.asyncio
async def test_tablet_back_and_forth_migration(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
host_ids = []
servers = []
async def make_server():
s = await manager.server_add(config=cfg)
servers.append(s)
host_ids.append(await manager.get_host_id(s.server_id))
await manager.api.disable_tablet_balancing(s.ip_addr)
async def assert_rows(num):
res = await cql.run_async(f"SELECT * FROM {ks}.test")
assert len(res) == num
await make_server()
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
await make_server()
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({1}, {1});")
await assert_rows(1)
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
logger.info(f"Tablet is on [{replicas}]")
assert len(replicas) == 1 and len(replicas[0].replicas) == 1
old_replica = replicas[0].replicas[0]
assert old_replica[0] != host_ids[1]
new_replica = (host_ids[1], 0)
logger.info(f"Moving tablet {old_replica} -> {new_replica}")
manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
await assert_rows(1)
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({2}, {2});")
await assert_rows(2)
logger.info(f"Moving tablet {new_replica} -> {old_replica}")
manager.api.move_tablet(servers[0].ip_addr, ks, "test", new_replica[0], new_replica[1], old_replica[0], old_replica[1], 0)
await assert_rows(2)
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({3}, {3});")
await assert_rows(3)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_staging_backlog_is_preserved_with_file_based_streaming(manager: ManagerClient):
logger.info("Bootstrapping cluster")
# the error injection will halt view updates from staging, allowing migration to transfer the view update backlog.
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled',
'error_injections_at_startup': ['view_update_generator_consume_staging_sstable']}
servers = [await manager.server_add(config=cfg)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \
SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \
PRIMARY KEY (c, pk);")
logger.info("Populating single tablet")
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
# check
async def check(expected):
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == len(expected)
await check(keys)
logger.info("Adding new server")
servers.append(await manager.server_add(config=cfg))
async def get_table_dir(manager, server_id):
node_workdir = await manager.server_get_workdir(server_id)
return glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]
s0_table_dir = await get_table_dir(manager, servers[0].server_id)
logger.info(f"Table dir in server 0: {s0_table_dir}")
s1_table_dir = await get_table_dir(manager, servers[1].server_id)
logger.info(f"Table dir in server 1: {s1_table_dir}")
# Explicitly close the driver to avoid reconnections if scylla fails to update gossiper state on shutdown.
# It's a problem until https://github.com/scylladb/scylladb/issues/15356 is fixed.
manager.driver_close()
cql = None
await manager.server_stop_gracefully(servers[0].server_id)
def move_sstables_to_staging(table_dir: str):
table_staging_dir = os.path.join(table_dir, "staging")
logger.info(f"Moving sstables to staging dir: {table_staging_dir}")
for sst in glob.glob(os.path.join(table_dir, "*-Data.db")):
for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")):
dst_path = os.path.join(table_staging_dir, os.path.basename(src_path))
logger.info(f"Moving sstable file {src_path} to {dst_path}")
os.rename(src_path, dst_path)
def sstable_count_in_staging(table_dir: str):
table_staging_dir = os.path.join(table_dir, "staging")
return len(glob.glob(os.path.join(table_staging_dir, "*-Data.db")))
move_sstables_to_staging(s0_table_dir)
s0_sstables_in_staging = sstable_count_in_staging(s0_table_dir)
await manager.server_start(servers[0].server_id)
cql = manager.get_cql()
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
tablet_token = 0 # Doesn't matter since there is one tablet
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
s1_host_id = await manager.get_host_id(servers[1].server_id)
dst_shard = 0
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))
logger.info("Waiting for migration to finish")
await migration_task
logger.info("Migration done")
# FIXME: After https://github.com/scylladb/scylladb/issues/19149 is fixed, we can check that view updates complete
# after migration and then check for base-view consistency. By the time being, we only check that backlog is
# transferred by looking at staging directory.
s1_sstables_in_staging = sstable_count_in_staging(s1_table_dir)
logger.info(f"SSTable count in staging dir of server 1: {s1_sstables_in_staging}")
logger.info("Allowing view update generator to progress again")
for server in servers:
manager.api.disable_injection(server.ip_addr, 'view_update_generator_consume_staging_sstable')
assert s0_sstables_in_staging > 0
assert s0_sstables_in_staging == s1_sstables_in_staging
await check(keys)
@pytest.mark.asyncio
@pytest.mark.parametrize("migration_stage_and_injection", [("cleanup", "cleanup_tablet_wait"), ("end_migration", "handle_tablet_migration_end_migration")], ids=["cleanup", "end_migration"])
@skip_mode('release', 'error injections are not supported in release mode')
async def test_restart_leaving_replica_during_cleanup(manager: ManagerClient, migration_stage_and_injection):
"""
Migrate a tablet from one node to another, and while in some migration
cleanup stage, either before or after the tablet is cleaned, restart the
leaving replica. Then trigger a tablet merge.
This reproduces issue #23481: when the leaving replica is restarted in
end_migration, after the SSTables are cleaned, it starts and allocates the
state for the tablet in the storage group manager, even though it was
cleaned already. The resulting state causes the following merge process to
fail with an assert.
"""
stage, injection = migration_stage_and_injection
cfg = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
servers = await manager.servers_add(2, config=cfg)
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 8}};")
total_keys = 10
for pk in range(total_keys):
await cql.run_async(f"INSERT INTO {ks}.test(pk, c) VALUES({pk}, {pk+1})")
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
tablet_token = 0
s0_host_id = await manager.get_host_id(servers[0].server_id)
s1_host_id = await manager.get_host_id(servers[1].server_id)
# Find which server holds the tablet
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
if replica[0] == s0_host_id:
src_server, dst_host_id = servers[0], s1_host_id
else:
src_server, dst_host_id = servers[1], s0_host_id
# Injection for waiting in cleanup stage
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, injection, one_shot=False) for s in servers])
# Start migration - move tablet to other node
move_task = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, 'test', replica[0], replica[1], dst_host_id, 0, tablet_token))
# Wait for the tablet to reach cleanup stage
async def tablet_stage_is_cleanup():
tinfo = await get_tablet_info(manager, servers[0], ks, 'test', tablet_token)
if tinfo.stage == stage:
return True
await wait_for(tablet_stage_is_cleanup, time.time() + 60)
# Restart the leaving replica (src_server)
await manager.server_restart(src_server.server_id)
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, injection) for s in servers])
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
# Trigger tablet merge to reproduce #23481
table_id = await manager.get_table_id(ks, 'test')
async def get_tablet_count():
rows = await manager.cql.run_async(f"SELECT tablet_count FROM system.tablets where table_id = {table_id}")
return rows[0].tablet_count
old_tablet_count = await get_tablet_count()
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': 1}}")
async def tablets_merged():
new_tablet_count = await get_tablet_count()
if new_tablet_count < old_tablet_count:
return True
await wait_for(tablets_merged, time.time() + 60)
# Workaround for https://github.com/scylladb/scylladb/issues/21779. We don't want the keyspace drop at the end
# of new_test_keyspace to fail because of concurrent tablet migrations.
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_restart_in_cleanup_stage_after_cleanup(manager: ManagerClient):
"""
Migrate a tablet from one node to another, and restart the leaving replica during
the tablet cleanup stage, after tablet cleanup is completed.
Reproduces issue #24857
"""
cfg = {'tablet_load_stats_refresh_interval_in_seconds': 1}
servers = await manager.servers_add(2, config=cfg)
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 8}};")
total_keys = 10
for pk in range(total_keys):
await cql.run_async(f"INSERT INTO {ks}.test(pk, c) VALUES({pk}, {pk+1})")
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
tablet_token = 0
s0_host_id = await manager.get_host_id(servers[0].server_id)
s1_host_id = await manager.get_host_id(servers[1].server_id)
# Find which server holds the tablet
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
if replica[0] == s0_host_id:
src_server, dst_host_id = servers[0], s1_host_id
else:
src_server, dst_host_id = servers[1], s0_host_id
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "wait_after_tablet_cleanup", one_shot=False) for s in servers])
log = await manager.server_open_log(servers[0].server_id)
mark = await log.mark()
# Start migration - move tablet to other node
move_task = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, 'test', replica[0], replica[1], dst_host_id, 0, tablet_token))
await log.wait_for("Waiting after tablet cleanup", from_mark=mark, timeout=60)
# Restart the leaving replica (src_server)
await manager.server_stop(src_server.server_id)
await manager.server_start(src_server.server_id)
await wait_for_cql_and_get_hosts(manager.get_cql(), servers, time.time() + 30)
await asyncio.gather(*[manager.api.message_injection(s.ip_addr, "wait_after_tablet_cleanup") for s in servers])
await manager.api.quiesce_topology(servers[0].ip_addr)