future: limit number of ready futures that are executed without scheduling a task
Otherwise stack may overflow if a very long chain of ready futures is executed.
This commit is contained in:
@@ -260,6 +260,8 @@ struct ready_future_marker {};
|
||||
struct ready_future_from_tuple_marker {};
|
||||
struct exception_future_marker {};
|
||||
|
||||
extern __thread size_t future_avail_count;
|
||||
|
||||
template <typename... T>
|
||||
class future {
|
||||
promise<T...>* _promise;
|
||||
@@ -284,6 +286,24 @@ private:
|
||||
future_state<T...>* state() noexcept {
|
||||
return _promise ? _promise->_state : &_local_state;
|
||||
}
|
||||
template <typename Func>
|
||||
void schedule(Func&& func) noexcept {
|
||||
struct task_with_ready_state final : task {
|
||||
task_with_ready_state(Func&& func, future_state<T...>&& state) : _state(std::move(state)), _func(std::move(func)) {}
|
||||
virtual void run() noexcept override {
|
||||
_func(_state);
|
||||
}
|
||||
future_state<T...> _state;
|
||||
Func _func;
|
||||
};
|
||||
if (state()->available()) {
|
||||
::schedule(std::make_unique<task_with_ready_state>(std::move(func), std::move(*state())));
|
||||
} else {
|
||||
_promise->schedule(std::move(func));
|
||||
_promise->_future = nullptr;
|
||||
_promise = nullptr;
|
||||
}
|
||||
}
|
||||
public:
|
||||
using value_type = std::tuple<T...>;
|
||||
using promise_type = promise<T...>;
|
||||
@@ -328,7 +348,7 @@ public:
|
||||
template <typename Func>
|
||||
future<> then(Func&& func,
|
||||
std::enable_if_t<std::is_same<std::result_of_t<Func(T&&...)>, void>::value, void*> = nullptr) noexcept {
|
||||
if (state()->available()) {
|
||||
if (state()->available() && (++future_avail_count % 256)) {
|
||||
try {
|
||||
apply(std::move(func), std::move(state()->get()));
|
||||
return make_ready_future<>();
|
||||
@@ -338,7 +358,8 @@ public:
|
||||
}
|
||||
promise<> pr;
|
||||
auto fut = pr.get_future();
|
||||
_promise->schedule([pr = std::move(pr), func = std::forward<Func>(func)] (auto& state) mutable {
|
||||
|
||||
schedule([pr = std::move(pr), func = std::forward<Func>(func)] (auto& state) mutable {
|
||||
try {
|
||||
apply(std::move(func), state.get());
|
||||
pr.set_value();
|
||||
@@ -346,8 +367,6 @@ public:
|
||||
pr.set_exception(std::current_exception());
|
||||
}
|
||||
});
|
||||
_promise->_future = nullptr;
|
||||
_promise = nullptr;
|
||||
return fut;
|
||||
}
|
||||
|
||||
@@ -366,7 +385,7 @@ public:
|
||||
then(Func&& func,
|
||||
std::enable_if_t<is_future<std::result_of_t<Func(T&&...)>>::value, void*> = nullptr) noexcept {
|
||||
using P = typename std::result_of_t<Func(T&&...)>::promise_type;
|
||||
if (state()->available()) {
|
||||
if (state()->available() && (++future_avail_count % 256)) {
|
||||
try {
|
||||
return apply(std::move(func), std::move(state()->get()));
|
||||
} catch (...) {
|
||||
@@ -377,7 +396,7 @@ public:
|
||||
}
|
||||
P pr;
|
||||
auto next_fut = pr.get_future();
|
||||
_promise->schedule([func = std::forward<Func>(func), pr = std::move(pr)] (auto& state) mutable {
|
||||
schedule([func = std::forward<Func>(func), pr = std::move(pr)] (auto& state) mutable {
|
||||
try {
|
||||
auto result = state.get();
|
||||
auto next_fut = apply(std::move(func), std::move(result));
|
||||
@@ -386,8 +405,6 @@ public:
|
||||
pr.set_exception(std::current_exception());
|
||||
}
|
||||
});
|
||||
_promise->_future = nullptr;
|
||||
_promise = nullptr;
|
||||
return next_fut;
|
||||
}
|
||||
|
||||
|
||||
@@ -703,5 +703,6 @@ void smp::join_all()
|
||||
}
|
||||
}
|
||||
|
||||
__thread size_t future_avail_count = 0;
|
||||
|
||||
thread_local reactor engine;
|
||||
|
||||
Reference in New Issue
Block a user