Files
scylladb/test/cluster/test_tablets_migration.py
Benny Halevy b3fec20960 test_tablets_migration: test_staging_backlog_is_preserved_with_file_based_streaming: convert for loop to asyncio.gather
Currently the test iterates on all servers and calls manager.api.disable_injection
but it doesn't await those calls.
Use asyncio.gather to await all calls in parallel.

Co-authored-by: Copilot CLI
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2026-03-12 15:26:40 +02:00

558 lines
26 KiB
Python

#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from collections import defaultdict
from typing import Optional, Type
from aiohttp.client_exceptions import ServerDisconnectedError
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import HTTPError, read_barrier
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_info
from test.pylib.util import start_writes
from test.cluster.util import wait_for_cql_and_get_hosts, new_test_keyspace, reconnect_driver, wait_for
import time
import pytest
import logging
import asyncio
import os
import glob
import re
logger = logging.getLogger(__name__)
async def await_api_task(task, allowed_exception: Optional[Type[Exception]]=None, allowed_error: Optional[str]=None):
    """Await ``task``, optionally tolerating one expected kind of failure.

    :param task: the awaitable (typically an ``asyncio.Task`` wrapping a REST
        API call) to wait for.
    :param allowed_exception: exception class that may be swallowed.  When
        ``None`` nothing is swallowed and any failure of ``task`` propagates.
    :param allowed_error: optional regular expression, searched with
        ``re.search`` in ``str(exception)``.  When given, an exception of the
        allowed class is swallowed only if its text matches; otherwise it is
        re-raised.  When omitted, every exception of the allowed class is
        swallowed.
    :raises: whatever ``task`` raises, unless it is tolerated as described.
    """
    if allowed_exception is None:
        # `except None` is not a valid handler: Python raises TypeError while
        # matching the clause, which would hide the task's real exception.
        await task
        return

    try:
        await task
    except allowed_exception as e:
        if allowed_error and not re.search(allowed_error, str(e)):
            raise
@pytest.mark.parametrize("action", ['move', 'add_replica', 'del_replica'])
@pytest.mark.asyncio
async def test_tablet_transition_sanity(manager: ManagerClient, action):
    """Check that a basic tablet transition ends with the expected replica set.

    A three-node cluster is started (two nodes in rack ``r1``, one in ``r2``)
    and a single-tablet table with RF=2 is populated.  Depending on ``action``
    the tablet replica found in ``r1`` is moved to the other ``r1`` node, a
    replica is added on that other node, or the ``r1`` replica is deleted.
    Afterwards the test verifies the replica list and that only the nodes
    listed as replicas report mutation fragments for the table.
    """
    logger.info("Bootstrapping cluster")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    host_ids = []
    servers = []
    hosts_by_rack = defaultdict(list)

    async def make_server(rack: str):
        """Add a node in ``rack`` and record its server info and host id."""
        s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
        servers.append(s)
        host = await manager.get_host_id(s.server_id)
        hosts_by_rack[rack].append(host)
        host_ids.append(host)

    await make_server("r1")
    await make_server("r1")
    await make_server("r2")

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is on [{replicas}]")
        assert len(replicas) == 1 and len(replicas[0].replicas) == 2

        # The replica to act on is the one living in rack r1 ...
        old_replica = None
        for r in replicas[0].replicas:
            if r[0] in hosts_by_rack['r1']:
                old_replica = r
                break
        # Fail with a clear message instead of a TypeError on old_replica[0] below.
        assert old_replica is not None, "Cannot find tablet replica in rack r1"

        # ... and the target is the other r1 node (shard 0), so the transition stays within the rack.
        for h in hosts_by_rack['r1']:
            if h != old_replica[0]:
                new_replica = (h, 0)
                break
        else:
            assert False, "Cannot find node without replica in rack r1"

        if action == 'move':
            logger.info(f"Move tablet {old_replica[0]} -> {new_replica[0]}")
            await manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
        if action == 'add_replica':
            logger.info(f"Adding replica to tablet, host {new_replica[0]}")
            await manager.api.add_tablet_replica(servers[0].ip_addr, ks, "test", new_replica[0], new_replica[1], 0)
        if action == 'del_replica':
            logger.info(f"Deleting replica from tablet, host {old_replica[0]}")
            await manager.api.del_tablet_replica(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], 0)

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is now on [{replicas}]")
        assert len(replicas) == 1
        # From here on `replicas` is the plain list of replica host ids.
        replicas = [ r[0] for r in replicas[0].replicas ]
        if action == 'move':
            assert len(replicas) == 2
            assert new_replica[0] in replicas
            assert old_replica[0] not in replicas
        if action == 'add_replica':
            assert len(replicas) == 3
            assert old_replica[0] in replicas
            assert new_replica[0] in replicas
        if action == 'del_replica':
            assert len(replicas) == 1
            assert old_replica[0] not in replicas

        # Only nodes that are listed as replicas may hold data for the table.
        for h, s in zip(host_ids, servers):
            host = await wait_for_cql_and_get_hosts(cql, [s], time.time() + 30)
            if h != host_ids[0]:
                await read_barrier(manager.api, host[0].address) # host-0 did the barrier in get_all_tablet_replicas above
            res = await cql.run_async(f"SELECT COUNT(*) FROM MUTATION_FRAGMENTS({ks}.test)", host=host[0])
            logger.info(f"Host {h} reports {res} as mutation fragments count")
            if h in replicas:
                assert res[0].count != 0
            else:
                assert res[0].count == 0
