Files
scylladb/utils/result_loop.hh
Kefu Chai 7215d4bfe9 utils: do not include unused headers
these unused includes were identified by clang-include-cleaner. after
auditing these source files, all of the reports have been confirmed.

please note, because quite a few source files relied on
`utils/to_string.hh` to pull in the specialization of
`fmt::formatter<std::optional<T>>`, after removing
`#include <fmt/std.h>` from `utils/to_string.hh`, we have to
include `fmt/std.h` directly.

Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
2025-01-14 07:56:39 -05:00

347 lines
13 KiB
C++

/*
* Copyright 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <iterator>
#include <seastar/core/coroutine.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/map_reduce.hh>
#include <seastar/coroutine/exception.hh>
#include <type_traits>
#include "utils/result.hh"
namespace utils {
namespace internal {
// Estimates how many elements lie in [begin, end) so that a vector can be
// reserved up front.
//
// The third argument is the iterator category tag of `Iterator` (normally
// `std::iterator_traits<Iterator>::iterator_category()`); only its type is
// used.
//
// Returns std::distance(begin, end) when the category is convertible to
// std::forward_iterator_tag (forward, bidirectional, random access, ...),
// because such ranges can be traversed without being consumed.
// Returns 0 for any other category (e.g. input iterators), where measuring
// the range would consume it.
//
// Note: this used to be a pair of overloads, but the forward-iterator overload
// carried a template parameter that could not be deduced, so it was never
// selected and the estimate was always 0.
template<typename Iterator, typename IteratorCategory>
inline size_t iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, IteratorCategory /*category*/) {
    if constexpr (std::is_convertible_v<IteratorCategory, std::forward_iterator_tag>) {
        // May require linear scan, but it's better than reallocation
        return static_cast<size_t>(std::distance(begin, end));
    } else {
        // Return 0 in general case
        return 0;
    }
}
}
// A version of parallel_for_each which understands results.
// In case of a failure, it returns one of the failed results.
//
// Invokes `func` on every element of [begin, end) without waiting for the
// returned futures in between, then waits for all of them.
//
// Template parameters:
//   R        - the result type produced by `func`; must satisfy
//              ExceptionContainerResult and have a void value_type.
//   Iterator - iterator over the input range.
//   Func     - callable; f(*i++) must return exactly seastar::future<R>.
//
// Returns a future which resolves to:
//   - a successful R if every invocation produced a successful result,
//   - a failed R if some invocation produced a failed result,
//   - an exceptional future if some invocation's future failed with an
//     exception.
// When several invocations fail, each failure seen later overwrites the one
// recorded before it, so only a single failure is reported.
template<typename R, typename Iterator, typename Func>
requires
ExceptionContainerResult<R>
&& std::is_void_v<typename R::value_type>
&& requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<seastar::future<R>>; }
inline seastar::future<R> result_parallel_for_each(Iterator begin, Iterator end, Func&& func) noexcept {
// Holds only the futures that still need attention: unresolved ones,
// exceptional ones and ones carrying a failed result.
std::vector<seastar::future<R>> futs;
while (begin != end) {
// futurize_invoke is used so that the outcome of the call is always
// delivered as a future.
auto f = seastar::futurize_invoke(std::forward<Func>(func), *begin++);
// NOTE(review): presumably guards the vector allocations below, as this
// function is noexcept - confirm against the seastar::memory docs.
seastar::memory::scoped_critical_alloc_section _;
if (f.available() && !f.failed()) {
auto res = f.get();
if (res) {
// Fast path: already completed successfully, nothing to remember.
continue;
}
// The result was taken out of `f` above; wrap the failed result in a
// fresh ready future so it is handled uniformly with the others below.
f = seastar::make_ready_future<R>(std::move(res));
}
if (futs.empty()) {
// First future that has to be stored: reserve room for it (+1) plus the
// estimated number of elements that are still left in the range
// (`begin` has already been advanced past the current element).
using itraits = std::iterator_traits<Iterator>;
auto n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category()) + 1);
futs.reserve(n);
}
futs.push_back(std::move(f));
}
if (futs.empty()) {
// Every invocation completed synchronously and successfully.
return seastar::make_ready_future<R>(bo::success());
}
// Use a coroutine so that the waiting task is allocated only once
return ([] (std::vector<seastar::future<R>> futs) noexcept -> seastar::future<R> {
using error_type = typename R::error_type;
// monostate: no failure seen so far; error_type: a failed result was seen;
// exception_ptr: a future resolved with an exception.
std::variant<std::monostate, error_type, std::exception_ptr> result_state;
// Futures are awaited from the back of the vector and popped one by one,
// so all of them are waited for even after a failure has been recorded.
while (!futs.empty()) {
// TODO: Avoid try..catching here if seastar coroutines allow that
// Or not? Explicit checks might be slower on the happy path
try {
auto res = co_await std::move(futs.back());
if (!res) {
result_state = std::move(res).assume_error();
}
} catch (...) {
result_state = std::current_exception();
}
futs.pop_back();
}
// Translate the recorded state into the final outcome.
if (std::holds_alternative<std::monostate>(result_state)) {
co_return bo::success();
} else if (auto* error = std::get_if<error_type>(&result_state)) {
co_return std::move(*error);
} else {
// Propagate the exception through seastar::coroutine::exception
// rather than rethrowing it.
co_return seastar::coroutine::exception(
std::get<std::exception_ptr>(std::move(result_state)));
}
})(std::move(futs));
}
// Range-based convenience overload of result_parallel_for_each.
// Takes the iterator pair [std::begin(range), std::end(range)) and hands it
// over to the iterator-based overload, so the same contract applies:
// in case of a failure, one of the failed results is returned.
template<typename R, typename Range, typename Func>
requires
    ExceptionContainerResult<R>
    && std::is_void_v<typename R::value_type>
