#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot, read_barrier
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_count
from test.pylib.util import wait_for
from test.cluster.util import new_test_keyspace, create_new_test_keyspace
import pytest
import asyncio
import logging
import time
import random

logger = logging.getLogger(__name__)


async def inject_error_one_shot_on(manager, error_name, servers):
    """Enable a one-shot error injection named error_name on every server in servers."""
    errs = [inject_error_one_shot(manager.api, s.ip_addr, error_name) for s in servers]
    await asyncio.gather(*errs)


async def inject_error_on(manager, error_name, servers):
    """Enable a persistent (non-one-shot) error injection on every server in servers."""
    errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers]
    await asyncio.gather(*errs)


async def disable_injection_on(manager, error_name, servers):
    """Disable the error injection named error_name on every server in servers."""
    errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers]
    await asyncio.gather(*errs)


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_merge_simple(manager: ManagerClient):
    """Exercise a full tablet split-then-merge cycle on a small cluster.

    Grows a 1-tablet table past the split threshold (with shuffling injected so
    migrations race with the split), then deletes almost all data to force a
    merge, verifying row counts along the way. Finally reproduces
    https://github.com/scylladb/scylladb/issues/21867 by migrating a tablet with
    a delayed cleanup stage while a split adds new compaction groups.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
        '--logger-log-level', 'load_balancer=debug',
        '--target-tablet-size-in-bytes', '30000',
    ]
    servers = [await manager.server_add(config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }, cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")

        # Initial average table size of 400k (1 tablet), so triggers some splits.
        total_keys = 200
        keys = range(total_keys)

        def populate(keys):
            # Inserts ~2KB blobs so on-disk size crosses the 30k tablet target.
            insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
            for pk in keys:
                value = random.randbytes(2000)
                cql.execute(insert, [pk, value])
        populate(keys)

        async def check():
            # Reads the whole table (bypassing cache) and checks the live row
            # count against the current `keys` binding (late-bound on purpose:
            # `keys` is reassigned after deletions).
            logger.info("Checking table")
            cql = manager.get_cql()
            rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
            assert len(rows) == len(keys)

        await check()

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count == 1

        logger.info("Adding new server")
        servers.append(await manager.server_add(cmdline=cmdline))
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        # Increases the chance of tablet migration concurrent with split
        await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers)
        await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)

        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        # Now there's a split and migration need, so they'll potentially run concurrently.
        await manager.enable_tablet_balancing()

        await check()
        # Give load balancer some time to do work. Must not block the event
        # loop (time.sleep would stall the manager's background tasks).
        await asyncio.sleep(2)

        await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
        await check()

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count > 1

        # Allow shuffling of tablet replicas to make co-location work harder
        async def shuffle():
            await inject_error_on(manager, "tablet_allocator_shuffle", servers)
            # Non-blocking wait so the balancer can actually shuffle meanwhile.
            await asyncio.sleep(2)
            await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
        await shuffle()

        # This will allow us to simulate some balancing after co-location with shuffling, to make sure that
        # balancer won't break co-location.
        await inject_error_on(manager, "tablet_merge_completion_bypass", servers)

        # Shrinks table significantly, forcing merge.
        delete_keys = range(total_keys - 1)
        await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
        keys = range(total_keys - 1, total_keys)

        # To avoid race of major with migration
        await manager.disable_tablet_balancing()
        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)
            await manager.api.keyspace_compaction(server.ip_addr, ks)
        await manager.enable_tablet_balancing()

        await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
        # Waits for balancer to co-locate sibling tablets
        await s1_log.wait_for("All sibling tablets are co-located")

        # Do some shuffling to make sure balancer works with co-located tablets
        await shuffle()

        old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')

        s1_mark = await s1_log.mark()

        await inject_error_on(manager, "replica_merge_completion_wait", servers)
        await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)

        await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count < old_tablet_count

        await check()

        # Reproduces https://github.com/scylladb/scylladb/issues/21867 that could cause compaction group
        # to be destroyed without being stopped first.
        # That's done by:
        # 1) Migrating a tablet to another node, and putting an artificial delay in cleanup stage when stopping groups
        # 2) Force tablet split, causing new groups to be added in a tablet being cleaned up
        # Without the fix, new groups are added to tablet being migrated away and never closed, potentially
        # resulting in an use-after-free.
        keys = range(total_keys)
        populate(keys)

        # Migrates a tablet to another node and put artificial delay on cleanup stage
        await manager.api.enable_injection(servers[0].ip_addr, "delay_tablet_compaction_groups_cleanup", one_shot=True)
        tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        assert len(tablet_replicas) > 0
        t = tablet_replicas[0]
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", *t.replicas[0], *(s1_host_id, 0), t.last_token))

        # Trigger split
        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)

        try:
            await migration_task
        except Exception:
            # move_tablet() fails if tablet is already in transit.
            # forgive if balancer decided to migrate the target tablet post split.
            # (Deliberately narrow: a bare `except:` would also swallow
            # CancelledError and KeyboardInterrupt.)
            pass

        await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)

        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)
            await manager.api.keyspace_compaction(server.ip_addr, ks)

        await check()

# Multiple cycles of split and merge, with topology changes in parallel and RF > 1.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager: ManagerClient):
    """Run two split/merge cycles while decommissioning and adding nodes in parallel."""
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=info',
        '--logger-log-level', 'table=info',
        '--logger-log-level', 'raft_topology=info',
        '--logger-log-level', 'group0_raft_sm=info',
        '--logger-log-level', 'load_balancer=info',
        '--target-tablet-size-in-bytes', '30000',
    ]
    config = {
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }
    servers = [await manager.server_add(config=config, cmdline=cmdline),
               await manager.server_add(config=config, cmdline=cmdline),
               await manager.server_add(config=config, cmdline=cmdline)]

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")

        async def perform_topology_ops():
            # Decommission the newest server and replace it with a fresh one,
            # concurrently with split/merge activity.
            logger.info("Topology ops in background")
            server_id_to_decommission = servers[-1].server_id
            logger.info("Decommissioning old server with id {}".format(server_id_to_decommission))
            await manager.decommission_node(server_id_to_decommission)
            servers.pop()
            logger.info("Adding new server")
            servers.append(await manager.server_add(cmdline=cmdline))
            logger.info("Completed topology ops")

        for cycle in range(2):
            logger.info("Running split-merge cycle #{}".format(cycle))
            await manager.disable_tablet_balancing()

            logger.info("Inserting data")
            # Initial average table size of (400k + metadata_overhead). Enough to trigger a few splits.
            total_keys = 200
            keys = range(total_keys)
            insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
            for pk in keys:
                value = random.randbytes(2000)
                cql.execute(insert, [pk, value])

            async def check():
                # Late-bound closure over `keys`: reflects post-delete state too.
                logger.info("Checking table")
                cql = manager.get_cql()
                rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
                assert len(rows) == len(keys)

            await check()

            logger.info("Flushing keyspace")
            for server in servers:
                await manager.api.flush_keyspace(server.ip_addr, ks)

            tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')

            # Increases the chance of tablet migration concurrent with split
            await inject_error_on(manager, "tablet_allocator_shuffle", servers)
            await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)

            s1_log = await manager.server_open_log(servers[0].server_id)
            s1_mark = await s1_log.mark()

            logger.info("Enabling balancing")
            # Now there's a split and migration need, so they'll potentially run concurrently.
            await manager.enable_tablet_balancing()

            topology_ops_task = asyncio.create_task(perform_topology_ops())

            await check()

            logger.info("Waiting for split")
            await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
            await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)

            logger.info("Waiting for topology ops")
            await topology_ops_task

            await check()

            old_tablet_count = tablet_count
            tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
            assert tablet_count > old_tablet_count
            logger.info("Split increased number of tablets from {} to {}".format(old_tablet_count, tablet_count))

            # Allow shuffling of tablet replicas to make co-location work harder
            await inject_error_on(manager, "tablet_allocator_shuffle", servers)

            # This will allow us to simulate some balancing after co-location with shuffling, to make sure that
            # balancer won't break co-location.
            await inject_error_on(manager, "tablet_merge_completion_bypass", servers)

            logger.info("Deleting data")
            # Delete almost all keys, enough to trigger a few merges.
            delete_keys = range(total_keys - 1)
            await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
            keys = range(total_keys - 1, total_keys)

            await disable_injection_on(manager, "tablet_allocator_shuffle", servers)

            # To avoid race of major with migration
            await manager.disable_tablet_balancing()
            logger.info("Flushing keyspace and performing major")
            for server in servers:
                await manager.api.flush_keyspace(server.ip_addr, ks)
                await manager.api.keyspace_compaction(server.ip_addr, ks)
            await manager.enable_tablet_balancing()

            logger.info("Waiting for merge decision")
            await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
            # Waits for balancer to co-locate sibling tablets
            await s1_log.wait_for("All sibling tablets are co-located")

            # Do some shuffling to make sure balancer works with co-located tablets
            await inject_error_on(manager, "tablet_allocator_shuffle", servers)

            old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')

            topology_ops_task = asyncio.create_task(perform_topology_ops())

            await inject_error_on(manager, "replica_merge_completion_wait", servers)
            await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)
            await disable_injection_on(manager, "tablet_allocator_shuffle", servers)

            await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
            await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)

            logger.info("Waiting for topology ops")
            await topology_ops_task

            tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
            assert tablet_count < old_tablet_count
            logger.info("Merge decreased number of tablets from {} to {}".format(old_tablet_count, tablet_count))

            await check()

            logger.info("Flushing keyspace and performing major")
            for server in servers:
                await manager.api.flush_keyspace(server.ip_addr, ks)
                await manager.api.keyspace_compaction(server.ip_addr, ks)

            await check()


@pytest.mark.parametrize("racks", [2, 3])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_merge_cross_rack_migrations(manager: ManagerClient, racks):
    """Split and merge a table across `racks` racks (RF == racks) while the
    'forbid_cross_rack_migration_attempt' injection is active on every node."""
    cmdline = ['--target-tablet-size-in-bytes', '30000',]
    config = {'tablet_load_stats_refresh_interval_in_seconds': 1}
    servers = []
    rf = racks
    for rack_id in range(0, racks):
        rack = f'rack{rack_id+1}'
        servers.extend(await manager.servers_add(3, config=config, cmdline=cmdline,
                                                 property_file={'dc': 'mydc', 'rack': rack}))

    cql = manager.get_cql()
    ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': 1}}")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH compression = {{'sstable_compression': ''}};")

    await inject_error_on(manager, "forbid_cross_rack_migration_attempt", servers)

    total_keys = 400
    keys = range(total_keys)
    insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
    for pk in keys:
        value = random.randbytes(2000)
        cql.execute(insert, [pk, value])

    for server in servers:
        await manager.api.flush_keyspace(server.ip_addr, ks)

    async def finished_splitting():
        # FIXME: fragile since it's expecting on-disk size will be enough to produce a few splits.
        # (raw_data=800k / target_size=30k) = ~26, lower power-of-two is 16. Compression was disabled.
        # Per-table hints (min_tablet_count) can be used to improve this.
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count >= 16 or None

    # Give enough time for split to happen in debug mode
    await wait_for(finished_splitting, time.time() + 120)

    delete_keys = range(total_keys - 1)
    await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
    keys = range(total_keys - 1, total_keys)

    old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')

    for server in servers:
        await manager.api.flush_keyspace(server.ip_addr, ks)
        await manager.api.keyspace_compaction(server.ip_addr, ks)

    async def finished_merging():
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count < old_tablet_count or None
    await wait_for(finished_merging, time.time() + 120)


# Reproduces #23284
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_split_merge_with_many_tables(build_mode: str, manager: ManagerClient, racks = 2):
    """Split/merge one table while many sibling tables exist, asserting no
    'Too long queue accumulated for gossip' messages appear in any server log."""
    cmdline = ['--smp', '4', '-m', '2G', '--target-tablet-size-in-bytes', '30000', '--max-task-backlog', '200',]
    config = {'tablet_load_stats_refresh_interval_in_seconds': 1}
    servers = []
    rf = racks
    for rack_id in range(0, racks):
        rack = f'rack{rack_id+1}'
        servers.extend(await manager.servers_add(3, config=config, cmdline=cmdline,
                                                 property_file={'dc': 'mydc', 'rack': rack}))

    cql = manager.get_cql()
    ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': 1}}")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH compression = {{'sstable_compression': ''}};")
    # Fewer tables in debug mode to keep runtime reasonable.
    num_tables = 200 if build_mode != 'debug' else 20
    await asyncio.gather(*[cql.run_async(f"CREATE TABLE {ks}.test{i} (pk int PRIMARY KEY, c blob);") for i in range(1, num_tables)])

    async def check_logs(when):
        # Fail the test if any server accumulated a gossip backlog (the #23284 symptom).
        for server in servers:
            log = await manager.server_open_log(server.server_id)
            matches = await log.grep("Too long queue accumulated for gossip")
            if matches:
                pytest.fail(f"Server {server.server_id} has too long queue accumulated for gossip {when}: {matches=}")

    await check_logs("after creating tables")

    total_keys = 400
    keys = range(total_keys)
    insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
    for pk in keys:
        value = random.randbytes(2000)
        cql.execute(insert, [pk, value])

    for server in servers:
        await manager.api.flush_keyspace(server.ip_addr, ks)

    async def finished_splitting():
        # FIXME: fragile since it's expecting on-disk size will be enough to produce a few splits.
        # (raw_data=800k / target_size=30k) = ~26, lower power-of-two is 16. Compression was disabled.
        # Per-table hints (min_tablet_count) can be used to improve this.
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count >= 16 or None

    # Give enough time for split to happen in debug mode
    await wait_for(finished_splitting, time.time() + 120)
    await check_logs("after split completion")

    delete_keys = range(total_keys - 1)
    await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
    keys = range(total_keys - 1, total_keys)

    old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')

    for server in servers:
        await manager.api.flush_keyspace(server.ip_addr, ks)
        await manager.api.keyspace_compaction(server.ip_addr, ks)

    async def finished_merging():
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count < old_tablet_count or None
    await wait_for(finished_merging, time.time() + 120)
    await check_logs("after merge completion")


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_missing_data(manager: ManagerClient):
    # This is a test and reproducer for issue:
    # https://github.com/scylladb/scylladb/issues/23313
    logger.info('Bootstrapping cluster')
    cfg = {
        'enable_tablets': True,
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'debug_error_injection=debug',
    ]
    server = await manager.server_add(cmdline=cmdline, config=cfg)
    logger.info(f'server_id = {server.server_id}')
    cql = manager.get_cql()
    await manager.disable_tablet_balancing()
    initial_tablets = 32
    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {initial_tablets}}}") as ks:
        await cql.run_async(f'CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);')
        await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')

        # insert data
        pks = range(initial_tablets)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])

        # flush the table
        await manager.api.flush_keyspace(server.ip_addr, ks)

        # force merge on the test table
        expected_tablet_count = initial_tablets // 2
        await cql.run_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': {expected_tablet_count}}}")
        await manager.enable_tablet_balancing()

        # wait for merge to complete
        actual_tablet_count = 0
        started = time.time()
        while expected_tablet_count != actual_tablet_count:
            actual_tablet_count = await get_tablet_count(manager, server, ks, 'test')
            logger.debug(f'actual/expected tablet count: {actual_tablet_count}/{expected_tablet_count}')
            assert time.time() - started < 60, 'Timeout while waiting for tablet merge'
            await asyncio.sleep(.1)
        logger.info(f'Merged test table; new number of tablets: {expected_tablet_count}')

        # assert that the number of records has not changed
        qry = f'SELECT * FROM {ks}.test'
        logger.info(f'Running: {qry}')
        res = cql.execute(qry)
        missing = set(pks)
        rec_count = 0
        for row in res:
            rec_count += 1
            missing.discard(row.pk)
        assert rec_count == len(pks), f"received {rec_count} records instead of {len(pks)} while querying server {server.server_id}; missing keys: {missing}"


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_merge_with_drop(manager: ManagerClient):
    # This is a test and reproducer for issue:
    # https://github.com/scylladb/scylladb/issues/23313
    logger.info('Bootstrapping cluster')
    cfg = {
        'enable_tablets': True,
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'debug_error_injection=debug',
    ]
    server = await manager.server_add(cmdline=cmdline, config=cfg)
    logger.info(f'server_id = {server.server_id}')
    cql = manager.get_cql()
    await manager.disable_tablet_balancing()
    initial_tablets = 32
    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
        await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')

        # insert data
        pks = range(initial_tablets)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])

        # flush the table
        await manager.api.flush_keyspace(server.ip_addr, ks)

        # Pause the merge-completion fiber so the DROP below races with it.
        await manager.api.enable_injection(server.ip_addr, "merge_completion_fiber", one_shot=True)

        # force merge on the test table
        expected_tablet_count = initial_tablets // 2
        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
        s0_log = await manager.server_open_log(server.server_id)
        s0_mark = await s0_log.mark()
        await manager.enable_tablet_balancing()

        # wait for merge to complete
        actual_tablet_count = 0
        started = time.time()
        while expected_tablet_count != actual_tablet_count:
            actual_tablet_count = await get_tablet_count(manager, server, ks, 'test')
            logger.debug(f'actual/expected tablet count: {actual_tablet_count}/{expected_tablet_count}')
            assert time.time() - started < 120, 'Timeout while waiting for tablet merge'
            await asyncio.sleep(.1)

        await s0_log.wait_for('merge_completion_fiber: waiting', from_mark=s0_mark)
        await manager.api.enable_injection(server.ip_addr, "compaction_group_stop_wait", one_shot=True)
        await manager.api.message_injection(server.ip_addr, "merge_completion_fiber")
        await s0_log.wait_for('compaction_group_stop_wait: waiting', from_mark=s0_mark)
        # Issue the DROP while compaction-group stop is held, then release it.
        drop_table_fut = cql.run_async(f"drop table {ks}.test")
        await asyncio.sleep(0.1)
        await manager.api.message_injection(server.ip_addr, "compaction_group_stop_wait")
        await drop_table_fut


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_background_merge_deadlock(manager: ManagerClient):
    """
    Reproducer for https://scylladb.atlassian.net/browse/SCYLLADB-928

    Reproduces a deadlock in the background merge completion handler that can happen
    when multiple merges accumulate. If we accumulate more than 1 merge cycle for the
    fiber, deadlock occurs due to compaction lock taken on the main group (post-merge).
    The lock is held until compaction groups are precessed by the background merge fiber

    Example:
    Initial state: cg0: main, cg1: main cg2: main cg3: main
    After 1st merge:
        cg0': main [locked], merging_groups=[cg0.main, cg1.main]
        cg1': main [locked], merging_groups=[cg2.main, cg3.main]
    After 2nd merge:
        cg0'': main [locked], merging_groups=[cg0'.main [locked], cg0.main, cg1.main, cg1'.main [locked], cg2.main, cg3.main]

    The test reproduces this by doing a tablet merge from 8 tablets to 1 (8 -> 4 -> 2 -> 1).
    The background merge fiber is blocked until after the first merge (to 4), so that
    there is a higher chance of two merges queueing in the fiber.

    If deadlock occurs, node shutdown will hang waiting for the background merge fiber.
    That's why the test tries to stop the node at the end.
    """
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'raft_topology=debug',
    ]
    servers = [await manager.server_add(cmdline=cmdline)]
    cql, _ = await manager.get_ready_cql(servers)
    ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}")
    # Create a table which will go through 3 merge cycles.
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) with tablets = {{'min_tablet_count': 8}};")

    await manager.api.enable_injection(servers[0].ip_addr, "merge_completion_fiber", one_shot=True)

    log = await manager.server_open_log(servers[0].server_id)
    mark = await log.mark()

    # Trigger tablet merging
    await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': 1}};")

    async def produced_one_merge():
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count == 4 or None
    await wait_for(produced_one_merge, time.time() + 120)

    mark, _ = await log.wait_for("merge_completion_fiber: waiting", from_mark=mark)
    await manager.api.message_injection(servers[0].ip_addr, "merge_completion_fiber")
    mark, _ = await log.wait_for("merge_completion_fiber: message received", from_mark=mark)

    async def finished_merge():
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count == 1 or None
    await wait_for(finished_merge, time.time() + 120)

    # If the background merge fiber deadlocked, this shutdown hangs.
    await manager.server_stop(servers[0].server_id)