diff --git a/core/future.hh b/core/future.hh index b4d79e5a70..1d124987cd 100644 --- a/core/future.hh +++ b/core/future.hh @@ -260,6 +260,8 @@ struct ready_future_marker {}; struct ready_future_from_tuple_marker {}; struct exception_future_marker {}; +extern __thread size_t future_avail_count; + template class future { promise* _promise; @@ -284,6 +286,24 @@ private: future_state* state() noexcept { return _promise ? _promise->_state : &_local_state; } + template + void schedule(Func&& func) noexcept { + struct task_with_ready_state final : task { + task_with_ready_state(Func&& func, future_state&& state) : _state(std::move(state)), _func(std::move(func)) {} + virtual void run() noexcept override { + _func(_state); + } + future_state _state; + Func _func; + }; + if (state()->available()) { + ::schedule(std::make_unique(std::move(func), std::move(*state()))); + } else { + _promise->schedule(std::move(func)); + _promise->_future = nullptr; + _promise = nullptr; + } + } public: using value_type = std::tuple; using promise_type = promise; @@ -328,7 +348,7 @@ public: template future<> then(Func&& func, std::enable_if_t, void>::value, void*> = nullptr) noexcept { - if (state()->available()) { + if (state()->available() && (++future_avail_count % 256)) { try { apply(std::move(func), std::move(state()->get())); return make_ready_future<>(); @@ -338,7 +358,8 @@ public: } promise<> pr; auto fut = pr.get_future(); - _promise->schedule([pr = std::move(pr), func = std::forward(func)] (auto& state) mutable { + + schedule([pr = std::move(pr), func = std::forward(func)] (auto& state) mutable { try { apply(std::move(func), state.get()); pr.set_value(); @@ -346,8 +367,6 @@ public: pr.set_exception(std::current_exception()); } }); - _promise->_future = nullptr; - _promise = nullptr; return fut; } @@ -366,7 +385,7 @@ public: then(Func&& func, std::enable_if_t>::value, void*> = nullptr) noexcept { using P = typename std::result_of_t::promise_type; - if (state()->available()) { + if (state()->available() && (++future_avail_count % 256)) { try { return apply(std::move(func), std::move(state()->get())); } catch (...) { @@ -377,7 +396,7 @@ public: } P pr; auto next_fut = pr.get_future(); - _promise->schedule([func = std::forward(func), pr = std::move(pr)] (auto& state) mutable { + schedule([func = std::forward(func), pr = std::move(pr)] (auto& state) mutable { try { auto result = state.get(); auto next_fut = apply(std::move(func), std::move(result)); @@ -386,8 +405,6 @@ public: pr.set_exception(std::current_exception()); } }); - _promise->_future = nullptr; - _promise = nullptr; return next_fut; } diff --git a/core/reactor.cc b/core/reactor.cc index 92daa222dd..bb730a5bc5 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -703,5 +703,6 @@ void smp::join_all() } } +__thread size_t future_avail_count = 0; thread_local reactor engine;