inline seastar::future<R> result_parallel_for_each(Range&& range, Func&& func) {
    auto first = std::begin(range);
    auto last = std::end(range);
    return result_parallel_for_each<R>(std::move(first), std::move(last), std::forward<Func>(func));
}
namespace internal {
// Traits describing what a reducer contributes to the final result.
// This primary template is the fallback for reducers which are not matched
// by the specialization requiring a get() member: the final result carries
// no value, only success or an error container.
template<typename Reducer, ExceptionContainer ExCont>
struct result_reducer_traits {
    using result_type = bo::result<void, ExCont, exception_container_throw_policy>;

    // The reducer is not consulted; always yields a ready, successful result.
    static seastar::future<result_type> maybe_call_get(Reducer&&) {
        return seastar::make_ready_future<result_type>(bo::success());
    }
};
// Specialization for reducers which expose a get() member.
// The final result wraps whatever get() produces: either the value itself,
// or, when get() returns a future, the value that future resolves to.
template<typename Reducer, ExceptionContainer ExCont>
requires requires (Reducer r) {
    { r.get() };
}
struct result_reducer_traits<Reducer, ExCont> {
    using original_type = seastar::futurize_t<decltype(std::declval<Reducer>().get())>;
    using result_type = bo::result<typename original_type::value_type, ExCont, exception_container_throw_policy>;

    // Calls r.get() and lifts its outcome into a future of a successful result.
    static seastar::future<result_type> maybe_call_get(Reducer&& r) {
        auto reduced = r.get();
        if constexpr (!seastar::Future<decltype(reduced)>) {
            // Plain value: wrap it right away.
            return seastar::make_ready_future<result_type>(bo::success(std::move(reduced)));
        } else {
            // Future: wrap the value once it becomes available.
            return reduced.then([] (auto&& value) {
                return seastar::make_ready_future<result_type>(bo::success(std::move(value)));
            });
        }
    }
};
// Wraps a reducer so that it can be fed with results instead of plain values.
// Successful results are unwrapped and forwarded to the wrapped reducer until
// the first failed result arrives; from then on the inputs are ignored and
// get() reports the remembered error.
template<typename Reducer, ExceptionContainer ExCont>
struct result_map_reduce_unary_adapter {
private:
    // The error is stored next to the reducer instead of replacing it
    // (e.g. with a result<Reducer>). Replacing would destroy the reducer as
    // soon as an error shows up, while some map_reduce users may rely on the
    // reducer staying alive until every mapper call has finished (it may, for
    // instance, own memory the mappers use). Keeping both means the reducer
    // is destroyed only together with this adapter.
    Reducer _reducer;
    ExCont _excont;

    using reducer_traits = result_reducer_traits<Reducer, ExCont>;

public:
    result_map_reduce_unary_adapter(Reducer&& reducer)
            : _reducer(std::forward<Reducer>(reducer))
    { }

