scylladb/utils/result_loop.hh
Piotr Dulikowski dd3284ec38 utils/result: optimize result_parallel_for_each
It now more closely resembles Seastar's original parallel_for_each, but
uses a coroutine instead of a custom `task` to wait for the futures that
are not ready yet. Although using a coroutine saves on allocations, it
has a drawback: there is currently no way to co_await a future and
handle its exception without either throwing or unconditionally
allocating a then_wrapped or handle_exception continuation, so this
introduces a rethrow.

Furthermore, failed results and exceptions are now treated equally.
Previously, if one parallel invocation returned a failed result and
another failed with an exception, the exception was always returned.
Now, the failure (failed result or exception) of the invocation with the
lowest index is always reported, regardless of its type.
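The selection rule can be modeled without Seastar. This is a minimal sketch under stated assumptions: `demo_error`, `outcome` and `pick_failure` are hypothetical names, an `int` payload stands in for the result's error type, and every invocation's outcome is assumed to be already known. Like the waiting coroutine in the header, it walks the outcomes from the back and lets each failure overwrite the recorded state, so the failure with the lowest index is the one that survives.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <variant>
#include <vector>

// Hypothetical stand-in for R::error_type.
struct demo_error {
    int value;
};

// Outcome of one invocation: success, a failed result, or an exception.
using outcome = std::variant<std::monostate, demo_error, std::exception_ptr>;

// Mirrors the back-to-front loop of the waiting coroutine: every failure
// overwrites the recorded state, so the failure with the lowest index wins
// regardless of whether it is a failed result or an exception.
// Returns std::monostate if every invocation succeeded.
inline outcome pick_failure(const std::vector<outcome>& outcomes) {
    outcome state;  // starts as std::monostate (no failure seen yet)
    for (std::size_t i = outcomes.size(); i-- > 0;) {
        if (!std::holds_alternative<std::monostate>(outcomes[i])) {
            state = outcomes[i];
        }
    }
    return state;
}
```

For outcomes `{success, demo_error{7}, exception}` this returns `demo_error{7}`, whereas the previous rule would have returned the exception.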

The reimplementation saves about 350-400 instructions, one task and one
allocation per operation in the perf_simple_query benchmark in write
mode.

Results from `perf_simple_query --smp 1 --operations-per-shard 1000000
--write` (five runs before the change, then five runs after):

```
126872.54 tps ( 67.2 allocs/op,  14.2 tasks/op,   52404 insns/op)
126532.13 tps ( 67.2 allocs/op,  14.2 tasks/op,   52408 insns/op)
126864.99 tps ( 67.2 allocs/op,  14.2 tasks/op,   52428 insns/op)
127073.10 tps ( 67.2 allocs/op,  14.2 tasks/op,   52404 insns/op)
126895.85 tps ( 67.2 allocs/op,  14.2 tasks/op,   52411 insns/op)

127894.02 tps ( 66.2 allocs/op,  13.2 tasks/op,   52036 insns/op)
127671.51 tps ( 66.2 allocs/op,  13.2 tasks/op,   52042 insns/op)
127541.42 tps ( 66.2 allocs/op,  13.2 tasks/op,   52044 insns/op)
127409.10 tps ( 66.2 allocs/op,  13.2 tasks/op,   52052 insns/op)
127831.30 tps ( 66.2 allocs/op,  13.2 tasks/op,   52043 insns/op)
```
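As a cross-check of the savings quoted above, the sketch below averages the five runs on each side. The sample values are copied from the output; the helper names `mean`, `insns_delta` and `tps_gain` are illustrative only. By this arithmetic the mean drops from 52411.0 to 52043.4 instructions per operation (about 368 fewer, inside the quoted 350-400 range), and mean throughput rises from about 126848 to 127669 tps, roughly 0.65%.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <numeric>

// Samples copied from the perf_simple_query output above.
constexpr std::array<double, 5> insns_before{52404.0, 52408.0, 52428.0, 52404.0, 52411.0};
constexpr std::array<double, 5> insns_after{52036.0, 52042.0, 52044.0, 52052.0, 52043.0};
constexpr std::array<double, 5> tps_before{126872.54, 126532.13, 126864.99, 127073.10, 126895.85};
constexpr std::array<double, 5> tps_after{127894.02, 127671.51, 127541.42, 127409.10, 127831.30};

// Arithmetic mean of a fixed-size sample.
template <std::size_t N>
double mean(const std::array<double, N>& samples) {
    return std::accumulate(samples.begin(), samples.end(), 0.0) / static_cast<double>(N);
}

// Mean change in instructions per operation (negative means fewer after).
inline double insns_delta() { return mean(insns_after) - mean(insns_before); }

// Relative throughput change (0.01 would mean 1% more tps after).
inline double tps_gain() { return mean(tps_after) / mean(tps_before) - 1.0; }
```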

Test: unit(dev), unit(result_utils_test, debug)
2022-02-10 18:19:08 +01:00


/*
* Copyright 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
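#include <concepts>
#include <cstddef>
#include <exception>
#include <iterator>
#include <type_traits>
#include <variant>
#include <vector>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/memory.hh>
#include <seastar/coroutine/exception.hh>
#include "utils/result.hh"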
namespace utils {

namespace internal {

template<typename Iterator, typename IteratorCategory>
inline size_t iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, IteratorCategory category) {
    // Return 0 in general case: input iterators cannot be traversed twice
    return 0;
}

// Chosen over the general overload for forward, bidirectional and
// random-access iterators, because it is more constrained
template<typename Iterator, typename IteratorCategory>
requires std::derived_from<IteratorCategory, std::forward_iterator_tag>
inline size_t iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, IteratorCategory category) {
    // May require linear scan, but it's better than reallocation
    return std::distance(begin, end);
}

}
// A version of parallel_for_each which understands results.
// In case of a failure, it returns the failure (a failed result or an
// exception) of the invocation with the lowest index.
template<typename R, typename Iterator, typename Func>
requires
    ExceptionContainerResult<R>
    && std::is_void_v<typename R::value_type>
    && requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<seastar::future<R>>; }
inline seastar::future<R> result_parallel_for_each(Iterator begin, Iterator end, Func&& func) noexcept {
    std::vector<seastar::future<R>> futs;
    while (begin != end) {
        // func is invoked once per element, so pass it as an lvalue
        // rather than forwarding it repeatedly
        auto f = seastar::futurize_invoke(func, *begin++);
        seastar::memory::scoped_critical_alloc_section _;
        if (f.available() && !f.failed()) {
            auto res = f.get();
            if (res) {
                // Ready and successful: nothing to wait for
                continue;
            }
            f = seastar::make_ready_future<R>(std::move(res));
        }
        if (futs.empty()) {
            // Reserve room for the current future plus all remaining ones
            using itraits = std::iterator_traits<Iterator>;
            auto n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category()) + 1);
            futs.reserve(n);
        }
        futs.push_back(std::move(f));
    }
    if (futs.empty()) {
        return seastar::make_ready_future<R>(bo::success());
    }
    // Use a coroutine so that the waiting task is allocated only once
    return ([] (std::vector<seastar::future<R>> futs) noexcept -> seastar::future<R> {
        using error_type = typename R::error_type;
        std::variant<std::monostate, error_type, std::exception_ptr> result_state;
        // Futures are consumed from the back, so each failure overwrites the
        // previous one and the failure with the lowest index is reported
        while (!futs.empty()) {
            // TODO: Avoid try..catching here if seastar coroutines allow that
            // Or not? Explicit checks might be slower on the happy path
            try {
                auto res = co_await std::move(futs.back());
                if (!res) {
                    result_state = std::move(res).assume_error();
                }
            } catch (...) {
                result_state = std::current_exception();
            }
            futs.pop_back();
        }
        if (std::holds_alternative<std::monostate>(result_state)) {
            co_return bo::success();
        } else if (auto* error = std::get_if<error_type>(&result_state)) {
            co_return std::move(*error);
        } else {
            co_return seastar::coroutine::exception(
                    std::get<std::exception_ptr>(std::move(result_state)));
        }
    })(std::move(futs));
}
// A version of parallel_for_each which understands results, taking a range
// instead of an iterator pair. In case of a failure, it returns the failure
// of the invocation with the lowest index.
template<typename R, typename Range, typename Func>
requires
    ExceptionContainerResult<R>
    && std::is_void_v<typename R::value_type>
inline seastar::future<R> result_parallel_for_each(Range&& range, Func&& func) {
    return result_parallel_for_each<R>(std::begin(range), std::end(range), std::forward<Func>(func));
}

}