Files
scylladb/test/cluster/test_tablets_merge.py
Tomasz Grabiec 1256a9faa7 tablets: Fix deadlock in background storage group merge fiber
When it deadlocks, groups stop merging and compaction group merge
backlog will run-away.

Also, graceful shutdown will be blocked on it.

Found by flaky unit test
test_merge_chooses_best_replica_with_odd_count, which timed-out in 1
in 100 runs.

Reason for deadlock:

When storage groups are merged, the main compaction group of the new
storage group takes a compaction lock, which is appended to
_compaction_reenablers_for_merging, and released when the merge
completion fiber is done with the whole batch.

If we accumulate more than 1 merge cycle for the fiber, deadlock
occurs. Lock order will be this

Initial state:

 cg0: main
 cg1: main
 cg2: main
 cg3: main

After 1st merge:

 cg0': main [locked], merging_groups=[cg0.main, cg1.main]
 cg1': main [locked], merging_groups=[cg2.main, cg3.main]

After 2nd merge:

 cg0'': main [locked], merging_groups=[cg0'.main [locked], cg0.main, cg1.main, cg1'.main [locked], cg2.main, cg3.main]

merge completion fiber will try to stop cg0'.main, which will be
blocked on the compaction lock, which is held by the reenabler in
_compaction_reenablers_for_merging — hence the deadlock.

The fix is to wait for background merge to finish before we start the
next merge. It's achieved by holding old erm in the background merge,
and doing a topology barrier from the merge finalizing transition.

Background merge is supposed to be a relatively quick operation: it
stops compaction groups, so it may wait for active requests to drain,
but it shouldn't prolong the barrier indefinitely.

Tablet boost unit tests which trigger merge need to be adjusted to
call the barrier, otherwise they will be vulnerable to the deadlock.

Two cluster tests were removed because they assumed that merge happens
in the background. Now that it happens as part of merge finalization,
and blocks topology state machine, those tests deadlock because they
are unable to make topology changes (node bootstrap) while background
merge is blocked.

The test "test_tablets_merge_waits_for_lwt" needed to be adjusted. It
assumed that merge finalization doesn't wait for the erm held by the
LWT operation, and triggered tablet movement afterwards, and assumed
that this migration will issue a barrier which will block on the LWT
operation. After this commit, it's the barrier in merge finalization
which is blocked. The test was adjusted to use an earlier log mark
when waiting for "Got raft_topology_cmd::barrier_and_drain", which
will catch the barrier in merge finalization.

Fixes SCYLLADB-928
2026-03-12 22:45:01 +01:00

653 lines
28 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot, read_barrier
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_count
from test.pylib.util import wait_for
from test.cluster.util import new_test_keyspace, create_new_test_keyspace
import pytest
import asyncio
import logging
import time
import random
logger = logging.getLogger(__name__)
async def inject_error_one_shot_on(manager, error_name, servers):
    # Fire a one-shot error injection on every server in parallel.
    await asyncio.gather(*(inject_error_one_shot(manager.api, srv.ip_addr, error_name)
                           for srv in servers))
async def inject_error_on(manager, error_name, servers):
    # Enable a persistent (non-one-shot) error injection on all servers at once.
    await asyncio.gather(*(manager.api.enable_injection(srv.ip_addr, error_name, False)
                           for srv in servers))
