From 2f426455699d42322232ed50f6907ddac4fa3867 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Fri, 12 Dec 2025 12:47:46 -0300 Subject: [PATCH] replica: Introduce add_new_sstables_and_update_cache() Piggyback on new add_new_sstable_and_update_cache(), replacing the previous add_sstables_and_update_cache(). Will be used by intra-node migration since we want it to be safe when loading the cloned sstables. An unsplit sstable can be cloned into destination which already ACKed split, so we need this variant which splits sstable if needed, while it's unsealed. Signed-off-by: Raphael S. Carvalho (cherry picked from commit 3f8363300a283d83ee52d0a6915953ef643432e1) --- replica/database.hh | 3 +++ replica/table.cc | 30 ++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/replica/database.hh b/replica/database.hh index 42de4de9e9..598bdac116 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -617,6 +617,9 @@ public: add_new_sstable_and_update_cache(sstables::shared_sstable new_sst, std::function(sstables::shared_sstable)> on_add, sstables::offstrategy offstrategy = sstables::offstrategy::no); + [[nodiscard]] future> + add_new_sstables_and_update_cache(std::vector new_ssts, + std::function(sstables::shared_sstable)> on_add); future<> move_sstables_from_staging(std::vector); sstables::shared_sstable make_sstable(); diff --git a/replica/table.cc b/replica/table.cc index f203a2f9b5..c9cc732452 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1447,6 +1447,36 @@ table::add_new_sstable_and_update_cache(sstables::shared_sstable new_sst, co_return std::move(ret); } +future> +table::add_new_sstables_and_update_cache(std::vector new_ssts, + std::function(sstables::shared_sstable)> on_add) { + std::exception_ptr ex; + std::vector ret; + + // We rely on add_new_sstable_and_update_cache() to unlink the sstable feeded into it, + // so the exception handling below will only have to unlink sstables not processed yet. + try { + for (auto& sst: new_ssts) { + auto ssts = co_await add_new_sstable_and_update_cache(std::exchange(sst, nullptr), on_add); + std::ranges::move(ssts, std::back_inserter(ret)); + + } + } catch (...) { + ex = std::current_exception(); + } + + if (ex) { + co_await coroutine::parallel_for_each(new_ssts, [&ex] (sstables::shared_sstable sst) -> future<> { + if (sst) { + tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex); + co_await sst->unlink(); + } + }); + co_await coroutine::return_exception_ptr(std::move(ex)); + } + co_return std::move(ret); +} + future<> table::update_cache(compaction_group& cg, lw_shared_ptr m, std::vector ssts) { auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);