diff --git a/replica/database.cc b/replica/database.cc index 010db1e1b2..e2d933161d 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -2561,6 +2561,12 @@ future<> database::truncate_table_on_all_shards(sharded& sharded_db, s }); }); + co_await utils::get_local_injector().inject("truncate_compaction_disabled_wait", [] (auto& handler) -> future<> { + dblog.info("truncate_compaction_disabled_wait: wait"); + co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); + dblog.info("truncate_compaction_disabled_wait: done"); + }, false); + const auto should_flush = with_snapshot && cf.can_flush(); dblog.trace("{} {}.{} and views on all shards", should_flush ? "Flushing" : "Clearing", s->ks_name(), s->cf_name()); std::function(replica::table&)> flush_or_clear = should_flush ? diff --git a/replica/table.cc b/replica/table.cc index c3a0adddbe..c155d5f0ec 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1029,8 +1029,10 @@ bool tablet_storage_group_manager::all_storage_groups_split() { return true; } - auto split_ready = std::ranges::all_of(_storage_groups | std::views::values, - std::mem_fn(&storage_group::set_split_mode)); + bool split_ready = true; + for (const storage_group_ptr& sg : _storage_groups | std::views::values) { + split_ready &= sg->set_split_mode(); + } // The table replica will say to coordinator that its split status is ready by // mirroring the sequence number from tablet metadata into its local state, @@ -1058,6 +1060,12 @@ sstables::compaction_type_options::split tablet_storage_group_manager::split_com future<> tablet_storage_group_manager::split_all_storage_groups(tasks::task_info tablet_split_task_info) { sstables::compaction_type_options::split opt = split_compaction_options(); + co_await utils::get_local_injector().inject("split_storage_groups_wait", [] (auto& handler) -> future<> { + dblog.info("split_storage_groups_wait: waiting"); + co_await 
handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); + dblog.info("split_storage_groups_wait: done"); + }, false); + co_await for_each_storage_group_gently([opt, tablet_split_task_info] (storage_group& storage_group) { return storage_group.split(opt, tablet_split_task_info); }); diff --git a/test/topology_custom/test_tablets.py b/test/topology_custom/test_tablets.py index 268cabb797..4da4a020f0 100644 --- a/test/topology_custom/test_tablets.py +++ b/test/topology_custom/test_tablets.py @@ -754,3 +754,54 @@ async def test_replace_with_no_normal_token_owners_in_dc(manager: ManagerClient, assert len(rows) == len(keys) for r in rows: assert r.c == r.pk + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_drop_keyspace_while_split(manager: ManagerClient): + + # Reproducer for: https://github.com/scylladb/scylladb/issues/22431 + # This tests if the split ready compaction groups are correctly created + # on a shard with several storage groups for the same table + + logger.info("Bootstrapping cluster") + cmdline = [ '--target-tablet-size-in-bytes', '8192', + '--smp', '2' ] + config = { 'error_injections_at_startup': ['short_tablet_stats_refresh_interval'] } + servers = [await manager.server_add(config=config, cmdline=cmdline)] + + s0_log = await manager.server_open_log(servers[0].server_id) + + cql = manager.get_cql() + await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60) + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + # create a table so that it has at least 2 tablets (and storage groups) per shard + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") + + await manager.api.disable_autocompaction(servers[0].ip_addr, 'test') + + keys = range(2048) + await 
asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys]) + await manager.api.flush_keyspace(servers[0].ip_addr, 'test') + + await manager.api.enable_injection(servers[0].ip_addr, 'truncate_compaction_disabled_wait', one_shot=False) + await manager.api.enable_injection(servers[0].ip_addr, 'split_storage_groups_wait', one_shot=False) + + # enable the load balancer which should emit a tablet split + await manager.api.enable_tablet_balancing(servers[0].ip_addr) + + # wait for compaction groups to be created and split to begin + await s0_log.wait_for('split_storage_groups_wait: wait') + + # start a DROP and wait for it to disable compaction + drop_ks_task = cql.run_async('DROP KEYSPACE test;') + await s0_log.wait_for('truncate_compaction_disabled_wait: wait') + + # release split + await manager.api.message_injection(servers[0].ip_addr, "split_storage_groups_wait") + + # release drop and wait for it to complete + await manager.api.message_injection(servers[0].ip_addr, "truncate_compaction_disabled_wait") + await drop_ks_task