diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index eed0cbffa1..015e0e6346 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -321,6 +321,7 @@ public: virtual bool all_storage_groups_split() = 0; virtual future<> split_all_storage_groups() = 0; virtual future<> maybe_split_compaction_group_of(size_t idx) = 0; + virtual future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) = 0; virtual lw_shared_ptr<sstables::sstable_set> make_sstable_set() const = 0; }; diff --git a/replica/database.hh b/replica/database.hh index 94340a25e1..155c95e5db 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -580,6 +580,11 @@ public: // be split once it returns. future<> maybe_split_compaction_group_of(locator::tablet_id); private: + // If SSTable doesn't need split, the same input SSTable is returned as output. + // If SSTable needs split, then output SSTables are returned and the input SSTable is deleted. + // NOTE: it must only be used on new SSTables that weren't added to the set yet. + future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable&); + // Called when coordinator executes tablet splitting, i.e. commit the new tablet map with // each tablet split into two, so this replica will remap all of its compaction groups // that were previously split. @@ -619,6 +624,7 @@ private: return _config.enable_cache && _schema->caching_options().enabled(); } void update_stats_for_new_sstable(const sstables::shared_sstable& sst) noexcept; + future<> do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy, bool trigger_compaction); future<> do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction); // Helpers which add sstable on behalf of a compaction group and refreshes compound set. 
void add_sstable(compaction_group& cg, sstables::shared_sstable sstable); diff --git a/replica/table.cc b/replica/table.cc index dda72dbea8..24657aec17 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -675,6 +675,9 @@ public: bool all_storage_groups_split() override { return true; } future<> split_all_storage_groups() override { return make_ready_future(); } future<> maybe_split_compaction_group_of(size_t idx) override { return make_ready_future(); } + future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override { + return make_ready_future<std::vector<sstables::shared_sstable>>(std::vector<sstables::shared_sstable>{sst}); + } lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override { return get_compaction_group().make_sstable_set(); @@ -774,6 +777,7 @@ public: bool all_storage_groups_split() override; future<> split_all_storage_groups() override; future<> maybe_split_compaction_group_of(size_t idx) override; + future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override; lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override { // FIXME: avoid recreation of compound_set for groups which had no change. usually, only one group will be changed at a time. @@ -856,8 +860,14 @@ future<> storage_group::split(sstables::compaction_type_options::split opt) { co_return; } + if (_main_cg->empty()) { + co_return; + } + auto holder = _main_cg->async_gate().hold(); co_await _main_cg->flush(); - co_await _main_cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt)); + // Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets. 
+ co_await _main_cg->get_compaction_manager().perform_offstrategy(_main_cg->as_table_state(), tasks::task_info{}); + co_await _main_cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt), tasks::task_info{}); } lw_shared_ptr<sstables::sstable_set> storage_group::make_sstable_set() const { @@ -933,11 +943,27 @@ future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t id return sg->split(split_compaction_options()); } +future<std::vector<sstables::shared_sstable>> +tablet_storage_group_manager::maybe_split_sstable(const sstables::shared_sstable& sst) { + if (!tablet_map().needs_split()) { + co_return std::vector<sstables::shared_sstable>{sst}; + } + + auto& cg = compaction_group_for_sstable(sst); + auto holder = cg.async_gate().hold(); + co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, cg.as_table_state(), split_compaction_options()); +} + future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) { auto holder = async_gate().hold(); co_await _sg_manager->maybe_split_compaction_group_of(tablet_id.value()); } +future<std::vector<sstables::shared_sstable>> table::maybe_split_new_sstable(const sstables::shared_sstable& sst) { + auto holder = async_gate().hold(); + co_return co_await _sg_manager->maybe_split_sstable(sst); +} + std::unique_ptr<storage_group_manager> table::make_storage_group_manager() { std::unique_ptr<storage_group_manager> ret; if (uses_tablets()) { @@ -1125,11 +1151,8 @@ void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) no } future<> -table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction) { - compaction_group& cg = compaction_group_for_sstable(sst); - // Hold gate to make share compaction group is alive. 
- auto holder = cg.async_gate().hold(); - +table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy offstrategy, + bool trigger_compaction) { auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1); co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () noexcept { // FIXME: this is not really noexcept, but we need to provide strong exception guarantees. @@ -1146,6 +1169,16 @@ table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::o }), dht::partition_range::make({sst->get_first_decorated_key(), true}, {sst->get_last_decorated_key(), true})); } +future<> +table::do_add_sstable_and_update_cache(sstables::shared_sstable new_sst, sstables::offstrategy offstrategy, bool trigger_compaction) { + for (auto sst : co_await maybe_split_new_sstable(new_sst)) { + auto& cg = compaction_group_for_sstable(sst); + // Hold gate to make sure compaction group is alive. + auto holder = cg.async_gate().hold(); + co_await do_add_sstable_and_update_cache(cg, std::move(sst), offstrategy, trigger_compaction); + } +} + future<> table::add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy) { bool do_trigger_compaction = offstrategy == sstables::offstrategy::no; diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index d781b958b0..ce058b20e8 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -2466,6 +2466,9 @@ future<bool> topology_coordinator::maybe_start_tablet_split_finalization(group0_ if (plan.finalize_resize.empty()) { co_return false; } + if (utils::get_local_injector().enter("tablet_split_finalization_postpone")) { co_return false; + } std::vector<canonical_mutation> updates; diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py index 93bea32698..6a1698615b 100644 --- a/test/topology_experimental_raft/test_tablets.py +++ 
b/test/topology_experimental_raft/test_tablets.py @@ -447,6 +447,76 @@ async def test_tablet_repair(manager: ManagerClient): for r in rows: assert r.c == repair_cycles - 1 +# Reproducer for race between split and repair: https://github.com/scylladb/scylladb/issues/19378 +# Verifies repair will not complete with sstables that still require split, causing split +# execution to fail. +@pytest.mark.repair +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_concurrent_tablet_repair_and_split(manager: ManagerClient): + logger.info("Bootstrapping cluster") + cmdline = [ + '--logger-log-level', 'raft_topology=debug', + '--target-tablet-size-in-bytes', '1024', + ] + servers = await manager.servers_add(3, cmdline=cmdline, config={ + 'error_injections_at_startup': ['short_tablet_stats_refresh_interval'] + }) + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', " + "'replication_factor': 2} AND tablets = {'initial': 32};") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") + + logger.info("Populating table") + + keys = range(5000) # Enough keys to trigger repair digest mismatch with a high chance. 
+ stmt = cql.prepare("INSERT INTO test.test (pk, c) VALUES (?, ?)") + stmt.consistency_level = ConsistencyLevel.ONE + + await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers) + + s0_log = await manager.server_open_log(servers[0].server_id) + s0_mark = await s0_log.mark() + + await asyncio.gather(*[cql.run_async(stmt, [k, -1]) for k in keys]) + + # split decision is sstable size based, so data must be flushed first + for server in servers: + await manager.api.flush_keyspace(server.ip_addr, "test") + + await manager.api.enable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone", False) + await manager.api.enable_tablet_balancing(servers[0].ip_addr) + + logger.info("Waiting for split prepare...") + await s0_log.wait_for('Setting split ready sequence number to', from_mark=s0_mark) + s0_mark = await s0_log.mark() + logger.info("Waited for split prepare") + + # Balancer is re-enabled later for split execution + await asyncio.create_task(manager.api.disable_tablet_balancing(servers[0].ip_addr)) + + # Write concurrently with repair to increase the chance of repair having some discrepancy to resolve and send writes. + inserts_future = asyncio.gather(*[cql.run_async(stmt, [k, 1]) for k in keys]) + + await repair_on_node(manager, servers[0], servers) + + await inserts_future + + logger.info("Waiting for split execute...") + await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone") + await manager.api.enable_tablet_balancing(servers[0].ip_addr) + await s0_log.wait_for('Detected tablet split for table', from_mark=s0_mark) + await inject_error_one_shot_on(manager, "tablet_split_finalization_postpone", servers) + logger.info("Waited for split execute...") + + key_count = len(keys) + stmt = cql.prepare("SELECT * FROM test.test;") + stmt.consistency_level = ConsistencyLevel.ALL + rows = await cql.run_async(stmt) + assert len(rows) == key_count @pytest.mark.repair @pytest.mark.asyncio