diff --git a/core/future-util.hh b/core/future-util.hh index c094b2d916..2958360ada 100644 --- a/core/future-util.hh +++ b/core/future-util.hh @@ -70,13 +70,28 @@ future<> do_until(StopCondition&& stop_cond, AsyncAction&& action) { template static inline future<> keep_doing(AsyncAction&& action) { - try { - return action().then([action = std::forward(action)] () mutable { - return keep_doing(std::forward(action)); - }); - } catch (...) { - return make_exception_future(std::current_exception()); + while (task_quota) { + auto f = action(); + + if (!f.available()) { + return f.then([action = std::forward(action)] () mutable { + return keep_doing(std::forward(action)); + }); + } + + if (f.failed()) { + return std::move(f); + } + + --task_quota; } + + promise<> p; + auto f = p.get_future(); + schedule(make_task([action = std::forward(action), p = std::move(p)] () mutable { + keep_doing(std::forward(action)).forward_to(std::move(p)); + })); + return f; } template diff --git a/core/reactor.cc b/core/reactor.cc index 203eee726e..c4e73ef066 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -74,6 +74,7 @@ void reactor::configure(boost::program_options::variables_map vm) { ? network_stack_registry::create(sstring(vm["network-stack"].as()), vm) : network_stack_registry::create(vm); _handle_sigint = !vm.count("no-handle-interrupt"); + _task_quota = vm["task-quota"].as(); } future<> reactor_backend_epoll::get_epoll_future(pollable_fd_state& pfd, @@ -420,8 +421,9 @@ int reactor::run() { }); complete_timers(); while (true) { - unsigned loop = 0; - while (!_pending_tasks.empty() && loop++ < 200) { + task_quota = _task_quota; + while (!_pending_tasks.empty() && task_quota) { + --task_quota; auto tsk = std::move(_pending_tasks.front()); _pending_tasks.pop_front(); tsk->run(); @@ -731,6 +733,7 @@ reactor::get_options_description() { sprint("select network stack (valid values: %s)", format_separated(net_stack_names.begin(), net_stack_names.end(), ", ")).c_str()) ("no-handle-interrupt", "ignore SIGINT (for gdb)") + ("task-quota", bpo::value()->default_value(200), "Max number of tasks executed between polls and in loops") ; opts.add(network_stack_registry::options_description()); return opts; @@ -823,6 +826,7 @@ void smp::join_all() } __thread size_t future_avail_count = 0; +__thread size_t task_quota = 0; thread_local reactor engine; diff --git a/core/reactor.hh b/core/reactor.hh index 3c2e5013bb..86fe2ae4df 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -551,6 +551,7 @@ private: semaphore _io_context_available; circular_buffer> _pending_tasks; thread_pool _thread_pool; + size_t _task_quota; private: void abort_on_error(int ret); void complete_timers(); @@ -642,6 +643,7 @@ public: }; extern thread_local reactor engine; +extern __thread size_t task_quota; class smp { static std::vector _threads;