/* * Copyright 2022-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #pragma once #include #include #include #include #include #include #include "utils/result.hh" namespace utils { namespace internal { template inline size_t iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, IteratorCategory category) { // Return 0 in general case return 0; } template inline size_t iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, std::forward_iterator_tag category) { // May require linear scan, but it's better than reallocation return std::distance(begin, end); } } // A version of parallel_for_each which understands results. // In case of a failure, it returns one of the failed results. template requires ExceptionContainerResult && std::is_void_v && requires (Func f, Iterator i) { { f(*i++) } -> std::same_as>; } inline seastar::future result_parallel_for_each(Iterator begin, Iterator end, Func&& func) noexcept { std::vector> futs; while (begin != end) { auto f = seastar::futurize_invoke(std::forward(func), *begin++); seastar::memory::scoped_critical_alloc_section _; if (f.available() && !f.failed()) { auto res = f.get(); if (res) { continue; } f = seastar::make_ready_future(std::move(res)); } if (futs.empty()) { using itraits = std::iterator_traits; auto n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category()) + 1); futs.reserve(n); } futs.push_back(std::move(f)); } if (futs.empty()) { return seastar::make_ready_future(bo::success()); } // Use a coroutine so that the waiting task is allocated only once return ([] (std::vector> futs) noexcept -> seastar::future { using error_type = typename R::error_type; std::variant result_state; while (!futs.empty()) { // TODO: Avoid try..catching here if seastar coroutines allow that // Or not? Explicit checks might be slower on the happy path try { auto res = co_await std::move(futs.back()); if (!res) { result_state = std::move(res).assume_error(); } } catch (...) { result_state = std::current_exception(); } futs.pop_back(); } if (std::holds_alternative(result_state)) { co_return bo::success(); } else if (auto* error = std::get_if(&result_state)) { co_return std::move(*error); } else { co_return seastar::coroutine::exception( std::get(std::move(result_state))); } })(std::move(futs)); } // A version of parallel_for_each which understands results. // In case of a failure, it returns one of the failed results. template requires ExceptionContainerResult && std::is_void_v inline seastar::future result_parallel_for_each(Range&& range, Func&& func) { return result_parallel_for_each(std::begin(range), std::end(range), std::forward(func)); } namespace internal { template struct result_reducer_traits { using result_type = bo::result; static seastar::future maybe_call_get(Reducer&& r) { return seastar::make_ready_future(bo::success()); } }; template requires requires (Reducer r) { { r.get() }; } struct result_reducer_traits { using original_type = seastar::futurize_t().get())>; using result_type = bo::result; static seastar::future maybe_call_get(Reducer&& r) { auto x = r.get(); if constexpr (seastar::Future) { return x.then([] (auto&& v) { return seastar::make_ready_future(bo::success(std::move(v))); }); } else { return seastar::make_ready_future(bo::success(std::move(x))); } } }; template struct result_map_reduce_unary_adapter { private: // We could in theory just use result here and turn it into // a failed result on receiving an error, however that would destroy // the Reducer and some map_reduce usages may assume that the reducer // is alive until all mapper calls finish (e.g. it may hold a smart pointer // to some memory used by mappers), therefore it is safer to keep it // separate from the error and only destroy it when the wrapper // is destroyed. Reducer _reducer; ExCont _excont; using reducer_traits = result_reducer_traits; public: result_map_reduce_unary_adapter(Reducer&& reducer) : _reducer(std::forward(reducer)) { } template seastar::future<> operator()(Arg&& arg) { if (!_excont && arg) { return seastar::futurize_invoke(_reducer, std::move(arg).assume_value()); } if (_excont) { // We already got an error before, so ignore the new one return seastar::make_ready_future<>(); } // `arg` must be a failed result _excont = std::move(arg).assume_error(); return seastar::make_ready_future<>(); } seastar::future get() { if (_excont) { return seastar::make_ready_future(bo::failure(std::move(_excont))); } return reducer_traits::maybe_call_get(std::move(_reducer)); } }; } template requires requires (Iterator i, Mapper mapper, Reducer reduce) { *i++; { i != i } -> std::convertible_to; { mapper(*i) } -> ExceptionContainerResultFuture<>; { seastar::futurize_invoke(reduce, seastar::futurize_invoke(mapper, *i).get().value()) } -> std::same_as>; } inline auto result_map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& reducer) { using result_type = std::remove_reference_t; using exception_container_type = typename result_type::error_type; using adapter_type = internal::result_map_reduce_unary_adapter; return seastar::map_reduce( std::move(begin), std::move(end), std::forward(mapper), adapter_type(std::forward(reducer))); } namespace internal { template requires ResultRebindableTo struct result_map_reduce_binary_adapter { private: using left_value_type = typename Left::value_type; using right_value_type = typename Right::value_type; using return_type = std::invoke_result; Reducer _reducer; public: result_map_reduce_binary_adapter(Reducer&& reducer) : _reducer(std::forward(reducer)) { } Left operator()(Left&& left, Right&& right) { if (!left) { return std::move(left); } if (!right) { return std::move(right).as_failure(); } return _reducer(std::move(left).assume_value(), std::move(right).assume_value()); } }; } template requires requires (Iterator i, Mapper mapper, Initial initial, Reducer reduce) { *i++; { i != i } -> std::convertible_to; { mapper(*i) } -> ExceptionContainerResultFuture<>; { reduce(std::move(initial), mapper(*i).get().value()) } -> std::convertible_to>>; } inline auto result_map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reducer reduce) -> seastar::future>> { using right_type = std::remove_reference_t; using left_type = rebind_result; return seastar::map_reduce( std::move(begin), std::move(end), std::forward(mapper), left_type(std::move(initial)), internal::result_map_reduce_binary_adapter(std::move(reduce))); } template requires requires (StopCondition cond, AsyncAction act) { { cond() } -> std::same_as; { seastar::futurize_invoke(act) } -> ExceptionContainerResultFuture<>; } inline auto result_do_until(StopCondition stop_cond, AsyncAction action) { // TODO: Constrain the result of act() better using future_type = seastar::futurize_t>; using result_type = typename future_type::value_type; static_assert(std::is_void_v, "The result type of the action must be future>"); for (;;) { try { if (stop_cond()) { return seastar::make_ready_future(bo::success()); } } catch (...) { return seastar::current_exception_as_future(); } auto f = seastar::futurize_invoke(action); if (f.available() && !seastar::need_preempt()) { if (f.failed()) { return f; } result_type&& res = f.get(); if (!res) { return seastar::make_ready_future(std::move(res)); } } else { return ([] (future_type f, StopCondition stop_cond, AsyncAction action) -> seastar::future { for (;;) { // No need to manually maybe_yield because co_await does that for us result_type res = co_await std::move(f); if (!res) { co_return res; } if (stop_cond()) { co_return bo::success(); } f = seastar::futurize_invoke(action); } })(std::move(f), std::move(stop_cond), std::move(action)); } } } template requires requires (AsyncAction act) { { seastar::futurize_invoke(act) } -> ExceptionContainerResultFuture<>; } inline auto result_repeat(AsyncAction&& action) noexcept { using future_type = seastar::futurize_t>; using result_type = typename future_type::value_type; static_assert(std::is_same_v, "bad AsyncAction signature"); using return_result_type = rebind_result; for (;;) { auto f = seastar::futurize_invoke(action); if (f.available() && !seastar::need_preempt()) { if (f.failed()) { return seastar::make_exception_future(f.get_exception()); } result_type&& res = f.get(); if (!res) { return seastar::make_ready_future(std::move(res).as_failure()); } if (res.value() == seastar::stop_iteration::yes) { return seastar::make_ready_future(bo::success()); } } else { return ([] (future_type f, AsyncAction action) -> seastar::future { for (;;) { // No need to manually maybe_yield because co_await does that for us auto&& res = co_await std::move(f); if (!res) { co_return std::move(res).as_failure(); } if (res.value() == seastar::stop_iteration::yes) { co_return bo::success(); } f = seastar::futurize_invoke(action); } })(std::move(f), std::move(action)); } } } }