Files
scylladb/test/cluster/test_tablets_merge.py
Benny Halevy b81c6a339b test_tablets_merge: test_tablet_split_merge_with_many_tables: reduce number of tables in debug mode
As the test hits timeouts in debug mode on aarch64.

Fixes #26252

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes scylladb/scylladb#26303
2025-09-29 15:30:13 +03:00

652 lines
29 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot, read_barrier
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_count
from test.pylib.util import wait_for
from test.cluster.conftest import skip_mode
from test.cluster.util import new_test_keyspace, create_new_test_keyspace
import pytest
import asyncio
import logging
import time
import random
logger = logging.getLogger(__name__)
async def inject_error_one_shot_on(manager, error_name, servers):
errs = [inject_error_one_shot(manager.api, s.ip_addr, error_name) for s in servers]
await asyncio.gather(*errs)
async def inject_error_on(manager, error_name, servers):
errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers]
await asyncio.gather(*errs)
async def disable_injection_on(manager, error_name, servers):
errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers]
await asyncio.gather(*errs)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_merge_simple(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'table=debug',
'--logger-log-level', 'load_balancer=debug',
'--target-tablet-size-in-bytes', '30000',
]
servers = [await manager.server_add(config={
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
# Initial average table size of 400k (1 tablet), so triggers some splits.
total_keys = 200
keys = range(total_keys)
def populate(keys):
insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
for pk in keys:
value = random.randbytes(2000)
cql.execute(insert, [pk, value])
populate(keys)
async def check():
logger.info("Checking table")
cql = manager.get_cql()
rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
assert len(rows) == len(keys)
await check()
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count == 1
logger.info("Adding new server")
servers.append(await manager.server_add(cmdline=cmdline))
s1_host_id = await manager.get_host_id(servers[1].server_id)
# Increases the chance of tablet migration concurrent with split
await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers)
await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
# Now there's a split and migration need, so they'll potentially run concurrently.
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await check()
time.sleep(2) # Give load balancer some time to do work
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
await check()
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count > 1
# Allow shuffling of tablet replicas to make co-location work harder
async def shuffle():
await inject_error_on(manager, "tablet_allocator_shuffle", servers)
time.sleep(2)
await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
await shuffle()
# This will allow us to simulate some balancing after co-location with shuffling, to make sure that
# balancer won't break co-location.
await inject_error_on(manager, "tablet_merge_completion_bypass", servers)
# Shrinks table significantly, forcing merge.
delete_keys = range(total_keys - 1)
await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
keys = range(total_keys - 1, total_keys)
# To avoid race of major with migration
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
await manager.api.keyspace_compaction(server.ip_addr, ks)
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
# Waits for balancer to co-locate sibling tablets
await s1_log.wait_for("All sibling tablets are co-located")
# Do some shuffling to make sure balancer works with co-located tablets
await shuffle()
old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
s1_mark = await s1_log.mark()
await inject_error_on(manager, "replica_merge_completion_wait", servers)
await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)
await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count < old_tablet_count
await check()
# Reproduces https://github.com/scylladb/scylladb/issues/21867 that could cause compaction group
# to be destroyed without being stopped first.
# That's done by:
# 1) Migrating a tablet to another node, and putting an artificial delay in cleanup stage when stopping groups
# 2) Force tablet split, causing new groups to be added in a tablet being cleaned up
# Without the fix, new groups are added to tablet being migrated away and never closed, potentially
# resulting in an use-after-free.
keys = range(total_keys)
populate(keys)
# Migrates a tablet to another node and put artificial delay on cleanup stage
await manager.api.enable_injection(servers[0].ip_addr, "delay_tablet_compaction_groups_cleanup", one_shot=True)
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
assert len(tablet_replicas) > 0
t = tablet_replicas[0]
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, ks, "test", *t.replicas[0], *(s1_host_id, 0), t.last_token))
# Trigger split
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
try:
await migration_task
except:
# move_tablet() fails if tablet is already in transit.
# forgive if balancer decided to migrate the target tablet post split.
pass
await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
await manager.api.keyspace_compaction(server.ip_addr, ks)
await check()
# Multiple cycles of split and merge, with topology changes in parallel and RF > 1.
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=info',
'--logger-log-level', 'table=info',
'--logger-log-level', 'raft_topology=info',
'--logger-log-level', 'group0_raft_sm=info',
'--logger-log-level', 'load_balancer=info',
'--target-tablet-size-in-bytes', '30000',
]
config = {
'tablet_load_stats_refresh_interval_in_seconds': 1
}
servers = [await manager.server_add(config=config, cmdline=cmdline),
await manager.server_add(config=config, cmdline=cmdline),
await manager.server_add(config=config, cmdline=cmdline)]
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
async def perform_topology_ops():
logger.info("Topology ops in background")
server_id_to_decommission = servers[-1].server_id
logger.info("Decommissioning old server with id {}".format(server_id_to_decommission))
await manager.decommission_node(server_id_to_decommission)
servers.pop()
logger.info("Adding new server")
servers.append(await manager.server_add(cmdline=cmdline))
logger.info("Completed topology ops")
for cycle in range(2):
logger.info("Running split-merge cycle #{}".format(cycle))
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Inserting data")
# Initial average table size of (400k + metadata_overhead). Enough to trigger a few splits.
total_keys = 200
keys = range(total_keys)
insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
for pk in keys:
value = random.randbytes(2000)
cql.execute(insert, [pk, value])
async def check():
logger.info("Checking table")
cql = manager.get_cql()
rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
assert len(rows) == len(keys)
await check()
logger.info("Flushing keyspace")
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
# Increases the chance of tablet migration concurrent with split
await inject_error_on(manager, "tablet_allocator_shuffle", servers)
await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
logger.info("Enabling balancing")
# Now there's a split and migration need, so they'll potentially run concurrently.
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
topology_ops_task = asyncio.create_task(perform_topology_ops())
await check()
logger.info("Waiting for split")
await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
logger.info("Waiting for topology ops")
await topology_ops_task
await check()
old_tablet_count = tablet_count
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count > old_tablet_count
logger.info("Split increased number of tablets from {} to {}".format(old_tablet_count, tablet_count))
# Allow shuffling of tablet replicas to make co-location work harder
await inject_error_on(manager, "tablet_allocator_shuffle", servers)
# This will allow us to simulate some balancing after co-location with shuffling, to make sure that
# balancer won't break co-location.
await inject_error_on(manager, "tablet_merge_completion_bypass", servers)
logger.info("Deleting data")
# Delete almost all keys, enough to trigger a few merges.
delete_keys = range(total_keys - 1)
await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
keys = range(total_keys - 1, total_keys)
await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
# To avoid race of major with migration
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Flushing keyspace and performing major")
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
await manager.api.keyspace_compaction(server.ip_addr, ks)
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
logger.info("Waiting for merge decision")
await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
# Waits for balancer to co-locate sibling tablets
await s1_log.wait_for("All sibling tablets are co-located")
# Do some shuffling to make sure balancer works with co-located tablets
await inject_error_on(manager, "tablet_allocator_shuffle", servers)
old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
topology_ops_task = asyncio.create_task(perform_topology_ops())
await inject_error_on(manager, "replica_merge_completion_wait", servers)
await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)
await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)
logger.info("Waiting for topology ops")
await topology_ops_task
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count < old_tablet_count
logger.info("Merge decreased number of tablets from {} to {}".format(old_tablet_count, tablet_count))
await check()
logger.info("Flushing keyspace and performing major")
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
await manager.api.keyspace_compaction(server.ip_addr, ks)
await check()
@pytest.mark.parametrize("racks", [2, 3])
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_merge_cross_rack_migrations(manager: ManagerClient, racks):
cmdline = ['--target-tablet-size-in-bytes', '30000',]
config = {'tablet_load_stats_refresh_interval_in_seconds': 1}
servers = []
rf = racks
for rack_id in range(0, racks):
rack = f'rack{rack_id+1}'
servers.extend(await manager.servers_add(3, config=config, cmdline=cmdline, property_file={'dc': 'mydc', 'rack': rack}))
cql = manager.get_cql()
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': 1}}")
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH compression = {{'sstable_compression': ''}};")
await inject_error_on(manager, "forbid_cross_rack_migration_attempt", servers)
total_keys = 400
keys = range(total_keys)
insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
for pk in keys:
value = random.randbytes(2000)
cql.execute(insert, [pk, value])
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
async def finished_splitting():
# FIXME: fragile since it's expecting on-disk size will be enough to produce a few splits.
# (raw_data=800k / target_size=30k) = ~26, lower power-of-two is 16. Compression was disabled.
# Per-table hints (min_tablet_count) can be used to improve this.
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
return tablet_count >= 16 or None
# Give enough time for split to happen in debug mode
await wait_for(finished_splitting, time.time() + 120)
delete_keys = range(total_keys - 1)
await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
keys = range(total_keys - 1, total_keys)
old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
await manager.api.keyspace_compaction(server.ip_addr, ks)
async def finished_merging():
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
return tablet_count < old_tablet_count or None
await wait_for(finished_merging, time.time() + 120)
# Reproduces #23284
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_split_merge_with_many_tables(build_mode: str, manager: ManagerClient, racks = 2):
cmdline = ['--smp', '4', '-m', '2G', '--target-tablet-size-in-bytes', '30000', '--max-task-backlog', '200',]
config = {'tablet_load_stats_refresh_interval_in_seconds': 1}
servers = []
rf = racks
for rack_id in range(0, racks):
rack = f'rack{rack_id+1}'
servers.extend(await manager.servers_add(3, config=config, cmdline=cmdline, property_file={'dc': 'mydc', 'rack': rack}))
cql = manager.get_cql()
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': 1}}")
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH compression = {{'sstable_compression': ''}};")
num_tables = 200 if build_mode != 'debug' else 20
await asyncio.gather(*[cql.run_async(f"CREATE TABLE {ks}.test{i} (pk int PRIMARY KEY, c blob);") for i in range(1, num_tables)])
async def check_logs(when):
for server in servers:
log = await manager.server_open_log(server.server_id)
matches = await log.grep("Too long queue accumulated for gossip")
if matches:
pytest.fail(f"Server {server.server_id} has too long queue accumulated for gossip {when}: {matches=}")
await check_logs("after creating tables")
total_keys = 400
keys = range(total_keys)
insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
for pk in keys:
value = random.randbytes(2000)
cql.execute(insert, [pk, value])
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
async def finished_splitting():
# FIXME: fragile since it's expecting on-disk size will be enough to produce a few splits.
# (raw_data=800k / target_size=30k) = ~26, lower power-of-two is 16. Compression was disabled.
# Per-table hints (min_tablet_count) can be used to improve this.
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
return tablet_count >= 16 or None
# Give enough time for split to happen in debug mode
await wait_for(finished_splitting, time.time() + 120)
await check_logs("after split completion")
delete_keys = range(total_keys - 1)
await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
keys = range(total_keys - 1, total_keys)
old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
await manager.api.keyspace_compaction(server.ip_addr, ks)
async def finished_merging():
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
return tablet_count < old_tablet_count or None
await wait_for(finished_merging, time.time() + 120)
await check_logs("after merge completion")
# Reproduces use-after-free when migration right after merge, but concurrently to background
# merge completion handler.
# See: https://github.com/scylladb/scylladb/issues/24045
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_migration_running_concurrently_to_merge_completion_handling(manager: ManagerClient):
cmdline = []
cfg = {}
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count == 2
old_tablet_count = tablet_count
keys = range(100)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
await cql.run_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': 1}};")
s0_log = await manager.server_open_log(servers[0].server_id)
s0_mark = await s0_log.mark()
await manager.api.enable_injection(servers[0].ip_addr, "merge_completion_fiber", one_shot=True)
await manager.api.enable_injection(servers[0].ip_addr, "replica_merge_completion_wait", one_shot=True)
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
s1_host_id = await manager.get_host_id(servers[1].server_id)
async def finished_merging():
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
return tablet_count < old_tablet_count or None
await wait_for(finished_merging, time.time() + 120)
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
await manager.api.enable_injection(servers[0].ip_addr, "take_storage_snapshot", one_shot=True)
await s0_log.wait_for(f"merge_completion_fiber: waiting", from_mark=s0_mark)
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count == 1
tablet_token = 0 # Doesn't matter since there is one tablet
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
s0_host_id = await manager.get_host_id(servers[0].server_id)
src_shard = replica[1]
dst_shard = src_shard
migration = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], src_shard, s1_host_id, dst_shard, tablet_token))
await s0_log.wait_for(f"take_storage_snapshot: waiting", from_mark=s0_mark)
await manager.api.message_injection(servers[0].ip_addr, "merge_completion_fiber")
await s0_log.wait_for(f"Merge completion fiber finished", from_mark=s0_mark)
await manager.api.message_injection(servers[0].ip_addr, "take_storage_snapshot")
await migration
rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
assert len(rows) == len(keys)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_missing_data(manager: ManagerClient):
# This is a test and reproducer for issue:
# https://github.com/scylladb/scylladb/issues/23313
logger.info('Bootstrapping cluster')
cfg = { 'enable_tablets': True,
'tablet_load_stats_refresh_interval_in_seconds': 1
}
cmdline = [
'--logger-log-level', 'load_balancer=debug',
'--logger-log-level', 'debug_error_injection=debug',
]
server = await manager.server_add(cmdline=cmdline, config=cfg)
logger.info(f'server_id = {server.server_id}')
cql = manager.get_cql()
await manager.api.disable_tablet_balancing(server.ip_addr)
inital_tablets = 32
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {inital_tablets}}}") as ks:
await cql.run_async(f'CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);')
await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')
# insert data
pks = range(inital_tablets)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])
# flush the table
await manager.api.flush_keyspace(server.ip_addr, ks)
# force merge on the test table
expected_tablet_count = inital_tablets // 2
await cql.run_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': {expected_tablet_count}}}")
await manager.api.enable_tablet_balancing(server.ip_addr)
# wait for merge to complete
actual_tablet_count = 0
started = time.time()
while expected_tablet_count != actual_tablet_count:
actual_tablet_count = await get_tablet_count(manager, server, ks, 'test')
logger.debug(f'actual/expected tablet count: {actual_tablet_count}/{expected_tablet_count}')
assert time.time() - started < 60, 'Timeout while waiting for tablet merge'
await asyncio.sleep(.1)
logger.info(f'Merged test table; new number of tablets: {expected_tablet_count}')
# assert that the number of records has not changed
qry = f'SELECT * FROM {ks}.test'
logger.info(f'Running: {qry}')
res = cql.execute(qry)
missing = set(pks)
rec_count = 0
for row in res:
rec_count += 1
missing.discard(row.pk)
assert rec_count == len(pks), f"received {rec_count} records instead of {len(pks)} while querying server {server.server_id}; missing keys: {missing}"
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_merge_with_drop(manager: ManagerClient):
# This is a test and reproducer for issue:
# https://github.com/scylladb/scylladb/issues/23313
logger.info('Bootstrapping cluster')
cfg = { 'enable_tablets': True,
'tablet_load_stats_refresh_interval_in_seconds': 1
}
cmdline = [
'--logger-log-level', 'load_balancer=debug',
'--logger-log-level', 'debug_error_injection=debug',
]
server = await manager.server_add(cmdline=cmdline, config=cfg)
logger.info(f'server_id = {server.server_id}')
cql = manager.get_cql()
await manager.api.disable_tablet_balancing(server.ip_addr)
initial_tablets = 32
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')
# insert data
pks = range(initial_tablets)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])
# flush the table
await manager.api.flush_keyspace(server.ip_addr, ks)
await manager.api.enable_injection(server.ip_addr, "merge_completion_fiber", one_shot=True)
# force merge on the test table
expected_tablet_count = initial_tablets // 2
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
s0_log = await manager.server_open_log(server.server_id)
s0_mark = await s0_log.mark()
await manager.api.enable_tablet_balancing(server.ip_addr)
# wait for merge to complete
actual_tablet_count = 0
started = time.time()
while expected_tablet_count != actual_tablet_count:
actual_tablet_count = await get_tablet_count(manager, server, ks, 'test')
logger.debug(f'actual/expected tablet count: {actual_tablet_count}/{expected_tablet_count}')
assert time.time() - started < 120, 'Timeout while waiting for tablet merge'
await asyncio.sleep(.1)
await s0_log.wait_for('merge_completion_fiber: waiting', from_mark=s0_mark)
await manager.api.enable_injection(server.ip_addr, "compaction_group_stop_wait", one_shot=True)
await manager.api.message_injection(server.ip_addr, "merge_completion_fiber")
await s0_log.wait_for('compaction_group_stop_wait: waiting', from_mark=s0_mark)
drop_table_fut = cql.run_async(f"drop table {ks}.test")
await asyncio.sleep(0.1)
await manager.api.message_injection(server.ip_addr, "compaction_group_stop_wait")
await drop_table_fut