From 3f8363300a283d83ee52d0a6915953ef643432e1 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Fri, 12 Dec 2025 12:47:46 -0300 Subject: [PATCH] replica: Introduce add_new_sstables_and_update_cache() Piggyback on new add_new_sstable_and_update_cache(), replacing the previous add_sstables_and_update_cache(). Will be used by intra-node migration since we want it to be safe when loading the cloned sstables. An unsplit sstable can be cloned into destination which already ACKed split, so we need this variant which splits sstable if needed, while it's unsealed. Signed-off-by: Raphael S. Carvalho --- replica/database.hh | 3 +++ replica/table.cc | 30 ++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/replica/database.hh b/replica/database.hh index bf9ad11368..60b415b950 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -618,6 +618,9 @@ public: add_new_sstable_and_update_cache(sstables::shared_sstable new_sst, std::function(sstables::shared_sstable)> on_add, sstables::offstrategy offstrategy = sstables::offstrategy::no); + [[nodiscard]] future> + add_new_sstables_and_update_cache(std::vector new_ssts, + std::function(sstables::shared_sstable)> on_add); future<> move_sstables_from_staging(std::vector); sstables::shared_sstable make_sstable(); diff --git a/replica/table.cc b/replica/table.cc index 08d0016f38..af104928d0 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1448,6 +1448,36 @@ table::add_new_sstable_and_update_cache(sstables::shared_sstable new_sst, co_return std::move(ret); } +future> +table::add_new_sstables_and_update_cache(std::vector new_ssts, + std::function(sstables::shared_sstable)> on_add) { + std::exception_ptr ex; + std::vector ret; + + // We rely on add_new_sstable_and_update_cache() to unlink the sstable feeded into it, + // so the exception handling below will only have to unlink sstables not processed yet. + try { + for (auto& sst: new_ssts) { + auto ssts = co_await add_new_sstable_and_update_cache(std::exchange(sst, nullptr), on_add); + std::ranges::move(ssts, std::back_inserter(ret)); + + } + } catch (...) { + ex = std::current_exception(); + } + + if (ex) { + co_await coroutine::parallel_for_each(new_ssts, [&ex] (sstables::shared_sstable sst) -> future<> { + if (sst) { + tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex); + co_await sst->unlink(); + } + }); + co_await coroutine::return_exception_ptr(std::move(ex)); + } + co_return std::move(ret); +} + future<> table::update_cache(compaction_group& cg, lw_shared_ptr m, std::vector ssts) { auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);