diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index a078cc6f5e..8ac57ed112 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -169,10 +169,12 @@ collect_all_shared_sstables(sharded& dir, sharded future<> { + co_await d.filter_sstables([&] (sstables::shared_sstable sst) -> future { if (table.update_sstable_cleanup_state(sst, *sorted_owned_ranges_ptr)) { need_cleanup.push_back(co_await sst->get_open_info()); + co_return false; } + co_return true; }); } if (shared_sstables.empty() && need_cleanup.empty()) { diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index 23be387c4e..205485a844 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -265,7 +265,8 @@ future<> sstable_directory::filesystem_components_lister::process(sstable_direct // _descriptors is everything with a TOC. So after we remove this, what's left is // SSTables for which a TOC was not found. - co_await directory.parallel_for_each_restricted(_state->descriptors, [this, flags, &directory] (std::pair& t) { + auto descriptors = std::move(_state->descriptors); + co_await directory.parallel_for_each_restricted(descriptors, [this, flags, &directory] (std::pair& t) { auto& desc = std::get<1>(t); _state->generations_found.erase(desc.generation); // This will try to pre-load this file and throw an exception if it is invalid @@ -339,7 +340,7 @@ future sstable_directory::load_foreign_sstable(foreign_sstable_o future<> sstable_directory::load_foreign_sstables(sstable_entry_descriptor_vector info_vec) { - return parallel_for_each_restricted(info_vec, [this] (const sstables::entry_descriptor& info) { + co_await parallel_for_each_restricted(info_vec, [this] (const sstables::entry_descriptor& info) { return load_sstable(info).then([this] (auto sst) { _unshared_local_sstables.push_back(sst); return make_ready_future<>(); @@ -412,19 +413,29 @@ sstable_directory::remove_unshared_sstables(std::vector 
sstable_directory::do_for_each_sstable(std::function(sstables::shared_sstable)> func) { - return parallel_for_each_restricted(_unshared_local_sstables, std::move(func)); + auto sstables = std::move(_unshared_local_sstables); + co_await parallel_for_each_restricted(sstables, std::move(func)); } -template -requires std::is_invocable_r_v, Func, typename std::decay_t::value_type&> future<> -sstable_directory::parallel_for_each_restricted(Container&& C, Func&& func) { - return do_with(std::move(C), std::move(func), [this] (Container& c, Func& func) mutable { - return max_concurrent_for_each(c, _manager.dir_semaphore()._concurrency, [this, &func] (auto& el) mutable { - return with_semaphore(_manager.dir_semaphore()._sem, 1, [&func, el = std::move(el)] () mutable { - return func(el); - }); - }); +sstable_directory::filter_sstables(std::function(sstables::shared_sstable)> func) { + std::vector filtered; + co_await parallel_for_each_restricted(_unshared_local_sstables, [func = std::move(func), &filtered] (sstables::shared_sstable sst) -> future<> { + auto keep = co_await func(sst); + if (keep) { + filtered.emplace_back(sst); + } + }); + _unshared_local_sstables = std::move(filtered); +} + +template +requires std::is_invocable_r_v, Func, typename std::ranges::range_value_t&> +future<> +sstable_directory::parallel_for_each_restricted(Container& c, Func func) { + co_await max_concurrent_for_each(c, _manager.dir_semaphore()._concurrency, [&] (auto& el) -> future<>{ + auto units = co_await get_units(_manager.dir_semaphore()._sem, 1); + co_await func(el); }); } diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index 097b3e2082..849a66fdd4 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -159,9 +159,9 @@ private: future load_sstable(sstables::entry_descriptor desc, sstables::sstable_open_config cfg = {}) const; future load_sstable(sstables::entry_descriptor desc, process_flags flags) const; - template - requires 
std::is_invocable_r_v, Func, typename std::decay_t::value_type&> - future<> parallel_for_each_restricted(Container&& C, Func&& func); + template + requires std::is_invocable_r_v, Func, typename std::ranges::range_value_t&> + future<> parallel_for_each_restricted(Container& C, Func func); future<> load_foreign_sstables(sstable_entry_descriptor_vector info_vec); // Sort the sstable according to owner @@ -221,8 +221,15 @@ public: // Helper function that processes all unshared SSTables belonging to this shard, respecting the // concurrency limit. + // Note that this function is destructive, draining _unshared_local_sstables. future<> do_for_each_sstable(std::function(sstables::shared_sstable)> func); + // Helper function that processes all unshared SSTables belonging to this shard, respecting the + // concurrency limit. + // sstables for which `func` returns a true value are kept in _unshared_local_sstables, while + // those for which `func` returns false are erased from the list. + future<> filter_sstables(std::function(sstables::shared_sstable)> func); + // Retrieves the list of shared SSTables in this object. The list will be reset once this // is called. sstable_open_info_vector retrieve_shared_sstables();