@pytest.mark.parametrize("fail_replica", ["source", "destination"])
@pytest.mark.parametrize("fail_stage", ["streaming", "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "cleanup", "cleanup_target", "end_migration", "revert_migration"])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage):
    """Stop and remove a replica node while a tablet migration sits at ``fail_stage``.

    The tablet is moved from the node at index 1 to the node at index 3.
    Error injections are armed for the requested stage, the migration is
    started in the background, and once the node log shows the injection
    point was reached the chosen node (``fail_replica``: source or
    destination) is stopped and removed from the cluster.  The test then
    waits for the move request to return and asserts that the failed node
    is not among the tablet replicas.  Writes started with ``start_writes``
    run for the duration of the migration.
    """
    if fail_stage == 'cleanup' and fail_replica == 'destination':
        pytest.skip('Failing destination during cleanup is pointless')
    if fail_stage == 'cleanup_target' and fail_replica == 'source':
        pytest.skip('Failing source during target cleanup is pointless')

    logger.info("Bootstrapping cluster")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': 2000}
    host_ids = []
    servers = []

    async def make_server(rack: str):
        """Add a node in ``rack`` and record its server info and host id."""
        s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
        servers.append(s)
        host_ids.append(await manager.get_host_id(s.server_id))

    await make_server("r1")
    await make_server("r2")
    await make_server("r3")

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        # Node at index 3: the migration destination (see new_replica below).
        await make_server("r2")

        if fail_stage in ["cleanup_target", "revert_migration"]:
            # we'll stop 2 servers, group0 quorum should be there - we need five
            # nodes to have three remaining
            #
            # also one of the extra nodes will be used to call removenode on
            # removing the 1st node will wait for the operation to go through
            # raft log, and it will not finish before tablet migration. An
            # attempt to remove the 2nd node, to make cleanup_target stage
            # go ahead, will step on the legacy API lock on storage_service,
            # so we need to ask some other node to do it
            await make_server("r1")
            await make_server("r2")

        logger.info(f"Cluster is [{host_ids}]")

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is on [{replicas}]")
        assert len(replicas) == 1 and len(replicas[0].replicas) == 3

        last_token = replicas[0].last_token
        # The source replica is the one on node 1; node 3 must not hold the tablet yet.
        old_replica = None
        for r in replicas[0].replicas:
            assert r[0] != host_ids[3], "Tablet got migrated to node3"
            if r[0] == host_ids[1]:
                old_replica = r
        assert old_replica is not None
        new_replica = (host_ids[3], 0)
        logger.info(f"Moving tablet {old_replica} -> {new_replica}")

        class node_failer:
            """Arms the injection for one migration stage and fails a node at that point.

            Usage is ``setup()`` before the migration starts, ``wait()`` until
            the stage is reached, then ``stop()`` to stop and remove the node.
            The ``cleanup_target`` and ``revert_migration`` stages are composed
            of two nested ``node_failer`` instances.
            """

            def __init__(self, stage, replica, ks):
                self.stage = stage
                self.replica = replica
                # Index into servers/host_ids of the node to fail:
                # 1 is the migration source, 3 is the destination.
                self.fail_idx = 1 if self.replica == "source" else 3
                self.ks = ks

            async def setup(self):
                """Enable the error injection for this stage and mark the log to wait on."""
                logger.info(f"Will fail {self.stage}")
                if self.stage == "streaming":
                    # The streaming injection is always armed on node 3, regardless of which node is failed.
                    await manager.api.enable_injection(servers[3].ip_addr, "stream_mutation_fragments", one_shot=True)
                    self.log = await manager.server_open_log(servers[3].server_id)
                    self.mark = await self.log.mark()
                elif self.stage in [ "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "end_migration", "do_revert_migration" ]:
                    # "do_revert_migration" is the internal name used by the composite
                    # "revert_migration" stage; the injection gets it without the "do_" prefix.
                    await manager.api.enable_injection(servers[self.fail_idx].ip_addr, "raft_topology_barrier_and_drain_fail", one_shot=False,
                                                       parameters={'keyspace': self.ks, 'table': 'test', 'last_token': last_token, 'stage': self.stage.removeprefix('do_')})
                    self.log = await manager.server_open_log(servers[self.fail_idx].server_id)
                    self.mark = await self.log.mark()
                elif self.stage == "cleanup":
                    await manager.api.enable_injection(servers[self.fail_idx].ip_addr, "cleanup_tablet_crash", one_shot=True)
                    self.log = await manager.server_open_log(servers[self.fail_idx].server_id)
                    self.mark = await self.log.mark()
                elif self.stage == "cleanup_target":
                    # Composite: fail the source during streaming, then the destination during cleanup.
                    assert self.fail_idx == 3
                    self.stream_fail = node_failer('streaming', 'source', ks)
                    await self.stream_fail.setup()
                    self.cleanup_fail = node_failer('cleanup', 'destination', ks)
                    await self.cleanup_fail.setup()
                elif self.stage == "revert_migration":
                    # Composite: fail the opposite replica at write_both_read_old,
                    # then the requested replica at the revert_migration barrier.
                    self.wbro_fail = node_failer('write_both_read_old', 'source' if self.replica == 'destination' else 'destination', ks)
                    await self.wbro_fail.setup()
                    self.revert_fail = node_failer('do_revert_migration', self.replica, ks)
                    await self.revert_fail.setup()
                else:
                    assert False, f"Unknown stage {self.stage}"

            async def wait(self):
                """Block until the log shows that the armed injection point was hit."""
                logger.info(f"Wait for {self.stage} to happen")
                if self.stage == "streaming":
                    await self.log.wait_for('stream_mutation_fragments: waiting', from_mark=self.mark)
                elif self.stage in [ "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "end_migration", "do_revert_migration" ]:
                    await self.log.wait_for('raft_topology_cmd: barrier handler waits', from_mark=self.mark);
                elif self.stage == "cleanup":
                    await self.log.wait_for('Crashing tablet cleanup', from_mark=self.mark)
                elif self.stage == "cleanup_target":
                    await self.stream_fail.wait()
                    # Fail the first node in the background; its task is awaited in stop().
                    self.stream_stop_task = asyncio.create_task(self.stream_fail.stop())
                    await self.cleanup_fail.wait()
                elif self.stage == "revert_migration":
                    await self.wbro_fail.wait()
                    # Fail the first node in the background; its task is awaited in stop().
                    self.wbro_fail_task = asyncio.create_task(self.wbro_fail.stop())
                    await self.revert_fail.wait()
                else:
                    assert False

            async def stop(self, via=0):
                """Stop the failing node and remove it, issuing removenode on ``servers[via]``."""
                if self.stage == "cleanup_target":
                    await self.cleanup_fail.stop(via=4) # removenode of source is happening via node0 already
                    await self.stream_stop_task
                    return
                if self.stage == "revert_migration":
                    # Same as above: the first removal was started with the default via=0.
                    await self.revert_fail.stop(via=4)
                    await self.wbro_fail_task
                    return

                logger.info(f"Stop {self.replica} {host_ids[self.fail_idx]}")
                await manager.server_stop(servers[self.fail_idx].server_id)
                logger.info(f"Remove {self.replica} {host_ids[self.fail_idx]} via {host_ids[via]}")
                await manager.remove_node(servers[via].server_id, servers[self.fail_idx].server_id)
                logger.info(f"Done with {self.replica} {host_ids[self.fail_idx]}")

        finish_writes = await start_writes(cql, ks, "test")

        failer = node_failer(fail_stage, fail_replica, ks)
        await failer.setup()
        # Run the move in the background so the node can be failed while it is in progress.
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0))
        await failer.wait()
        await failer.stop()

        logger.info("Done, waiting for migration to finish")
        await migration_task

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is now on [{replicas}]")
        assert len(replicas) == 1
        # Whatever the outcome of the migration, the removed node must not be a replica.
        for r in replicas[0].replicas:
            assert r[0] != host_ids[failer.fail_idx]

        await finish_writes()

        # For dropping the keyspace after the node failure
        await reconnect_driver(manager)
@pytest.mark.asyncio
async def test_tablet_back_and_forth_migration(manager: ManagerClient):
    """Move a single-replica tablet to a second node and back again.

    After every move and after every additional insert the full table is
    read and the number of returned rows is compared with the number of
    rows written so far.
    """
    logger.info("Bootstrapping cluster")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    host_ids = []
    servers = []

    async def add_node():
        """Start one more node and remember both its server info and host id."""
        node = await manager.server_add(config=cfg)
        servers.append(node)
        node_host_id = await manager.get_host_id(node.server_id)
        host_ids.append(node_host_id)

    async def expect_row_count(expected):
        """Read the whole table and check how many rows came back."""
        rows = await cql.run_async(f"SELECT * FROM {ks}.test")
        assert len(rows) == expected

    async def insert_row(value):
        """Insert a row whose key and value are both ``value``."""
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({value}, {value});")

    async def migrate(src, dst):
        """Ask the first node to move the tablet owning token 0 from ``src`` to ``dst``."""
        src_host, src_shard = src[0], src[1]
        dst_host, dst_shard = dst[0], dst[1]
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", src_host, src_shard, dst_host, dst_shard, 0)

    await add_node()
    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    keyspace_opts = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}"
    async with new_test_keyspace(manager, keyspace_opts) as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # The second node joins only after the table exists.
        await add_node()

        await insert_row(1)
        await expect_row_count(1)

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is on [{replicas}]")
        assert len(replicas) == 1 and len(replicas[0].replicas) == 1

        old_replica = replicas[0].replicas[0]
        assert old_replica[0] != host_ids[1]
        new_replica = (host_ids[1], 0)

        # There ...
        logger.info(f"Moving tablet {old_replica} -> {new_replica}")
        await migrate(old_replica, new_replica)
        await expect_row_count(1)
        await insert_row(2)
        await expect_row_count(2)

        # ... and back again.
        logger.info(f"Moving tablet {new_replica} -> {old_replica}")
        await migrate(new_replica, old_replica)
        await expect_row_count(2)
        await insert_row(3)
        await expect_row_count(3)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_staging_backlog_is_preserved_with_file_based_streaming(manager: ManagerClient):
    """Check that sstables in the staging directory follow a migrated tablet.

    A base table with a materialized view is populated and flushed on a
    single node.  The node is stopped, its table sstables are moved into the
    ``staging`` subdirectory, and the node is started again with an error
    injection that halts the view update generator.  The only tablet is then
    migrated to a second node, and the test asserts that the second node's
    staging directory holds as many sstables as were staged on the first.
    """
    logger.info("Bootstrapping cluster")
    # the error injection will halt view updates from staging, allowing migration to transfer the view update backlog.
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled',
           'error_injections_at_startup': ['view_update_generator_consume_staging_sstable']}
    servers = [await manager.server_add(config=cfg)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        # Adjacent literals (not backslash continuations) so that source
        # indentation never leaks into the statement text.
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS "
                            f"SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL "
                            "PRIMARY KEY (c, pk);")

        logger.info("Populating single tablet")
        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        async def check(expected):
            """Assert the base table returns as many rows as ``expected`` has items."""
            # `cql` is looked up at call time, so this uses the session re-created after the restart.
            rows = await cql.run_async(f"SELECT pk from {ks}.test")
            assert len(list(rows)) == len(expected)

        await check(keys)

        logger.info("Adding new server")
        servers.append(await manager.server_add(config=cfg))

        async def get_table_dir(manager, server_id):
            """Return the on-disk directory of the ``test`` table on the given server."""
            node_workdir = await manager.server_get_workdir(server_id)
            pattern = os.path.join(node_workdir, "data", ks, "test-*")
            matches = glob.glob(pattern)
            assert matches, f"No table directory matching {pattern}"
            return matches[0]

        s0_table_dir = await get_table_dir(manager, servers[0].server_id)
        logger.info(f"Table dir in server 0: {s0_table_dir}")
        s1_table_dir = await get_table_dir(manager, servers[1].server_id)
        logger.info(f"Table dir in server 1: {s1_table_dir}")

        # Explicitly close the driver to avoid reconnections if scylla fails to update gossiper state on shutdown.
        # It's a problem until https://github.com/scylladb/scylladb/issues/15356 is fixed.
        manager.driver_close()
        cql = None
        await manager.server_stop_gracefully(servers[0].server_id)

        def move_sstables_to_staging(table_dir: str):
            """Move every sstable (all of its component files) from ``table_dir`` into ``table_dir``/staging."""
            table_staging_dir = os.path.join(table_dir, "staging")
            logger.info(f"Moving sstables to staging dir: {table_staging_dir}")
            for sst in glob.glob(os.path.join(table_dir, "*-Data.db")):
                # `sst` already includes table_dir, so it must not be joined
                # with table_dir again (that breaks for a relative table_dir).
                for src_path in glob.glob(sst.removesuffix("-Data.db") + "*"):
                    dst_path = os.path.join(table_staging_dir, os.path.basename(src_path))
                    logger.info(f"Moving sstable file {src_path} to {dst_path}")
                    os.rename(src_path, dst_path)

        def sstable_count_in_staging(table_dir: str):
            """Count sstables (by their Data component) in ``table_dir``/staging."""
            table_staging_dir = os.path.join(table_dir, "staging")
            return len(glob.glob(os.path.join(table_staging_dir, "*-Data.db")))

        move_sstables_to_staging(s0_table_dir)
        s0_sstables_in_staging = sstable_count_in_staging(s0_table_dir)

        await manager.server_start(servers[0].server_id)
        cql = manager.get_cql()
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        tablet_token = 0 # Doesn't matter since there is one tablet
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)

        s1_host_id = await manager.get_host_id(servers[1].server_id)
        dst_shard = 0

        logger.info("Waiting for migration to finish")
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token)
        logger.info("Migration done")

        # FIXME: After https://github.com/scylladb/scylladb/issues/19149 is fixed, we can check that view updates complete
        # after migration and then check for base-view consistency. For the time being, we only check that backlog is
        # transferred by looking at staging directory.

        s1_sstables_in_staging = sstable_count_in_staging(s1_table_dir)
        logger.info(f"SSTable count in staging dir of server 1: {s1_sstables_in_staging}")

        # Release the injection before asserting, so it is disabled on every server even if a check below fails.
        logger.info("Allowing view update generator to progress again")
        await asyncio.gather(*[manager.api.disable_injection(server.ip_addr, 'view_update_generator_consume_staging_sstable') for server in servers])

        assert s0_sstables_in_staging > 0, "No sstables were moved to staging on server 0"
        assert s0_sstables_in_staging == s1_sstables_in_staging, \
            f"Staging backlog not preserved: {s0_sstables_in_staging} sstables on server 0, {s1_sstables_in_staging} on server 1"
        await check(keys)