    // Consumes one mapped result.
    template<ExceptionContainerResult Arg>
    seastar::future<> operator()(Arg&& arg) {
        if (_excont) {
            // An error was recorded earlier; whatever arrives now is dropped.
            return seastar::make_ready_future<>();
        }
        if (arg) {
            // No error so far and the input is fine: pass the value on.
            return seastar::futurize_invoke(_reducer, std::move(arg).assume_value());
        }
        // First failed result: remember its error.
        _excont = std::move(arg).assume_error();
        return seastar::make_ready_future<>();
    }

    // Produces the final result: the recorded error if there is one,
    // otherwise whatever the reducer traits derive from the reducer.
    seastar::future<typename reducer_traits::result_type> get() {
        if (!_excont) {
            return reducer_traits::maybe_call_get(std::move(_reducer));
        }
        return seastar::make_ready_future<typename reducer_traits::result_type>(bo::failure(std::move(_excont)));
    }
};
}
// A version of map_reduce (reducer-object flavour) which understands results.
// `mapper` is applied to the elements of [begin, end) and must return a
// future of a result; `reducer` must accept the unwrapped values and return
// seastar::future<>. The reducer is wrapped in
// internal::result_map_reduce_unary_adapter, which carries the error
// container type of the mapper's result, and then passed to
// seastar::map_reduce.
template<typename Iterator, typename Mapper, typename Reducer>
requires requires (Iterator i, Mapper mapper, Reducer reduce) {
    *i++;
    { i != i } -> std::convertible_to<bool>;
    { mapper(*i) } -> ExceptionContainerResultFuture<>;
    { seastar::futurize_invoke(reduce, seastar::futurize_invoke(mapper, *i).get().value()) }
            -> std::same_as<seastar::future<>>;
}
inline
auto
result_map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& reducer) {
    // Result type produced by a single mapper invocation.
    using mapped_result = std::remove_reference_t<decltype(seastar::futurize_invoke(mapper, *begin).get())>;
    using adapter = internal::result_map_reduce_unary_adapter<Reducer, typename mapped_result::error_type>;

    adapter wrapped(std::forward<Reducer>(reducer));
    return seastar::map_reduce(std::move(begin), std::move(end), std::forward<Mapper>(mapper), std::move(wrapped));
}
namespace internal {
// Wraps a binary reducer so that it operates on results:
// `Left` is the accumulated result, `Right` is the result of one mapper call.
// The wrapped reducer is only called when both sides are successful;
// otherwise the failure is carried over into the accumulated result.
template<ExceptionContainerResult Left, ExceptionContainerResult Right, typename Reducer>
requires ResultRebindableTo<Right, Left>
struct result_map_reduce_binary_adapter {
private:
    using left_value_type = typename Left::value_type;
    using right_value_type = typename Right::value_type;
    // Not referenced anywhere else within this adapter.
    using return_type = std::invoke_result<Reducer, left_value_type&&, right_value_type&&>;

    Reducer _reducer;

public:
    result_map_reduce_binary_adapter(Reducer&& reducer)
            : _reducer(std::forward<Reducer>(reducer))
    { }

