From c67967b65acceb3646b62fd57bb0cfb6774dc6bf Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Fri, 21 Jun 2024 15:22:54 -0300 Subject: [PATCH] tablets: Fix race between repair and split Consider the following: T 0 split prepare starts 1 repair starts 2 split prepare finishes 3 repair adds unsplit sstables 4 repair ends 5 split executes If repair produces an sstable after the split prepare phase, the replica will not split that sstable later, as the prepare phase is considered completed already. That causes split execution to fail, as the replicas weren't really prepared. This can also be triggered with load-and-stream, which shares the same write (consumer) path. The approach to fix this is the same one employed to prevent a race between split and migration. If migration happens during the prepare phase, it can happen that the source misses the split request, but the tablet will still be split on the destination (if needed). Similarly, the repair writer becomes responsible for splitting the data if the underlying table is in split mode. That's implemented in replica::table for correctness, so if the node crashes, the new sstable missing a split is still split before being added to the set. Fixes #19378. Fixes #19416. Signed-off-by: Raphael S. 
Carvalho (cherry picked from commit 74612ad3586fb96e2e9c5238c714e7272896fc4d) --- replica/compaction_group.hh | 1 + replica/database.hh | 6 ++ replica/table.cc | 45 ++++++++++-- service/topology_coordinator.cc | 3 + .../test_tablets.py | 70 +++++++++++++++++++ 5 files changed, 119 insertions(+), 6 deletions(-) diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index eed0cbffa1..015e0e6346 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -321,6 +321,7 @@ public: virtual bool all_storage_groups_split() = 0; virtual future<> split_all_storage_groups() = 0; virtual future<> maybe_split_compaction_group_of(size_t idx) = 0; + virtual future> maybe_split_sstable(const sstables::shared_sstable& sst) = 0; virtual lw_shared_ptr make_sstable_set() const = 0; }; diff --git a/replica/database.hh b/replica/database.hh index 94340a25e1..155c95e5db 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -580,6 +580,11 @@ public: // be split once it returns. future<> maybe_split_compaction_group_of(locator::tablet_id); private: + // If SSTable doesn't need split, the same input SSTable is returned as output. + // If SSTable needs split, then output SSTables are returned and the input SSTable is deleted. + // NOTE: it must only be used on new SSTables that weren't added to the set yet. + future> maybe_split_new_sstable(const sstables::shared_sstable&); + // Called when coordinator executes tablet splitting, i.e. commit the new tablet map with // each tablet split into two, so this replica will remap all of its compaction groups // that were previously split. 
@@ -619,6 +624,7 @@ private: return _config.enable_cache && _schema->caching_options().enabled(); } void update_stats_for_new_sstable(const sstables::shared_sstable& sst) noexcept; + future<> do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy, bool trigger_compaction); future<> do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction); // Helpers which add sstable on behalf of a compaction group and refreshes compound set. void add_sstable(compaction_group& cg, sstables::shared_sstable sstable); diff --git a/replica/table.cc b/replica/table.cc index dda72dbea8..24657aec17 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -675,6 +675,9 @@ public: bool all_storage_groups_split() override { return true; } future<> split_all_storage_groups() override { return make_ready_future(); } future<> maybe_split_compaction_group_of(size_t idx) override { return make_ready_future(); } + future> maybe_split_sstable(const sstables::shared_sstable& sst) override { + return make_ready_future>(std::vector{sst}); + } lw_shared_ptr make_sstable_set() const override { return get_compaction_group().make_sstable_set(); @@ -774,6 +777,7 @@ public: bool all_storage_groups_split() override; future<> split_all_storage_groups() override; future<> maybe_split_compaction_group_of(size_t idx) override; + future> maybe_split_sstable(const sstables::shared_sstable& sst) override; lw_shared_ptr make_sstable_set() const override { // FIXME: avoid recreation of compound_set for groups which had no change. usually, only one group will be changed at a time. 
@@ -856,8 +860,14 @@ future<> storage_group::split(sstables::compaction_type_options::split opt) { co_return; } + if (_main_cg->empty()) { + co_return; + } + auto holder = _main_cg->async_gate().hold(); co_await _main_cg->flush(); - co_await _main_cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt)); + // Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets. + co_await _main_cg->get_compaction_manager().perform_offstrategy(_main_cg->as_table_state(), tasks::task_info{}); + co_await _main_cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt), tasks::task_info{}); } lw_shared_ptr storage_group::make_sstable_set() const { @@ -933,11 +943,27 @@ future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t id return sg->split(split_compaction_options()); } +future> +tablet_storage_group_manager::maybe_split_sstable(const sstables::shared_sstable& sst) { + if (!tablet_map().needs_split()) { + co_return std::vector{sst}; + } + + auto& cg = compaction_group_for_sstable(sst); + auto holder = cg.async_gate().hold(); + co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, cg.as_table_state(), split_compaction_options()); +} + future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) { auto holder = async_gate().hold(); co_await _sg_manager->maybe_split_compaction_group_of(tablet_id.value()); } +future> table::maybe_split_new_sstable(const sstables::shared_sstable& sst) { + auto holder = async_gate().hold(); + co_return co_await _sg_manager->maybe_split_sstable(sst); +} + std::unique_ptr table::make_storage_group_manager() { std::unique_ptr ret; if (uses_tablets()) { @@ -1125,11 +1151,8 @@ void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) no } future<> -table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy 
offstrategy, bool trigger_compaction) { - compaction_group& cg = compaction_group_for_sstable(sst); - // Hold gate to make share compaction group is alive. - auto holder = cg.async_gate().hold(); - +table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy offstrategy, + bool trigger_compaction) { auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1); co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () noexcept { // FIXME: this is not really noexcept, but we need to provide strong exception guarantees. @@ -1146,6 +1169,16 @@ table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::o }), dht::partition_range::make({sst->get_first_decorated_key(), true}, {sst->get_last_decorated_key(), true})); } +future<> +table::do_add_sstable_and_update_cache(sstables::shared_sstable new_sst, sstables::offstrategy offstrategy, bool trigger_compaction) { + for (auto sst : co_await maybe_split_new_sstable(new_sst)) { + auto& cg = compaction_group_for_sstable(sst); + // Hold gate to make share compaction group is alive. 
+ auto holder = cg.async_gate().hold(); + co_await do_add_sstable_and_update_cache(cg, std::move(sst), offstrategy, trigger_compaction); + } +} + future<> table::add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy) { bool do_trigger_compaction = offstrategy == sstables::offstrategy::no; diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index d781b958b0..ce058b20e8 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -2466,6 +2466,9 @@ future topology_coordinator::maybe_start_tablet_split_finalization(group0_ if (plan.finalize_resize.empty()) { co_return false; } + if (utils::get_local_injector().enter("tablet_split_finalization_postpone")) { + co_return false; + } std::vector updates; diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py index 93bea32698..6a1698615b 100644 --- a/test/topology_experimental_raft/test_tablets.py +++ b/test/topology_experimental_raft/test_tablets.py @@ -447,6 +447,76 @@ async def test_tablet_repair(manager: ManagerClient): for r in rows: assert r.c == repair_cycles - 1 +# Reproducer for race between split and repair: https://github.com/scylladb/scylladb/issues/19378 +# Verifies repair will not complete with sstables that still require split, causing split +# execution to fail. 
+@pytest.mark.repair +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_concurrent_tablet_repair_and_split(manager: ManagerClient): + logger.info("Bootstrapping cluster") + cmdline = [ + '--logger-log-level', 'raft_topology=debug', + '--target-tablet-size-in-bytes', '1024', + ] + servers = await manager.servers_add(3, cmdline=cmdline, config={ + 'error_injections_at_startup': ['short_tablet_stats_refresh_interval'] + }) + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', " + "'replication_factor': 2} AND tablets = {'initial': 32};") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") + + logger.info("Populating table") + + keys = range(5000) # Enough keys to trigger repair digest mismatch with a high chance. + stmt = cql.prepare("INSERT INTO test.test (pk, c) VALUES (?, ?)") + stmt.consistency_level = ConsistencyLevel.ONE + + await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers) + + s0_log = await manager.server_open_log(servers[0].server_id) + s0_mark = await s0_log.mark() + + await asyncio.gather(*[cql.run_async(stmt, [k, -1]) for k in keys]) + + # split decision is sstable size based, so data must be flushed first + for server in servers: + await manager.api.flush_keyspace(server.ip_addr, "test") + + await manager.api.enable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone", False) + await manager.api.enable_tablet_balancing(servers[0].ip_addr) + + logger.info("Waiting for split prepare...") + await s0_log.wait_for('Setting split ready sequence number to', from_mark=s0_mark) + s0_mark = await s0_log.mark() + logger.info("Waited for split prepare") + + # Balancer is re-enabled later for split execution + await 
asyncio.create_task(manager.api.disable_tablet_balancing(servers[0].ip_addr)) + + # Write concurrently with repair to increase the chance of repair having some discrepancy to resolve and send writes. + inserts_future = asyncio.gather(*[cql.run_async(stmt, [k, 1]) for k in keys]) + + await repair_on_node(manager, servers[0], servers) + + await inserts_future + + logger.info("Waiting for split execute...") + await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone") + await manager.api.enable_tablet_balancing(servers[0].ip_addr) + await s0_log.wait_for('Detected tablet split for table', from_mark=s0_mark) + await inject_error_one_shot_on(manager, "tablet_split_finalization_postpone", servers) + logger.info("Waited for split execute...") + + key_count = len(keys) + stmt = cql.prepare("SELECT * FROM test.test;") + stmt.consistency_level = ConsistencyLevel.ALL + rows = await cql.run_async(stmt) + assert len(rows) == key_count @pytest.mark.repair @pytest.mark.asyncio