When it deadlocks, groups stop merging and the compaction group merge backlog will run away. Also, graceful shutdown will be blocked on it. Found by the flaky unit test test_merge_chooses_best_replica_with_odd_count, which timed out in 1 in 100 runs.

Reason for the deadlock: when storage groups are merged, the main compaction group of the new storage group takes a compaction lock, which is appended to _compaction_reenablers_for_merging and released when the merge completion fiber is done with the whole batch. If we accumulate more than 1 merge cycle for the fiber, deadlock occurs.

Lock order will be this:

Initial state:
  cg0: main
  cg1: main
  cg2: main
  cg3: main

After 1st merge:
  cg0': main [locked], merging_groups=[cg0.main, cg1.main]
  cg1': main [locked], merging_groups=[cg2.main, cg3.main]

After 2nd merge:
  cg0'': main [locked], merging_groups=[cg0'.main [locked], cg0.main, cg1.main, cg1'.main [locked], cg2.main, cg3.main]

The merge completion fiber will try to stop cg0'.main, which will block on the compaction lock, which is held by the reenabler in _compaction_reenablers_for_merging, hence the deadlock.

The fix is to wait for the background merge to finish before we start the next merge. It's achieved by holding the old erm in the background merge, and doing a topology barrier from the merge finalizing transition. The background merge is supposed to be a relatively quick operation: it's stopping compaction groups, so it may wait for active requests, but it shouldn't prolong the barrier indefinitely.

Tablet boost unit tests which trigger merge need to be adjusted to call the barrier, otherwise they will be vulnerable to the deadlock.

Two cluster tests were removed because they assumed that merge happens in the background. Now that it happens as part of merge finalization, and blocks the topology state machine, those tests deadlock because they are unable to make topology changes (node bootstrap) while the background merge is blocked.

The test "test_tablets_merge_waits_for_lwt" needed to be adjusted. It assumed that merge finalization doesn't wait for the erm held by the LWT operation, triggered tablet movement afterwards, and assumed that this migration would issue a barrier which would block on the LWT operation. After this commit, it's the barrier in merge finalization which is blocked. The test was adjusted to use an earlier log mark when waiting for "Got raft_topology_cmd::barrier_and_drain", which will catch the barrier in merge finalization.

Fixes SCYLLADB-928
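The lock-ordering problem above can be illustrated with a minimal, self-contained asyncio sketch (all names are hypothetical stand-ins, not ScyllaDB code): each merge locks the new main group and leaves the release to the completion fiber, so once a second merge queues the first merge's still-locked main group for stopping, the fiber blocks on a lock that only it would release.

import asyncio


class Group:
    def __init__(self, name):
        self.name = name
        self.lock = asyncio.Lock()

    async def stop(self):
        # Stopping a group requires its compaction lock.
        async with self.lock:
            print(f"stopped {self.name}")


async def main():
    reenablers = []   # locks released only after the fiber drains the whole batch
    batch = []        # groups the completion fiber has to stop

    async def merge(new_main, old_groups):
        # The new main group stays locked until the completion fiber finishes the batch.
        await new_main.lock.acquire()
        reenablers.append(new_main.lock)
        batch.extend(old_groups)

    cg0, cg1 = Group("cg0.main"), Group("cg1.main")
    cg0p = Group("cg0'.main")
    await merge(cg0p, [cg0, cg1])                # 1st merge queued, fiber hasn't run yet
    await merge(Group("cg0''.main"), [cg0p])     # 2nd merge queues the still-locked cg0'.main

    async def completion_fiber():
        for g in batch:
            await g.stop()                       # blocks forever on cg0'.main's lock
        for lock in reenablers:
            lock.release()

    try:
        await asyncio.wait_for(completion_fiber(), timeout=1)
    except asyncio.TimeoutError:
        # This is the deadlock; the fix described above instead makes the next merge
        # wait (via a topology barrier) for the previous background merge to finish.
        print("deadlock: completion fiber never finished")


asyncio.run(main())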
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#

from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot, read_barrier
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_count
from test.pylib.util import wait_for
from test.cluster.util import new_test_keyspace, create_new_test_keyspace

import pytest
import asyncio
import logging
import time
import random

logger = logging.getLogger(__name__)


async def inject_error_one_shot_on(manager, error_name, servers):
    errs = [inject_error_one_shot(manager.api, s.ip_addr, error_name) for s in servers]
    await asyncio.gather(*errs)


async def inject_error_on(manager, error_name, servers):
    errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers]
    await asyncio.gather(*errs)


async def disable_injection_on(manager, error_name, servers):
    errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers]
    await asyncio.gather(*errs)


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_merge_simple(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
        '--logger-log-level', 'load_balancer=debug',
        '--target-tablet-size-in-bytes', '30000',
    ]
    servers = [await manager.server_add(config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }, cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")

        # Initial average table size of 400k (1 tablet), so triggers some splits.
        total_keys = 200
        keys = range(total_keys)

        def populate(keys):
            insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
            for pk in keys:
                value = random.randbytes(2000)
                cql.execute(insert, [pk, value])
        populate(keys)

        async def check():
            logger.info("Checking table")
            cql = manager.get_cql()
            rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
            assert len(rows) == len(keys)

        await check()

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count == 1

        logger.info("Adding new server")
        servers.append(await manager.server_add(cmdline=cmdline))
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        # Increases the chance of tablet migration concurrent with split
        await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers)
        await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)

        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        # Now there's a split and migration need, so they'll potentially run concurrently.
        await manager.enable_tablet_balancing()

        await check()
        time.sleep(2)  # Give load balancer some time to do work

        await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)

        await check()

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count > 1

        # Allow shuffling of tablet replicas to make co-location work harder
        async def shuffle():
            await inject_error_on(manager, "tablet_allocator_shuffle", servers)
            time.sleep(2)
            await disable_injection_on(manager, "tablet_allocator_shuffle", servers)

        await shuffle()

        # This will allow us to simulate some balancing after co-location with shuffling, to make sure that
        # balancer won't break co-location.
        await inject_error_on(manager, "tablet_merge_completion_bypass", servers)

        # Shrinks table significantly, forcing merge.
        delete_keys = range(total_keys - 1)
        await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
        keys = range(total_keys - 1, total_keys)

        # To avoid race of major with migration
        await manager.disable_tablet_balancing()

        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)
            await manager.api.keyspace_compaction(server.ip_addr, ks)
        await manager.enable_tablet_balancing()

        await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
        # Waits for balancer to co-locate sibling tablets
        await s1_log.wait_for("All sibling tablets are co-located")
        # Do some shuffling to make sure balancer works with co-located tablets
        await shuffle()

        old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        s1_mark = await s1_log.mark()

        await inject_error_on(manager, "replica_merge_completion_wait", servers)
        await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)

        await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count < old_tablet_count
        await check()

        # Reproduces https://github.com/scylladb/scylladb/issues/21867, which could cause a compaction group
        # to be destroyed without being stopped first.
        # That's done by:
        # 1) Migrating a tablet to another node, and putting an artificial delay in the cleanup stage when stopping groups
        # 2) Forcing a tablet split, causing new groups to be added to a tablet being cleaned up
        # Without the fix, new groups are added to the tablet being migrated away and never closed, potentially
        # resulting in a use-after-free.
        keys = range(total_keys)
        populate(keys)
        # Migrates a tablet to another node and puts an artificial delay on the cleanup stage
        await manager.api.enable_injection(servers[0].ip_addr, "delay_tablet_compaction_groups_cleanup", one_shot=True)
        tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        assert len(tablet_replicas) > 0
        t = tablet_replicas[0]
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", *t.replicas[0], *(s1_host_id, 0), t.last_token))
        # Trigger split
        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)
        try:
            await migration_task
        except:
            # move_tablet() fails if tablet is already in transit.
            # forgive if balancer decided to migrate the target tablet post split.
            pass

        await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)

        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)
            await manager.api.keyspace_compaction(server.ip_addr, ks)
        await check()


# Multiple cycles of split and merge, with topology changes in parallel and RF > 1.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=info',
        '--logger-log-level', 'table=info',
        '--logger-log-level', 'raft_topology=info',
        '--logger-log-level', 'group0_raft_sm=info',
        '--logger-log-level', 'load_balancer=info',
        '--target-tablet-size-in-bytes', '30000',
    ]
    config = {
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }
    servers = [await manager.server_add(config=config, cmdline=cmdline),
               await manager.server_add(config=config, cmdline=cmdline),
               await manager.server_add(config=config, cmdline=cmdline)]

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")

        async def perform_topology_ops():
            logger.info("Topology ops in background")
            server_id_to_decommission = servers[-1].server_id
            logger.info("Decommissioning old server with id {}".format(server_id_to_decommission))
            await manager.decommission_node(server_id_to_decommission)
            servers.pop()
            logger.info("Adding new server")
            servers.append(await manager.server_add(cmdline=cmdline))
            logger.info("Completed topology ops")

        for cycle in range(2):
            logger.info("Running split-merge cycle #{}".format(cycle))

            await manager.disable_tablet_balancing()

            logger.info("Inserting data")
            # Initial average table size of (400k + metadata_overhead). Enough to trigger a few splits.
            total_keys = 200
            keys = range(total_keys)
            insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
            for pk in keys:
                value = random.randbytes(2000)
                cql.execute(insert, [pk, value])

            async def check():
                logger.info("Checking table")
                cql = manager.get_cql()
                rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
                assert len(rows) == len(keys)

            await check()

            logger.info("Flushing keyspace")
            for server in servers:
                await manager.api.flush_keyspace(server.ip_addr, ks)

            tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')

            # Increases the chance of tablet migration concurrent with split
            await inject_error_on(manager, "tablet_allocator_shuffle", servers)
            await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)

            s1_log = await manager.server_open_log(servers[0].server_id)
            s1_mark = await s1_log.mark()

            logger.info("Enabling balancing")
            # Now there's a split and migration need, so they'll potentially run concurrently.
            await manager.enable_tablet_balancing()

            topology_ops_task = asyncio.create_task(perform_topology_ops())

            await check()

            logger.info("Waiting for split")
            await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
            await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)

            logger.info("Waiting for topology ops")
            await topology_ops_task

            await check()

            old_tablet_count = tablet_count
            tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
            assert tablet_count > old_tablet_count
            logger.info("Split increased number of tablets from {} to {}".format(old_tablet_count, tablet_count))

            # Allow shuffling of tablet replicas to make co-location work harder
            await inject_error_on(manager, "tablet_allocator_shuffle", servers)
            # This will allow us to simulate some balancing after co-location with shuffling, to make sure that
            # balancer won't break co-location.
            await inject_error_on(manager, "tablet_merge_completion_bypass", servers)

            logger.info("Deleting data")
            # Delete almost all keys, enough to trigger a few merges.
            delete_keys = range(total_keys - 1)
            await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
            keys = range(total_keys - 1, total_keys)

            await disable_injection_on(manager, "tablet_allocator_shuffle", servers)

            # To avoid race of major with migration
            await manager.disable_tablet_balancing()

            logger.info("Flushing keyspace and performing major")
            for server in servers:
                await manager.api.flush_keyspace(server.ip_addr, ks)
                await manager.api.keyspace_compaction(server.ip_addr, ks)
            await manager.enable_tablet_balancing()

            logger.info("Waiting for merge decision")
            await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
            # Waits for balancer to co-locate sibling tablets
            await s1_log.wait_for("All sibling tablets are co-located")
            # Do some shuffling to make sure balancer works with co-located tablets
            await inject_error_on(manager, "tablet_allocator_shuffle", servers)

            old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')

            topology_ops_task = asyncio.create_task(perform_topology_ops())

            await inject_error_on(manager, "replica_merge_completion_wait", servers)
            await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)
            await disable_injection_on(manager, "tablet_allocator_shuffle", servers)

            await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
            await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)

            logger.info("Waiting for topology ops")
            await topology_ops_task

            tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
            assert tablet_count < old_tablet_count
            logger.info("Merge decreased number of tablets from {} to {}".format(old_tablet_count, tablet_count))
            await check()

            logger.info("Flushing keyspace and performing major")
            for server in servers:
                await manager.api.flush_keyspace(server.ip_addr, ks)
                await manager.api.keyspace_compaction(server.ip_addr, ks)
            await check()


@pytest.mark.parametrize("racks", [2, 3])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_merge_cross_rack_migrations(manager: ManagerClient, racks):
    cmdline = ['--target-tablet-size-in-bytes', '30000',]
    config = {'tablet_load_stats_refresh_interval_in_seconds': 1}
    servers = []
    rf = racks
    for rack_id in range(0, racks):
        rack = f'rack{rack_id+1}'
        servers.extend(await manager.servers_add(3, config=config, cmdline=cmdline, property_file={'dc': 'mydc', 'rack': rack}))

    cql = manager.get_cql()
    ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': 1}}")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH compression = {{'sstable_compression': ''}};")

    await inject_error_on(manager, "forbid_cross_rack_migration_attempt", servers)

    total_keys = 400
    keys = range(total_keys)
    insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
    for pk in keys:
        value = random.randbytes(2000)
        cql.execute(insert, [pk, value])

    for server in servers:
        await manager.api.flush_keyspace(server.ip_addr, ks)

    async def finished_splitting():
        # FIXME: fragile since it's expecting on-disk size will be enough to produce a few splits.
        # (raw_data=800k / target_size=30k) = ~26, lower power-of-two is 16. Compression was disabled.
        # Per-table hints (min_tablet_count) can be used to improve this.
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count >= 16 or None
    # Give enough time for split to happen in debug mode
    await wait_for(finished_splitting, time.time() + 120)

    delete_keys = range(total_keys - 1)
    await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
    keys = range(total_keys - 1, total_keys)

    old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')

    for server in servers:
        await manager.api.flush_keyspace(server.ip_addr, ks)
        await manager.api.keyspace_compaction(server.ip_addr, ks)

    async def finished_merging():
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count < old_tablet_count or None
    await wait_for(finished_merging, time.time() + 120)


# Reproduces #23284
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_split_merge_with_many_tables(build_mode: str, manager: ManagerClient, racks = 2):
    cmdline = ['--smp', '4', '-m', '2G', '--target-tablet-size-in-bytes', '30000', '--max-task-backlog', '200',]
    config = {'tablet_load_stats_refresh_interval_in_seconds': 1}

    servers = []
    rf = racks
    for rack_id in range(0, racks):
        rack = f'rack{rack_id+1}'
        servers.extend(await manager.servers_add(3, config=config, cmdline=cmdline, property_file={'dc': 'mydc', 'rack': rack}))

    cql = manager.get_cql()
    ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': 1}}")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH compression = {{'sstable_compression': ''}};")
    num_tables = 200 if build_mode != 'debug' else 20
    await asyncio.gather(*[cql.run_async(f"CREATE TABLE {ks}.test{i} (pk int PRIMARY KEY, c blob);") for i in range(1, num_tables)])

    async def check_logs(when):
        for server in servers:
            log = await manager.server_open_log(server.server_id)
            matches = await log.grep("Too long queue accumulated for gossip")
            if matches:
                pytest.fail(f"Server {server.server_id} has too long queue accumulated for gossip {when}: {matches=}")

    await check_logs("after creating tables")

    total_keys = 400
    keys = range(total_keys)
    insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
    for pk in keys:
        value = random.randbytes(2000)
        cql.execute(insert, [pk, value])

    for server in servers:
        await manager.api.flush_keyspace(server.ip_addr, ks)

    async def finished_splitting():
        # FIXME: fragile since it's expecting on-disk size will be enough to produce a few splits.
        # (raw_data=800k / target_size=30k) = ~26, lower power-of-two is 16. Compression was disabled.
        # Per-table hints (min_tablet_count) can be used to improve this.
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count >= 16 or None
    # Give enough time for split to happen in debug mode
    await wait_for(finished_splitting, time.time() + 120)

    await check_logs("after split completion")

    delete_keys = range(total_keys - 1)
    await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
    keys = range(total_keys - 1, total_keys)

    old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')

    for server in servers:
        await manager.api.flush_keyspace(server.ip_addr, ks)
        await manager.api.keyspace_compaction(server.ip_addr, ks)

    async def finished_merging():
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count < old_tablet_count or None
    await wait_for(finished_merging, time.time() + 120)

    await check_logs("after merge completion")


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_missing_data(manager: ManagerClient):

    # This is a test and reproducer for issue:
    # https://github.com/scylladb/scylladb/issues/23313

    logger.info('Bootstrapping cluster')
    cfg = {'enable_tablets': True,
           'tablet_load_stats_refresh_interval_in_seconds': 1
           }
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'debug_error_injection=debug',
    ]
    server = await manager.server_add(cmdline=cmdline, config=cfg)

    logger.info(f'server_id = {server.server_id}')

    cql = manager.get_cql()

    await manager.disable_tablet_balancing()

    initial_tablets = 32

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {initial_tablets}}}") as ks:
        await cql.run_async(f'CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);')

        await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')

        # insert data
        pks = range(initial_tablets)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])

        # flush the table
        await manager.api.flush_keyspace(server.ip_addr, ks)

        # force merge on the test table
        expected_tablet_count = initial_tablets // 2
        await cql.run_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': {expected_tablet_count}}}")

        await manager.enable_tablet_balancing()

        # wait for merge to complete
        actual_tablet_count = 0
        started = time.time()
        while expected_tablet_count != actual_tablet_count:
            actual_tablet_count = await get_tablet_count(manager, server, ks, 'test')
            logger.debug(f'actual/expected tablet count: {actual_tablet_count}/{expected_tablet_count}')

            assert time.time() - started < 60, 'Timeout while waiting for tablet merge'

            await asyncio.sleep(.1)

        logger.info(f'Merged test table; new number of tablets: {expected_tablet_count}')

        # assert that the number of records has not changed
        qry = f'SELECT * FROM {ks}.test'
        logger.info(f'Running: {qry}')
        res = cql.execute(qry)
        missing = set(pks)
        rec_count = 0
        for row in res:
            rec_count += 1
            missing.discard(row.pk)

        assert rec_count == len(pks), f"received {rec_count} records instead of {len(pks)} while querying server {server.server_id}; missing keys: {missing}"


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_merge_with_drop(manager: ManagerClient):

    # This is a test and reproducer for issue:
    # https://github.com/scylladb/scylladb/issues/23313

    logger.info('Bootstrapping cluster')
    cfg = {'enable_tablets': True,
           'tablet_load_stats_refresh_interval_in_seconds': 1
           }
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'debug_error_injection=debug',
    ]
    server = await manager.server_add(cmdline=cmdline, config=cfg)

    logger.info(f'server_id = {server.server_id}')

    cql = manager.get_cql()

    await manager.disable_tablet_balancing()

    initial_tablets = 32

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")

        await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')

        # insert data
        pks = range(initial_tablets)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])

        # flush the table
        await manager.api.flush_keyspace(server.ip_addr, ks)

        await manager.api.enable_injection(server.ip_addr, "merge_completion_fiber", one_shot=True)

        # force merge on the test table
        expected_tablet_count = initial_tablets // 2
        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")

        s0_log = await manager.server_open_log(server.server_id)
        s0_mark = await s0_log.mark()

        await manager.enable_tablet_balancing()

        # wait for merge to complete
        actual_tablet_count = 0
        started = time.time()
        while expected_tablet_count != actual_tablet_count:
            actual_tablet_count = await get_tablet_count(manager, server, ks, 'test')
            logger.debug(f'actual/expected tablet count: {actual_tablet_count}/{expected_tablet_count}')

            assert time.time() - started < 120, 'Timeout while waiting for tablet merge'

            await asyncio.sleep(.1)

        await s0_log.wait_for('merge_completion_fiber: waiting', from_mark=s0_mark)
        await manager.api.enable_injection(server.ip_addr, "compaction_group_stop_wait", one_shot=True)
        await manager.api.message_injection(server.ip_addr, "merge_completion_fiber")
        await s0_log.wait_for('compaction_group_stop_wait: waiting', from_mark=s0_mark)

        drop_table_fut = cql.run_async(f"drop table {ks}.test")
        await asyncio.sleep(0.1)
        await manager.api.message_injection(server.ip_addr, "compaction_group_stop_wait")
        await drop_table_fut


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_background_merge_deadlock(manager: ManagerClient):
    """
    Reproducer for https://scylladb.atlassian.net/browse/SCYLLADB-928

    Reproduces a deadlock in the background merge completion handler that can happen when multiple merges accumulate.
    If we accumulate more than 1 merge cycle for the fiber, a deadlock occurs due to the compaction lock taken
    on the main group (post-merge). The lock is held until the compaction groups are processed by the background
    merge fiber.

    Example:

    Initial state:

    cg0: main
    cg1: main
    cg2: main
    cg3: main

    After 1st merge:

    cg0': main [locked], merging_groups=[cg0.main, cg1.main]
    cg1': main [locked], merging_groups=[cg2.main, cg3.main]

    After 2nd merge:

    cg0'': main [locked], merging_groups=[cg0'.main [locked], cg0.main, cg1.main, cg1'.main [locked], cg2.main, cg3.main]

    The test reproduces this by doing a tablet merge from 8 tablets to 1 (8 -> 4 -> 2 -> 1). The background merge fiber
    is blocked until after the first merge (to 4), so that there is a higher chance of two merges queueing in the fiber.

    If the deadlock occurs, node shutdown will hang waiting for the background merge fiber. That's why the test
    tries to stop the node at the end.
    """

    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'raft_topology=debug',
    ]

    servers = [await manager.server_add(cmdline=cmdline)]
    cql, _ = await manager.get_ready_cql(servers)

    ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}")

    # Create a table which will go through 3 merge cycles.
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) with tablets = {{'min_tablet_count': 8}};")

    await manager.api.enable_injection(servers[0].ip_addr, "merge_completion_fiber", one_shot=True)
    log = await manager.server_open_log(servers[0].server_id)
    mark = await log.mark()

    # Trigger tablet merging
    await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': 1}};")

    async def produced_one_merge():
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count == 4 or None
    await wait_for(produced_one_merge, time.time() + 120)

    mark, _ = await log.wait_for(f"merge_completion_fiber: waiting", from_mark=mark)
    await manager.api.message_injection(servers[0].ip_addr, "merge_completion_fiber")
    mark, _ = await log.wait_for(f"merge_completion_fiber: message received", from_mark=mark)

    async def finished_merge():
        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        return tablet_count == 1 or None

    await wait_for(finished_merge, time.time() + 120)

    await manager.server_stop(servers[0].server_id)