diff --git a/core/reactor.cc b/core/reactor.cc index 397d814ab6..2f17fa7ef5 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -447,6 +447,26 @@ int reactor::run() { _start_promise.set_value(); }); }); + + // Register smp queues poller + if (smp::count > 1) { + register_new_poller( + [&] { + smp::poll_queues(); + + if (_pending_tasks.empty()) { + _idle.store(true, std::memory_order_seq_cst); + + if (smp::poll_queues() && !_pending_tasks.empty()) { + _idle.store(false, std::memory_order_relaxed); + } + } + + return idle(); + } + ); + } + complete_timers(); while (true) { task_quota = _task_quota; @@ -465,17 +485,9 @@ int reactor::run() { break; } - smp::poll_queues(); + bool blocking_allowed = poll_once(); - if (_pending_tasks.empty()) { - _idle.store(true, std::memory_order_seq_cst); - - if (smp::poll_queues() && !_pending_tasks.empty()) { - _idle.store(false, std::memory_order_relaxed); - } - } - - wait_and_process(_pending_tasks.empty(), [this] { + wait_and_process(_pending_tasks.empty() && blocking_allowed, [this] { if (_pending_tasks.empty()) { _idle.store(false, std::memory_order_relaxed); } diff --git a/core/reactor.hh b/core/reactor.hh index 9a8972f3b1..49596bf58a 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -533,6 +533,7 @@ private: #else reactor_backend_epoll _backend; #endif + std::vector> _pollers; static constexpr size_t max_aio = 128; promise<> _exit_promise; future<> _exit_future; @@ -556,6 +557,22 @@ private: private: void abort_on_error(int ret); void complete_timers(); + + /** + * Returns TRUE if all pollers allow blocking. + * + * @return FALSE if at least one of the blockers requires a non-blocking + * execution. + */ + bool poll_once() { + bool allow_blocking = true; + for (auto c : _pollers) { + allow_blocking = c() && allow_blocking; + } + + return allow_blocking; + } + public: static boost::program_options::options_description get_options_description(); reactor(); @@ -596,6 +613,18 @@ public: network_stack& net() { return *_network_stack; } unsigned cpu_id() const { return _id; } bool idle() { return _idle.load(std::memory_order_relaxed); } + + /** + * Add a new "poller" - a non-blocking function returning a boolean, that + * will be called every iteration of a main loop. + * If it returns FALSE then reactor's main loop is forbidden to block in the + * current iteration. + * + * @param fn a new "poller" function to register + */ + void register_new_poller(std::function&& fn) { + _pollers.push_back(std::move(fn)); + } private: future write_all_part(pollable_fd_state& fd, const void* buffer, size_t size, size_t completed); @@ -617,6 +646,7 @@ public: void wait_and_process(bool block, std::function &&pre_process) { _backend.wait_and_process(block, std::move(pre_process)); } + future<> readable(pollable_fd_state& fd) { return _backend.readable(fd); }