    Left operator()(Left&& left, Right&& right) {
        if (left) {
            if (right) {
                // Both sides hold values: let the wrapped reducer combine them.
                return _reducer(std::move(left).assume_value(), std::move(right).assume_value());
            }
            // Only the new element failed: turn its error into the accumulator.
            return std::move(right).as_failure();
        }
        // The accumulator already holds a failure: keep it as is.
        return std::move(left);
    }
};
}
// A version of map_reduce (initial value + binary reducer flavour) which
// understands results. `mapper` is applied to the elements of [begin, end)
// and must return a future of a result; `reduce` combines the accumulated
// value with each unwrapped mapped value. The returned future carries a
// result holding an `Initial` value, rebound from the mapper's result type.
template<typename Iterator, typename Mapper, typename Initial, typename Reducer>
requires requires (Iterator i, Mapper mapper, Initial initial, Reducer reduce) {
    *i++;
    { i != i } -> std::convertible_to<bool>;
    { mapper(*i) } -> ExceptionContainerResultFuture<>;
    { reduce(std::move(initial), mapper(*i).get().value()) }
            -> std::convertible_to<rebind_result<Initial, std::remove_reference_t<decltype(mapper(*i).get())>>>;
}
inline
auto
result_map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reducer reduce)
        -> seastar::future<rebind_result<Initial, std::remove_reference_t<decltype(mapper(*begin).get())>>> {
    // Result type of a single mapper invocation and of the accumulator.
    using mapped_result = std::remove_reference_t<decltype(mapper(*begin).get())>;
    using accumulated_result = rebind_result<Initial, mapped_result>;
    using adapter = internal::result_map_reduce_binary_adapter<accumulated_result, mapped_result, Reducer>;

    return seastar::map_reduce(
            std::move(begin),
            std::move(end),
            std::forward<Mapper>(mapper),
            accumulated_result(std::move(initial)),
            adapter(std::move(reduce)));
}
// A version of do_until which understands results.
// Keeps invoking `action` until `stop_cond` returns true, or until the action
// yields a failed result or an exceptional future, whichever comes first.
// `action` must produce a future of a result with a void value.
template<typename AsyncAction, typename StopCondition>
requires requires (StopCondition cond, AsyncAction act) {
    { cond() } -> std::same_as<bool>;
    { seastar::futurize_invoke(act) } -> ExceptionContainerResultFuture<>;
}
inline
auto
result_do_until(StopCondition stop_cond, AsyncAction action) {
    // TODO: Constrain the result of act() better
    using future_type = seastar::futurize_t<std::invoke_result_t<AsyncAction>>;
    using result_type = typename future_type::value_type;
    static_assert(std::is_void_v<typename result_type::value_type>,
            "The result type of the action must be future<result<void>>");

    while (true) {
        // An exception thrown by the stop condition becomes a failed future.
        try {
            if (stop_cond()) {
                return seastar::make_ready_future<result_type>(bo::success());
            }
        } catch (...) {
            return seastar::current_exception_as_future<result_type>();
        }

        auto step = seastar::futurize_invoke(action);
        if (!step.available() || seastar::need_preempt()) {
            // Slow path: the action has not finished yet (or preemption was
            // requested), so the rest of the loop runs in a coroutine which
            // takes over the pending future, the condition and the action.
            return ([] (future_type pending, StopCondition stop_cond, AsyncAction action) -> seastar::future<result_type> {
                while (true) {
                    // co_await takes care of yielding, no manual maybe_yield needed
                    result_type outcome = co_await std::move(pending);
                    if (!outcome) {
                        co_return outcome;
                    }
                    if (stop_cond()) {
                        co_return bo::success();
                    }
                    pending = seastar::futurize_invoke(action);
                }
            })(std::move(step), std::move(stop_cond), std::move(action));
        }

        // Fast path: the action completed synchronously.
        if (step.failed()) {
            return step;
        }
        result_type&& outcome = step.get();
        if (!outcome) {
            return seastar::make_ready_future<result_type>(std::move(outcome));
        }
    }
}
// A version of repeat which understands results.
// Keeps invoking `action`, which must produce a future of a result holding
// a seastar::stop_iteration, until it yields stop_iteration::yes, a failed
// result or an exceptional future. The returned future carries a result
// with a void value, rebound from the action's result type.
template<typename AsyncAction>
requires requires (AsyncAction act) {
    { seastar::futurize_invoke(act) } -> ExceptionContainerResultFuture<>;
}
inline
auto result_repeat(AsyncAction&& action) noexcept {
    using future_type = seastar::futurize_t<std::invoke_result_t<AsyncAction>>;
    using result_type = typename future_type::value_type;
    static_assert(std::is_same_v<seastar::stop_iteration, typename result_type::value_type>, "bad AsyncAction signature");
    using return_result_type = rebind_result<void, result_type>;

    while (true) {
        auto step = seastar::futurize_invoke(action);
        if (!step.available() || seastar::need_preempt()) {
            // Slow path: the action has not finished yet (or preemption was
            // requested), so the rest of the loop runs in a coroutine which
            // takes over the pending future and the action.
            return ([] (future_type pending, AsyncAction action) -> seastar::future<return_result_type> {
                while (true) {
                    // co_await takes care of yielding, no manual maybe_yield needed
                    auto&& outcome = co_await std::move(pending);
                    if (!outcome) {
                        co_return std::move(outcome).as_failure();
                    }
                    if (outcome.value() == seastar::stop_iteration::yes) {
                        co_return bo::success();
                    }
                    pending = seastar::futurize_invoke(action);
                }
            })(std::move(step), std::move(action));
        }

        // Fast path: the action completed synchronously.
        if (step.failed()) {
            return seastar::make_exception_future<return_result_type>(step.get_exception());
        }
        result_type&& outcome = step.get();
        if (!outcome) {
            return seastar::make_ready_future<return_result_type>(std::move(outcome).as_failure());
        }
        if (outcome.value() == seastar::stop_iteration::yes) {
            return seastar::make_ready_future<return_result_type>(bo::success());
        }
    }
}
}