async def disable_injection_on(manager, error_name, servers):
    # Turn a previously enabled error injection off on all servers at once.
    await asyncio.gather(*(manager.api.disable_injection(srv.ip_addr, error_name)
                           for srv in servers))
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_merge_simple(manager: ManagerClient):
    """Exercise a full split-then-merge cycle on a small cluster.

    Populates a one-tablet table until it splits, then deletes almost all data
    and runs a major compaction so the balancer emits a merge decision,
    verifying row counts and tablet counts throughout.  The tail of the test
    also reproduces https://github.com/scylladb/scylladb/issues/21867.

    Fixes vs. the previous version: uses asyncio.sleep() instead of
    time.sleep() (which blocked the event loop and stalled background tasks),
    and catches Exception instead of a bare except (which also swallowed
    CancelledError/KeyboardInterrupt).
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
        '--logger-log-level', 'load_balancer=debug',
        '--target-tablet-size-in-bytes', '30000',
    ]
    servers = [await manager.server_add(config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }, cmdline=cmdline)]
    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")

        # Initial average table size of 400k (1 tablet), so triggers some splits.
        total_keys = 200
        keys = range(total_keys)

        def populate(keys):
            insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
            for pk in keys:
                value = random.randbytes(2000)
                cql.execute(insert, [pk, value])

        populate(keys)

        # check() closes over `keys`, which is rebound as the test inserts/deletes.
        async def check():
            logger.info("Checking table")
            cql = manager.get_cql()
            rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
            assert len(rows) == len(keys)

        await check()
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count == 1

        logger.info("Adding new server")
        servers.append(await manager.server_add(cmdline=cmdline))
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        # Increases the chance of tablet migration concurrent with split.
        await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers)
        await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)
        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        # Now there's a split and migration need, so they'll potentially run concurrently.
        await manager.enable_tablet_balancing()
        await check()
        # Give load balancer some time to do work.  asyncio.sleep() (not
        # time.sleep()) so the event loop keeps running in the meantime.
        await asyncio.sleep(2)
        await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
        await check()
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count > 1

        # Allow shuffling of tablet replicas to make co-location work harder.
        async def shuffle():
            await inject_error_on(manager, "tablet_allocator_shuffle", servers)
            await asyncio.sleep(2)  # let the shuffle run; don't block the event loop
            await disable_injection_on(manager, "tablet_allocator_shuffle", servers)

        await shuffle()

        # This will allow us to simulate some balancing after co-location with
        # shuffling, to make sure that balancer won't break co-location.
        await inject_error_on(manager, "tablet_merge_completion_bypass", servers)

        # Shrinks table significantly, forcing merge.
        delete_keys = range(total_keys - 1)
        await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
        keys = range(total_keys - 1, total_keys)

        # To avoid race of major with migration.
        await manager.disable_tablet_balancing()
        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)
            await manager.api.keyspace_compaction(server.ip_addr, ks)
        await manager.enable_tablet_balancing()

        await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
        # Waits for balancer to co-locate sibling tablets.
        await s1_log.wait_for("All sibling tablets are co-located")
        # Do some shuffling to make sure balancer works with co-located tablets.
        await shuffle()

        old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        s1_mark = await s1_log.mark()
        await inject_error_on(manager, "replica_merge_completion_wait", servers)
        await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)
        await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count < old_tablet_count
        await check()

        # Reproduces https://github.com/scylladb/scylladb/issues/21867 that could cause
        # compaction group to be destroyed without being stopped first.
        # That's done by:
        # 1) Migrating a tablet to another node, and putting an artificial delay in
        #    cleanup stage when stopping groups
        # 2) Force tablet split, causing new groups to be added in a tablet being cleaned up
        # Without the fix, new groups are added to tablet being migrated away and never
        # closed, potentially resulting in an use-after-free.
        keys = range(total_keys)
        populate(keys)
        # Migrates a tablet to another node and put artificial delay on cleanup stage.
        await manager.api.enable_injection(servers[0].ip_addr, "delay_tablet_compaction_groups_cleanup", one_shot=True)
        tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        assert len(tablet_replicas) > 0
        t = tablet_replicas[0]
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", *t.replicas[0], *(s1_host_id, 0), t.last_token))
        # Trigger split
        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)
        try:
            await migration_task
        except Exception:
            # move_tablet() fails if tablet is already in transit.
            # forgive if balancer decided to migrate the target tablet post split.
            # (Not a bare `except:` so task cancellation still propagates.)
            pass
        await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)
        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)
            await manager.api.keyspace_compaction(server.ip_addr, ks)
        await check()
# Multiple cycles of split and merge, with topology changes in parallel and RF > 1.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager: ManagerClient):
    """Run two split/merge cycles while a decommission + bootstrap runs concurrently.

    Each cycle inserts enough data to force tablet splits, then deletes almost
    all of it and runs a major compaction to force merges, checking row counts
    and tablet counts along the way while perform_topology_ops() decommissions
    one node and bootstraps a replacement in the background.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=info',
        '--logger-log-level', 'table=info',
        '--logger-log-level', 'raft_topology=info',
        '--logger-log-level', 'group0_raft_sm=info',
        '--logger-log-level', 'load_balancer=info',
        '--target-tablet-size-in-bytes', '30000',
    ]
    config = {
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }
    servers = [await manager.server_add(config=config, cmdline=cmdline),
               await manager.server_add(config=config, cmdline=cmdline),
               await manager.server_add(config=config, cmdline=cmdline)]
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")

        # Decommission the most recently added server and bootstrap a
        # replacement, concurrently with the split/merge activity below.
        async def perform_topology_ops():
            logger.info("Topology ops in background")
            server_id_to_decommission = servers[-1].server_id
            logger.info("Decommissioning old server with id {}".format(server_id_to_decommission))
            await manager.decommission_node(server_id_to_decommission)
            servers.pop()
            logger.info("Adding new server")
            servers.append(await manager.server_add(cmdline=cmdline))
            logger.info("Completed topology ops")

        for cycle in range(2):
            logger.info("Running split-merge cycle #{}".format(cycle))
            await manager.disable_tablet_balancing()
            logger.info("Inserting data")
            # Initial average table size of (400k + metadata_overhead). Enough to trigger a few splits.
            total_keys = 200
            keys = range(total_keys)
            insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
            for pk in keys:
                value = random.randbytes(2000)
                cql.execute(insert, [pk, value])

            # check() closes over `keys`, which is rebound after the deletes below.
            async def check():
                logger.info("Checking table")
                cql = manager.get_cql()
                rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
                assert len(rows) == len(keys)

            await check()
            logger.info("Flushing keyspace")
            for server in servers:
                await manager.api.flush_keyspace(server.ip_addr, ks)
            tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
            # Increases the chance of tablet migration concurrent with split
            await inject_error_on(manager, "tablet_allocator_shuffle", servers)
            await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)
            s1_log = await manager.server_open_log(servers[0].server_id)
            s1_mark = await s1_log.mark()
            logger.info("Enabling balancing")
            # Now there's a split and migration need, so they'll potentially run concurrently.
            await manager.enable_tablet_balancing()
            topology_ops_task = asyncio.create_task(perform_topology_ops())
            await check()
            logger.info("Waiting for split")
            await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
            await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
            logger.info("Waiting for topology ops")
            await topology_ops_task
            await check()
            old_tablet_count = tablet_count
            tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
            assert tablet_count > old_tablet_count
            logger.info("Split increased number of tablets from {} to {}".format(old_tablet_count, tablet_count))
            # Allow shuffling of tablet replicas to make co-location work harder
            await inject_error_on(manager, "tablet_allocator_shuffle", servers)
            # This will allow us to simulate some balancing after co-location with shuffling,
            # to make sure that balancer won't break co-location.
            await inject_error_on(manager, "tablet_merge_completion_bypass", servers)
            logger.info("Deleting data")
            # Delete almost all keys, enough to trigger a few merges.
            delete_keys = range(total_keys - 1)
            await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
            keys = range(total_keys - 1, total_keys)
            await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
            # To avoid race of major with migration
            await manager.disable_tablet_balancing()
            logger.info("Flushing keyspace and performing major")
            for server in servers:
                await manager.api.flush_keyspace(server.ip_addr, ks)
                await manager.api.keyspace_compaction(server.ip_addr, ks)
            await manager.enable_tablet_balancing()
            logger.info("Waiting for merge decision")
            await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
            # Waits for balancer to co-locate sibling tablets
            await s1_log.wait_for("All sibling tablets are co-located")
            # Do some shuffling to make sure balancer works with co-located tablets
            await inject_error_on(manager, "tablet_allocator_shuffle", servers)
            old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
            topology_ops_task = asyncio.create_task(perform_topology_ops())
            await inject_error_on(manager, "replica_merge_completion_wait", servers)
            await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)
            await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
            await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
            await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)
            logger.info("Waiting for topology ops")
            await topology_ops_task
            tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
            assert tablet_count < old_tablet_count
            logger.info("Merge decreased number of tablets from {} to {}".format(old_tablet_count, tablet_count))
            await check()
            logger.info("Flushing keyspace and performing major")
            for server in servers:
                await manager.api.flush_keyspace(server.ip_addr, ks)
                await manager.api.keyspace_compaction(server.ip_addr, ks)
            await check()
