diff --git a/replica/database.hh b/replica/database.hh index 42de4de9e9..598bdac116 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -617,6 +617,9 @@ public: add_new_sstable_and_update_cache(sstables::shared_sstable new_sst, std::function(sstables::shared_sstable)> on_add, sstables::offstrategy offstrategy = sstables::offstrategy::no); + [[nodiscard]] future> + add_new_sstables_and_update_cache(std::vector new_ssts, + std::function(sstables::shared_sstable)> on_add); future<> move_sstables_from_staging(std::vector); sstables::shared_sstable make_sstable(); diff --git a/replica/table.cc b/replica/table.cc index f203a2f9b5..c9cc732452 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1447,6 +1447,36 @@ table::add_new_sstable_and_update_cache(sstables::shared_sstable new_sst, co_return std::move(ret); } +future> +table::add_new_sstables_and_update_cache(std::vector new_ssts, + std::function(sstables::shared_sstable)> on_add) { + std::exception_ptr ex; + std::vector ret; + + // We rely on add_new_sstable_and_update_cache() to unlink the sstable feeded into it, + // so the exception handling below will only have to unlink sstables not processed yet. + try { + for (auto& sst: new_ssts) { + auto ssts = co_await add_new_sstable_and_update_cache(std::exchange(sst, nullptr), on_add); + std::ranges::move(ssts, std::back_inserter(ret)); + + } + } catch (...) { + ex = std::current_exception(); + } + + if (ex) { + co_await coroutine::parallel_for_each(new_ssts, [&ex] (sstables::shared_sstable sst) -> future<> { + if (sst) { + tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex); + co_await sst->unlink(); + } + }); + co_await coroutine::return_exception_ptr(std::move(ex)); + } + co_return std::move(ret); +} + future<> table::update_cache(compaction_group& cg, lw_shared_ptr m, std::vector ssts) { auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);