@pytest.mark.asyncio
@pytest.mark.parametrize("migration_stage_and_injection", [("cleanup", "cleanup_tablet_wait"), ("end_migration", "handle_tablet_migration_end_migration")], ids=["cleanup", "end_migration"])
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_restart_leaving_replica_during_cleanup(manager: ManagerClient, migration_stage_and_injection):
    """
    Restart the leaving replica while a tablet migration is held in one of
    its cleanup-related stages (before or after the tablet is cleaned), and
    afterwards trigger a tablet merge.

    Reproducer for issue #23481: a leaving replica restarted in
    end_migration, i.e. after its SSTables were already cleaned, would still
    allocate state for the tablet in the storage group manager on startup.
    That stale state made the subsequent merge fail with an assert.
    """
    stage, injection = migration_stage_and_injection

    servers = await manager.servers_add(2, config={'tablet_load_stats_refresh_interval_in_seconds': 1})
    await manager.disable_tablet_balancing()
    cql = manager.get_cql()

    keyspace_opts = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}"
    async with new_test_keyspace(manager, keyspace_opts) as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 8}};")

        row_count = 10
        for pk in range(row_count):
            await cql.run_async(f"INSERT INTO {ks}.test(pk, c) VALUES({pk}, {pk+1})")
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        tablet_token = 0
        server_host_ids = [await manager.get_host_id(s.server_id) for s in servers]

        # The node currently holding the tablet is the leaving one; the other is the target.
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        leaving_idx = 0 if replica[0] == server_host_ids[0] else 1
        src_server = servers[leaving_idx]
        dst_host_id = server_host_ids[1 - leaving_idx]

        # Arm the injection that holds the migration in the requested stage.
        await asyncio.gather(*(manager.api.enable_injection(s.ip_addr, injection, one_shot=False) for s in servers))

        # Kick off the move in the background.
        move_request = manager.api.move_tablet(servers[0].ip_addr, ks, 'test', replica[0], replica[1], dst_host_id, 0, tablet_token)
        move_task = asyncio.create_task(move_request)

        async def reached_requested_stage():
            """Return True once the tablet is in ``stage``, None otherwise."""
            info = await get_tablet_info(manager, servers[0], ks, 'test', tablet_token)
            return True if info.stage == stage else None

        await wait_for(reached_requested_stage, time.time() + 60)

        # Bounce the leaving replica while the migration is held.
        await manager.server_restart(src_server.server_id)
        cql = await reconnect_driver(manager)
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        await asyncio.gather(*(manager.api.disable_injection(s.ip_addr, injection) for s in servers))

        await await_api_task(move_task, allowed_exception=HTTPError, allowed_error="abort_requested_exception")

        await manager.enable_tablet_balancing()

        # Shrinking min_tablet_count makes the tablets merge, which is what hit #23481.
        table_id = await manager.get_table_id(ks, 'test')

        async def current_tablet_count():
            """Read the table's tablet count from system.tablets."""
            result = await manager.cql.run_async(f"SELECT tablet_count FROM system.tablets where table_id = {table_id}")
            return result[0].tablet_count

        count_before_merge = await current_tablet_count()

        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': 1}}")

        async def tablet_count_dropped():
            """Return True once the tablet count is below its pre-merge value, None otherwise."""
            count_now = await current_tablet_count()
            return True if count_now < count_before_merge else None

        await wait_for(tablet_count_dropped, time.time() + 60)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_restart_in_cleanup_stage_after_cleanup(manager: ManagerClient):
    """
    Restart the leaving replica of a migrating tablet while the migration is
    in the tablet cleanup stage, at a point where the cleanup itself has
    already completed.

    Reproducer for issue #24857.
    """
    servers = await manager.servers_add(2, config={'tablet_load_stats_refresh_interval_in_seconds': 1})
    await manager.disable_tablet_balancing()
    cql = manager.get_cql()

    injection = "wait_after_tablet_cleanup"
    keyspace_opts = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}"

    async with new_test_keyspace(manager, keyspace_opts) as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 8}};")

        row_count = 10
        for pk in range(row_count):
            await cql.run_async(f"INSERT INTO {ks}.test(pk, c) VALUES({pk}, {pk+1})")
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        tablet_token = 0
        first_host_id = await manager.get_host_id(servers[0].server_id)
        second_host_id = await manager.get_host_id(servers[1].server_id)

        # Work out which node owns the tablet now; it becomes the leaving replica.
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        on_first_node = replica[0] == first_host_id
        src_server = servers[0] if on_first_node else servers[1]
        dst_host_id = second_host_id if on_first_node else first_host_id

        await asyncio.gather(*(manager.api.enable_injection(s.ip_addr, injection, one_shot=False) for s in servers))

        # Mark the first node's log before starting so only new messages are matched.
        log = await manager.server_open_log(servers[0].server_id)
        mark = await log.mark()

        # Kick off the move in the background.
        move_request = manager.api.move_tablet(servers[0].ip_addr, ks, 'test', replica[0], replica[1], dst_host_id, 0, tablet_token)
        move_task = asyncio.create_task(move_request)

        await log.wait_for("Waiting after tablet cleanup", from_mark=mark, timeout=60)

        # Bounce the leaving replica now that the post-cleanup wait was logged.
        await manager.server_stop(src_server.server_id)
        await manager.server_start(src_server.server_id)
        await wait_for_cql_and_get_hosts(manager.get_cql(), servers, time.time() + 30)

        # Message the injection on every node.
        await asyncio.gather(*(manager.api.message_injection(s.ip_addr, injection) for s in servers))

        await await_api_task(move_task, allowed_exception=ServerDisconnectedError)

        await manager.api.quiesce_topology(servers[0].ip_addr)