@pytest.mark.parametrize("racks", [2, 3])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_merge_cross_rack_migrations(manager: ManagerClient, racks):
    """Split and then merge tablets on a multi-rack cluster with RF == racks.

    The "forbid_cross_rack_migration_attempt" injection is enabled for the whole
    run — presumably it makes any cross-rack migration attempt fail, so the
    merge co-location must stay rack-local (TODO confirm injection semantics).
    """
    cmdline = ['--target-tablet-size-in-bytes', '30000',]
    config = {'tablet_load_stats_refresh_interval_in_seconds': 1}
    servers = []
    rf = racks
    for rack_id in range(0, racks):
        rack = f'rack{rack_id+1}'
        servers.extend(await manager.servers_add(3, config=config, cmdline=cmdline, property_file={'dc': 'mydc', 'rack': rack}))
    cql = manager.get_cql()
    ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': 1}}")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH compression = {{'sstable_compression': ''}};")
    await inject_error_on(manager, "forbid_cross_rack_migration_attempt", servers)
    # Insert enough data to push the table past the split threshold.
    total_keys = 400
    keys = range(total_keys)
    insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
    for pk in keys:
        value = random.randbytes(2000)
        cql.execute(insert, [pk, value])
    for server in servers:
        await manager.api.flush_keyspace(server.ip_addr, ks)

    async def finished_splitting():
        # FIXME: fragile since it's expecting on-disk size will be enough to produce a few splits.
        # (raw_data=800k / target_size=30k) = ~26, lower power-of-two is 16. Compression was disabled.
        # Per-table hints (min_tablet_count) can be used to improve this.
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count >= 16 or None

    # Give enough time for split to happen in debug mode
    await wait_for(finished_splitting, time.time() + 120)
    # Delete almost everything, then flush + major-compact so merge triggers.
    delete_keys = range(total_keys - 1)
    await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
    keys = range(total_keys - 1, total_keys)
    old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
    for server in servers:
        await manager.api.flush_keyspace(server.ip_addr, ks)
        await manager.api.keyspace_compaction(server.ip_addr, ks)

    async def finished_merging():
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count < old_tablet_count or None

    await wait_for(finished_merging, time.time() + 120)
