From e2023877f2e7f45ed4c64bd4d3b58f4048734123 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 16 Apr 2023 13:15:12 +0300 Subject: [PATCH 1/3] sstable_directory: parallel_for_each_restricted: do not move container Commit ecbd112979ba3dde8ec16203cc6d86a1293cedc7 `distributed_loader: reshard: consider sstables for cleanup` caused a regression in loading new sstables using the `upload` directory, as seen in e.g. https://jenkins.scylladb.com/view/master/job/scylla-master/job/dtest-daily-release/230/testReport/migration_test/TestMigration/Run_Dtest_Parallel_Cloud_Machines___FullDtest___full_split000___test_migrate_sstable_without_compression_3_0_md_/ ``` query = "SELECT COUNT(*) FROM cf" statement = SimpleStatement(query) s = self.patient_cql_connection(node, 'ks') result = list(s.execute(statement)) > assert result[0].count == expected_number_of_rows, \ "Expected {} rows. Got {}".format(expected_number_of_rows, list(s.execute("SELECT * FROM ks.cf"))) E AssertionError: Expected 1 rows. Got [] E assert 0 == 1 E +0 E -1 ``` The reason for the regression is that the call to `do_for_each_sstable` in `collect_all_shared_sstables` to search for sstables that need cleanup caused the list of sstables in the sstable directory to be moved and cleared. parallel_for_each_restricted moves the container passed to it into a `do_with` continuation. This is required for parallel_for_each_restricted. However, moving the container is destructive and so, the decision whether to move or not needs to be the caller's, not the callee. This patch changes the signature of parallel_for_each_restricted to accept a lvalue reference to the container rather than a rvalue reference, allowing the callers to decide whether to move or not. 
Most callers are converted to move the container, as they effectively do today, and a new method, `filter_sstables`, was added for the `collect_all_shared_sstables` use case, that allows the `func` that processes each sstable to decide whether the sstable is kept in `_unshared_local_sstables` or not. Signed-off-by: Benny Halevy --- replica/distributed_loader.cc | 4 +++- sstables/sstable_directory.cc | 24 +++++++++++++++++++----- sstables/sstable_directory.hh | 10 +++++++++- 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index a078cc6f5e..8ac57ed112 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -169,10 +169,12 @@ collect_all_shared_sstables(sharded& dir, sharded future<> { + co_await d.filter_sstables([&] (sstables::shared_sstable sst) -> future { if (table.update_sstable_cleanup_state(sst, *sorted_owned_ranges_ptr)) { need_cleanup.push_back(co_await sst->get_open_info()); + co_return false; } + co_return true; }); } if (shared_sstables.empty() && need_cleanup.empty()) { diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index 23be387c4e..34a2fd493e 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -265,7 +265,8 @@ future<> sstable_directory::filesystem_components_lister::process(sstable_direct // _descriptors is everything with a TOC. So after we remove this, what's left is // SSTables for which a TOC was not found. 
- co_await directory.parallel_for_each_restricted(_state->descriptors, [this, flags, &directory] (std::pair& t) { + auto descriptors = std::move(_state->descriptors); + co_await directory.parallel_for_each_restricted(descriptors, [this, flags, &directory] (std::pair& t) { auto& desc = std::get<1>(t); _state->generations_found.erase(desc.generation); // This will try to pre-load this file and throw an exception if it is invalid @@ -339,7 +340,7 @@ future sstable_directory::load_foreign_sstable(foreign_sstable_o future<> sstable_directory::load_foreign_sstables(sstable_entry_descriptor_vector info_vec) { - return parallel_for_each_restricted(info_vec, [this] (const sstables::entry_descriptor& info) { + co_await parallel_for_each_restricted(info_vec, [this] (const sstables::entry_descriptor& info) { return load_sstable(info).then([this] (auto sst) { _unshared_local_sstables.push_back(sst); return make_ready_future<>(); @@ -412,14 +413,27 @@ sstable_directory::remove_unshared_sstables(std::vector sstable_directory::do_for_each_sstable(std::function(sstables::shared_sstable)> func) { - return parallel_for_each_restricted(_unshared_local_sstables, std::move(func)); + auto sstables = std::move(_unshared_local_sstables); + co_await parallel_for_each_restricted(sstables, std::move(func)); +} + +future<> +sstable_directory::filter_sstables(std::function(sstables::shared_sstable)> func) { + std::vector filtered; + co_await parallel_for_each_restricted(_unshared_local_sstables, [func = std::move(func), &filtered] (sstables::shared_sstable sst) -> future<> { + auto keep = co_await func(sst); + if (keep) { + filtered.emplace_back(sst); + } + }); + _unshared_local_sstables = std::move(filtered); } template requires std::is_invocable_r_v, Func, typename std::decay_t::value_type&> future<> -sstable_directory::parallel_for_each_restricted(Container&& C, Func&& func) { - return do_with(std::move(C), std::move(func), [this] (Container& c, Func& func) mutable { 
+sstable_directory::parallel_for_each_restricted(Container& c, Func&& func) { + return do_with(std::move(func), [this, &c] (Func& func) mutable { return max_concurrent_for_each(c, _manager.dir_semaphore()._concurrency, [this, &func] (auto& el) mutable { return with_semaphore(_manager.dir_semaphore()._sem, 1, [&func, el = std::move(el)] () mutable { return func(el); diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index 097b3e2082..37eed8e35f 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -161,7 +161,8 @@ private: template requires std::is_invocable_r_v, Func, typename std::decay_t::value_type&> - future<> parallel_for_each_restricted(Container&& C, Func&& func); + future<> parallel_for_each_restricted(Container& C, Func&& func); + future<> load_foreign_sstables(sstable_entry_descriptor_vector info_vec); // Sort the sstable according to owner @@ -221,8 +222,15 @@ public: // Helper function that processes all unshared SSTables belonging to this shard, respecting the // concurrency limit. + // Note that this function is destructive, draining _unshared_local_sstables. future<> do_for_each_sstable(std::function(sstables::shared_sstable)> func); + // Helper function that processes all unshared SSTables belonging to this shard, respecting the + // concurrency limit. + // sstables for which `func` returns a true value are kept in _unshared_local_sstables, while + // those for which `func` returns false are erased from the list. + future<> filter_sstables(std::function(sstables::shared_sstable)> func); + // Retrieves the list of shared SSTables in this object. The list will be reset once this // is called. 
sstable_open_info_vector retrieve_shared_sstables(); From e4acc44814c59459784487a77ca85b2dd8a183f2 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 16 Apr 2023 13:24:07 +0300 Subject: [PATCH 2/3] sstable_directory: parallel_for_each_restricted: use std::ranges for template definition We'd like the container to be a std::ranges::range. Signed-off-by: Benny Halevy --- sstables/sstable_directory.cc | 4 ++-- sstables/sstable_directory.hh | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index 34a2fd493e..8cc58d50e7 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -429,8 +429,8 @@ sstable_directory::filter_sstables(std::function(sstables::shared_s _unshared_local_sstables = std::move(filtered); } -template -requires std::is_invocable_r_v, Func, typename std::decay_t::value_type&> +template +requires std::is_invocable_r_v, Func, typename std::ranges::range_value_t&> future<> sstable_directory::parallel_for_each_restricted(Container& c, Func&& func) { return do_with(std::move(func), [this, &c] (Func& func) mutable { diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index 37eed8e35f..4f8081ab81 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -159,10 +159,9 @@ private: future load_sstable(sstables::entry_descriptor desc, sstables::sstable_open_config cfg = {}) const; future load_sstable(sstables::entry_descriptor desc, process_flags flags) const; - template - requires std::is_invocable_r_v, Func, typename std::decay_t::value_type&> + template + requires std::is_invocable_r_v, Func, typename std::ranges::range_value_t&> future<> parallel_for_each_restricted(Container& C, Func&& func); - future<> load_foreign_sstables(sstable_entry_descriptor_vector info_vec); // Sort the sstable according to owner From 205daf49fd46288ed71a8fdf517b3a68c9931b98 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 
16 Apr 2023 13:28:31 +0300 Subject: [PATCH 3/3] sstable_directory: coroutinize parallel_for_each_restricted Using a coroutine simplifies the function and reduces the number of moves it performs. Signed-off-by: Benny Halevy --- sstables/sstable_directory.cc | 11 ++++------- sstables/sstable_directory.hh | 2 +- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index 8cc58d50e7..205485a844 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -432,13 +432,10 @@ sstable_directory::filter_sstables(std::function(sstables::shared_s template requires std::is_invocable_r_v, Func, typename std::ranges::range_value_t&> future<> -sstable_directory::parallel_for_each_restricted(Container& c, Func&& func) { - return do_with(std::move(func), [this, &c] (Func& func) mutable { - return max_concurrent_for_each(c, _manager.dir_semaphore()._concurrency, [this, &func] (auto& el) mutable { - return with_semaphore(_manager.dir_semaphore()._sem, 1, [&func, el = std::move(el)] () mutable { - return func(el); - }); - }); +sstable_directory::parallel_for_each_restricted(Container& c, Func func) { + co_await max_concurrent_for_each(c, _manager.dir_semaphore()._concurrency, [&] (auto& el) -> future<>{ + auto units = co_await get_units(_manager.dir_semaphore()._sem, 1); + co_await func(el); }); } diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index 4f8081ab81..849a66fdd4 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -161,7 +161,7 @@ private: template requires std::is_invocable_r_v, Func, typename std::ranges::range_value_t&> - future<> parallel_for_each_restricted(Container& C, Func&& func); + future<> parallel_for_each_restricted(Container& C, Func func); future<> load_foreign_sstables(sstable_entry_descriptor_vector info_vec); // Sort the sstable according to owner