Merge 'sstable_directory: parallel_for_each_restricted: do not move container' from Benny Halevy

Commit ecbd112979
`distributed_loader: reshard: consider sstables for cleanup`
caused a regression in loading new sstables using the `upload`
directory, as seen in e.g. https://jenkins.scylladb.com/view/master/job/scylla-master/job/dtest-daily-release/230/testReport/migration_test/TestMigration/Run_Dtest_Parallel_Cloud_Machines___FullDtest___full_split000___test_migrate_sstable_without_compression_3_0_md_/
```
            query = "SELECT COUNT(*) FROM cf"
            statement = SimpleStatement(query)
            s = self.patient_cql_connection(node, 'ks')
            result = list(s.execute(statement))
    >       assert result[0].count == expected_number_of_rows, \
                "Expected {} rows. Got {}".format(expected_number_of_rows, list(s.execute("SELECT *
FROM ks.cf")))
    E       AssertionError: Expected 1 rows. Got []
    E       assert 0 == 1
    E         +0
```

The reason for the regression is that the call to `do_for_each_sstable` in `collect_all_shared_sstables` to search for sstables that need cleanup caused the list of sstables in the sstable directory to be moved and cleared.

parallel_for_each_restricted moves the container passed to it into a `do_with` continuation.  This is required so that the container stays alive while parallel_for_each_restricted iterates over it asynchronously.

However, moving the container is destructive, so the decision whether to move or not needs to be the caller's, not the callee's.

This patch changes the signature of parallel_for_each_restricted to accept a container rather than an rvalue reference, allowing the callers to decide whether to move or not.

Most callers are converted to move the container, except for `do_for_each_sstable` that copies `_unshared_local_sstables`, allowing callers to call `dir.do_for_each_sstable` multiple times without moving the list contents.

Closes #13526

* github.com:scylladb/scylladb:
  sstable_directory: coroutinize parallel_for_each_restricted
  sstable_directory: parallel_for_each_restricted: use std::ranges for template definition
  sstable_directory: parallel_for_each_restricted: do not move container
This commit is contained in:
Avi Kivity
2023-05-04 17:39:05 +03:00
3 changed files with 36 additions and 16 deletions

View File