# Reproduces #23284
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_split_merge_with_many_tables(build_mode: str, manager: ManagerClient, racks = 2):
    """Split and merge the main table while many sibling tables exist.

    After table creation, after split, and after merge, greps every server's
    log for "Too long queue accumulated for gossip" and fails if it appears.
    """
    cmdline = ['--smp', '4', '-m', '2G', '--target-tablet-size-in-bytes', '30000', '--max-task-backlog', '200',]
    config = {'tablet_load_stats_refresh_interval_in_seconds': 1}
    servers = []
    rf = racks
    for rack_id in range(0, racks):
        rack = f'rack{rack_id+1}'
        servers.extend(await manager.servers_add(3, config=config, cmdline=cmdline, property_file={'dc': 'mydc', 'rack': rack}))
    cql = manager.get_cql()
    ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': 1}}")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH compression = {{'sstable_compression': ''}};")
    # Create many small sibling tables; fewer in debug mode to keep runtime sane.
    num_tables = 200 if build_mode != 'debug' else 20
    await asyncio.gather(*[cql.run_async(f"CREATE TABLE {ks}.test{i} (pk int PRIMARY KEY, c blob);") for i in range(1, num_tables)])

    # Fail the test if any server logged an overlong gossip queue.
    async def check_logs(when):
        for server in servers:
            log = await manager.server_open_log(server.server_id)
            matches = await log.grep("Too long queue accumulated for gossip")
            if matches:
                pytest.fail(f"Server {server.server_id} has too long queue accumulated for gossip {when}: {matches=}")

    await check_logs("after creating tables")
    total_keys = 400
    keys = range(total_keys)
    insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
    for pk in keys:
        value = random.randbytes(2000)
        cql.execute(insert, [pk, value])
    for server in servers:
        await manager.api.flush_keyspace(server.ip_addr, ks)

    async def finished_splitting():
        # FIXME: fragile since it's expecting on-disk size will be enough to produce a few splits.
        # (raw_data=800k / target_size=30k) = ~26, lower power-of-two is 16. Compression was disabled.
        # Per-table hints (min_tablet_count) can be used to improve this.
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count >= 16 or None

    # Give enough time for split to happen in debug mode
    await wait_for(finished_splitting, time.time() + 120)
    await check_logs("after split completion")
    # Delete almost everything, then flush + major-compact so merge triggers.
    delete_keys = range(total_keys - 1)
    await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
    keys = range(total_keys - 1, total_keys)
    old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
    for server in servers:
        await manager.api.flush_keyspace(server.ip_addr, ks)
        await manager.api.keyspace_compaction(server.ip_addr, ks)

    async def finished_merging():
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count < old_tablet_count or None

    await wait_for(finished_merging, time.time() + 120)
    await check_logs("after merge completion")
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_missing_data(manager: ManagerClient):
    """Verify a tablet merge loses no rows.

    Reproducer for https://github.com/scylladb/scylladb/issues/23313:
    writes one row per initial tablet, halves the tablet count to force a
    merge, waits for it to finish, and checks every row is still readable.
    """
    logger.info('Bootstrapping cluster')
    cfg = {
        'enable_tablets': True,
        'tablet_load_stats_refresh_interval_in_seconds': 1,
    }
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'debug_error_injection=debug',
    ]
    server = await manager.server_add(cmdline=cmdline, config=cfg)
    logger.info(f'server_id = {server.server_id}')
    cql = manager.get_cql()
    await manager.disable_tablet_balancing()
    initial_tablets = 32
    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {initial_tablets}}}") as ks:
        await cql.run_async(f'CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);')
        await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')
        # Insert one row per initial tablet, then flush so the data hits disk.
        pks = range(initial_tablets)
        await asyncio.gather(*(cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks))
        await manager.api.flush_keyspace(server.ip_addr, ks)
        # Halve the tablet count to force a merge on the test table.
        expected_tablet_count = initial_tablets // 2
        await cql.run_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': {expected_tablet_count}}}")
        await manager.enable_tablet_balancing()
        # Poll the tablet count until the merge completes (60s timeout).
        deadline = time.time() + 60
        actual_tablet_count = 0
        while actual_tablet_count != expected_tablet_count:
            actual_tablet_count = await get_tablet_count(manager, server, ks, 'test')
            logger.debug(f'actual/expected tablet count: {actual_tablet_count}/{expected_tablet_count}')
            assert time.time() < deadline, 'Timeout while waiting for tablet merge'
            await asyncio.sleep(.1)
        logger.info(f'Merged test table; new number of tablets: {expected_tablet_count}')
        # Read everything back and verify no record disappeared.
        qry = f'SELECT * FROM {ks}.test'
        logger.info(f'Running: {qry}')
        res = cql.execute(qry)
        missing = set(pks)
        rec_count = 0
        for row in res:
            rec_count += 1
            missing.discard(row.pk)
        assert rec_count == len(pks), f"received {rec_count} records instead of {len(pks)} while querying server {server.server_id}; missing keys: {missing}"
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_merge_with_drop(manager: ManagerClient):
    """Drop the table while the merge completion fiber is stopping compaction groups.

    Pauses merge completion via the "merge_completion_fiber" and
    "compaction_group_stop_wait" injections, issues DROP TABLE while the
    group stop is paused, then releases both and awaits the drop.
    """
    # This is a test and reproducer for issue:
    # https://github.com/scylladb/scylladb/issues/23313
    logger.info('Bootstrapping cluster')
    cfg = { 'enable_tablets': True,
            'tablet_load_stats_refresh_interval_in_seconds': 1
    }
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'debug_error_injection=debug',
    ]
    server = await manager.server_add(cmdline=cmdline, config=cfg)
    logger.info(f'server_id = {server.server_id}')
    cql = manager.get_cql()
    await manager.disable_tablet_balancing()
    initial_tablets = 32
    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
        await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')
        # insert data
        pks = range(initial_tablets)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])
        # flush the table
        await manager.api.flush_keyspace(server.ip_addr, ks)
        # Pause the merge completion fiber so the test controls when it proceeds.
        await manager.api.enable_injection(server.ip_addr, "merge_completion_fiber", one_shot=True)
        # force merge on the test table
        expected_tablet_count = initial_tablets // 2
        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
        s0_log = await manager.server_open_log(server.server_id)
        s0_mark = await s0_log.mark()
        await manager.enable_tablet_balancing()
        # wait for merge to complete (poll the tablet count, 120s timeout)
        actual_tablet_count = 0
        started = time.time()
        while expected_tablet_count != actual_tablet_count:
            actual_tablet_count = await get_tablet_count(manager, server, ks, 'test')
            logger.debug(f'actual/expected tablet count: {actual_tablet_count}/{expected_tablet_count}')
            assert time.time() - started < 120, 'Timeout while waiting for tablet merge'
            await asyncio.sleep(.1)
        await s0_log.wait_for('merge_completion_fiber: waiting', from_mark=s0_mark)
        # Make the compaction-group stop pause too, then release the merge
        # fiber so it runs into that pause point.
        await manager.api.enable_injection(server.ip_addr, "compaction_group_stop_wait", one_shot=True)
        await manager.api.message_injection(server.ip_addr, "merge_completion_fiber")
        await s0_log.wait_for('compaction_group_stop_wait: waiting', from_mark=s0_mark)
        # Issue DROP TABLE while the group stop is paused, then unblock it and
        # wait for the drop to finish.
        drop_table_fut = cql.run_async(f"drop table {ks}.test")
        await asyncio.sleep(0.1)
        await manager.api.message_injection(server.ip_addr, "compaction_group_stop_wait")
        await drop_table_fut
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_background_merge_deadlock(manager: ManagerClient):
    """
    Reproducer for https://scylladb.atlassian.net/browse/SCYLLADB-928

    Reproduces a deadlock in the background merge completion handler that can happen when multiple merges accumulate.
    If we accumulate more than 1 merge cycle for the fiber, deadlock occurs due to compaction lock taken
    on the main group (post-merge). The lock is held until compaction groups are processed by the background merge
    fiber.

    Example:

    Initial state:
     cg0: main,
     cg1: main
     cg2: main
     cg3: main

    After 1st merge:
     cg0': main [locked], merging_groups=[cg0.main, cg1.main]
     cg1': main [locked], merging_groups=[cg2.main, cg3.main]

    After 2nd merge:
     cg0'': main [locked], merging_groups=[cg0'.main [locked], cg0.main, cg1.main, cg1'.main [locked], cg2.main, cg3.main]

    The test reproduces this by doing a tablet merge from 8 tablets to 1 (8 -> 4 -> 2 -> 1). The background merge fiber
    is blocked until after the first merge (to 4), so that there is a higher chance of two merges queueing in the fiber.

    If deadlock occurs, node shutdown will hang waiting for the background merge fiber. That's why the test
    tries to stop the node at the end.
    """
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'raft_topology=debug',
    ]
    servers = [await manager.server_add(cmdline=cmdline)]
    cql, _ = await manager.get_ready_cql(servers)
    ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}")
    # Create a table which will go through 3 merge cycles.
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) with tablets = {{'min_tablet_count': 8}};")
    # Block the background merge fiber at its injection point (one shot).
    await manager.api.enable_injection(servers[0].ip_addr, "merge_completion_fiber", one_shot=True)
    log = await manager.server_open_log(servers[0].server_id)
    mark = await log.mark()
    # Trigger tablet merging
    await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': 1}};")

    # First merge done: 8 tablets became 4.
    async def produced_one_merge():
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count == 4 or None

    await wait_for(produced_one_merge, time.time() + 120)
    mark, _ = await log.wait_for(f"merge_completion_fiber: waiting", from_mark=mark)
    # Release the fiber only after the first merge, so later merges can queue up.
    await manager.api.message_injection(servers[0].ip_addr, "merge_completion_fiber")
    mark, _ = await log.wait_for(f"merge_completion_fiber: message received", from_mark=mark)

    # All merges done: down to a single tablet.
    async def finished_merge():
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count == 1 or None

    await wait_for(finished_merge, time.time() + 120)
    # If the deadlock reproduced, this shutdown would hang on the background merge fiber.
    await manager.server_stop(servers[0].server_id)