@@ -169,10 +169,12 @@ collect_all_shared_sstables(sharded<sstables::sstable_directory>& dir, sharded<r
sstables::sstable_directory::sstable_open_info_vector need_cleanup;
if (sorted_owned_ranges_ptr) {
auto& table = db.local().find_column_family(ks_name, table_name);
co_await d.do_for_each_sstable([&] (sstables::shared_sstable sst) -> future<> {
co_await d.filter_sstables([&] (sstables::shared_sstable sst) -> future<bool> {
if (table.update_sstable_cleanup_state(sst, *sorted_owned_ranges_ptr)) {
need_cleanup.push_back(co_await sst->get_open_info());
co_return false;
}
co_return true;
});
}
if (shared_sstables.empty() && need_cleanup.empty()) {

View File

@@ -265,7 +265,8 @@ future<> sstable_directory::filesystem_components_lister::process(sstable_direct
// _descriptors is everything with a TOC. So after we remove this, what's left is
// SSTables for which a TOC was not found.
co_await directory.parallel_for_each_restricted(_state->descriptors, [this, flags, &directory] (std::pair<const generation_type, sstables::entry_descriptor>& t) {
auto descriptors = std::move(_state->descriptors);
co_await directory.parallel_for_each_restricted(descriptors, [this, flags, &directory] (std::pair<const generation_type, sstables::entry_descriptor>& t) {
auto& desc = std::get<1>(t);
_state->generations_found.erase(desc.generation);
// This will try to pre-load this file and throw an exception if it is invalid
@@ -339,7 +340,7 @@ future<shared_sstable> sstable_directory::load_foreign_sstable(foreign_sstable_o
future<>
sstable_directory::load_foreign_sstables(sstable_entry_descriptor_vector info_vec) {
return parallel_for_each_restricted(info_vec, [this] (const sstables::entry_descriptor& info) {
co_await parallel_for_each_restricted(info_vec, [this] (const sstables::entry_descriptor& info) {
return load_sstable(info).then([this] (auto sst) {
_unshared_local_sstables.push_back(sst);
return make_ready_future<>();
@@ -412,19 +413,29 @@ sstable_directory::remove_unshared_sstables(std::vector<sstables::shared_sstable
future<>
sstable_directory::do_for_each_sstable(std::function<future<>(sstables::shared_sstable)> func) {
return parallel_for_each_restricted(_unshared_local_sstables, std::move(func));
auto sstables = std::move(_unshared_local_sstables);
co_await parallel_for_each_restricted(sstables, std::move(func));
}
template <typename Container, typename Func>
requires std::is_invocable_r_v<future<>, Func, typename std::decay_t<Container>::value_type&>
future<>
sstable_directory::parallel_for_each_restricted(Container&& C, Func&& func) {
return do_with(std::move(C), std::move(func), [this] (Container& c, Func& func) mutable {
return max_concurrent_for_each(c, _manager.dir_semaphore()._concurrency, [this, &func] (auto& el) mutable {
return with_semaphore(_manager.dir_semaphore()._sem, 1, [&func, el = std::move(el)] () mutable {
return func(el);
});
});
// Runs `func` on every sstable in _unshared_local_sstables (via
// parallel_for_each_restricted) and keeps only the sstables for which `func`
// resolves to true; the rest are dropped from the list.
sstable_directory::filter_sstables(std::function<future<bool>(sstables::shared_sstable)> func) {
// Collects the sstables that `func` decided to keep.
std::vector<sstables::shared_sstable> filtered;
// _unshared_local_sstables is passed by reference and is not moved from here.
co_await parallel_for_each_restricted(_unshared_local_sstables, [func = std::move(func), &filtered] (sstables::shared_sstable sst) -> future<> {
auto keep = co_await func(sst);
if (keep) {
// NOTE(review): invocations may run concurrently, so the order of
// `filtered` presumably need not match the original list order -- confirm.
filtered.emplace_back(sst);
}
});
// Replace the list with the kept sstables only after all invocations completed.
_unshared_local_sstables = std::move(filtered);
}
// Invokes `func` on each element of `c` through max_concurrent_for_each, using
// the directory semaphore's `_concurrency` value as the concurrency limit.
//
// `c` is taken by reference and is neither moved nor copied here, so the caller
// decides whether to hand over a moved-out local, a copy, or the live container.
// `func` must be invocable with a reference to the range's value type and
// return future<>.
template <std::ranges::range Container, typename Func>
requires std::is_invocable_r_v<future<>, Func, typename std::ranges::range_value_t<Container>&>
future<>
sstable_directory::parallel_for_each_restricted(Container& c, Func func) {
// The lambda captures `c`-independent state (`this`, `func`) by reference; this
// function awaits the whole iteration before returning.
co_await max_concurrent_for_each(c, _manager.dir_semaphore()._concurrency, [&] (auto& el) -> future<>{
// Acquire one unit of the directory semaphore before running `func`.
// NOTE(review): `units` is otherwise unused; presumably it releases the unit
// when it goes out of scope after `func` completes -- confirm.
auto units = co_await get_units(_manager.dir_semaphore()._sem, 1);
co_await func(el);
});
}

View File

@@ -159,9 +159,9 @@ private:
future<sstables::shared_sstable> load_sstable(sstables::entry_descriptor desc, sstables::sstable_open_config cfg = {}) const;
future<sstables::shared_sstable> load_sstable(sstables::entry_descriptor desc, process_flags flags) const;
template <typename Container, typename Func>
requires std::is_invocable_r_v<future<>, Func, typename std::decay_t<Container>::value_type&>
future<> parallel_for_each_restricted(Container&& C, Func&& func);
template <std::ranges::range Container, typename Func>
requires std::is_invocable_r_v<future<>, Func, typename std::ranges::range_value_t<Container>&>
future<> parallel_for_each_restricted(Container& C, Func func);
future<> load_foreign_sstables(sstable_entry_descriptor_vector info_vec);
// Sort the sstable according to owner
@@ -221,8 +221,15 @@ public:
// Helper function that processes all unshared SSTables belonging to this shard, respecting the
// concurrency limit.
// Note that this function is destructive, draining _unshared_local_sstables.
future<> do_for_each_sstable(std::function<future<>(sstables::shared_sstable)> func);
// Helper function that processes all unshared SSTables belonging to this shard, respecting the
// concurrency limit.
// sstables for which `func` returns a true value are kept in _unshared_local_sstables, while
// those for which `func` returns false are erased from the list.
future<> filter_sstables(std::function<future<bool>(sstables::shared_sstable)> func);
// Retrieves the list of shared SSTables in this object. The list will be reset once this
// is called.
sstable_open_info_vector retrieve_shared_sstables();