reactor: added a "pollers" abstraction

Each "poller" registers a non-blocking callback which is then called in
every iteration of a reactor's main loop.

Each "poller"'s callback returns a boolean: if TRUE then a main loop is allowed to block
(e.g. in epoll()).

If any of registered "pollers" returns FALSE then reactor's main loop is forbidded to block
in the current iteration.

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
This commit is contained in:
Vlad Zolotarov
2014-11-27 21:52:14 +02:00
committed by Avi Kivity
parent 88a1a37a88
commit 47b3721ccf
2 changed files with 52 additions and 10 deletions

View File

@@ -447,6 +447,26 @@ int reactor::run() {
_start_promise.set_value();
});
});
// Register smp queues poller
if (smp::count > 1) {
register_new_poller(
[&] {
smp::poll_queues();
if (_pending_tasks.empty()) {
_idle.store(true, std::memory_order_seq_cst);
if (smp::poll_queues() && !_pending_tasks.empty()) {
_idle.store(false, std::memory_order_relaxed);
}
}
return idle();
}
);
}
complete_timers();
while (true) {
task_quota = _task_quota;
@@ -465,17 +485,9 @@ int reactor::run() {
break;
}
smp::poll_queues();
bool blocking_allowed = poll_once();
if (_pending_tasks.empty()) {
_idle.store(true, std::memory_order_seq_cst);
if (smp::poll_queues() && !_pending_tasks.empty()) {
_idle.store(false, std::memory_order_relaxed);
}
}
wait_and_process(_pending_tasks.empty(), [this] {
wait_and_process(_pending_tasks.empty() && blocking_allowed, [this] {
if (_pending_tasks.empty()) {
_idle.store(false, std::memory_order_relaxed);
}

View File

@@ -533,6 +533,7 @@ private:
#else
reactor_backend_epoll _backend;
#endif
std::vector<std::function<bool()>> _pollers;
static constexpr size_t max_aio = 128;
promise<> _exit_promise;
future<> _exit_future;
@@ -556,6 +557,22 @@ private:
private:
void abort_on_error(int ret);
void complete_timers();
/**
* Returns TRUE if all pollers allow blocking.
*
* @return FALSE if at least one of the blockers requires a non-blocking
* execution.
*/
bool poll_once() {
bool allow_blocking = true;
for (auto c : _pollers) {
allow_blocking = c() && allow_blocking;
}
return allow_blocking;
}
public:
static boost::program_options::options_description get_options_description();
reactor();
@@ -596,6 +613,18 @@ public:
network_stack& net() { return *_network_stack; }
unsigned cpu_id() const { return _id; }
bool idle() { return _idle.load(std::memory_order_relaxed); }
/**
* Add a new "poller" - a non-blocking function returning a boolean, that
* will be called every iteration of a main loop.
* If it returns FALSE then reactor's main loop is forbidden to block in the
* current iteration.
*
* @param fn a new "poller" function to register
*/
void register_new_poller(std::function<bool()>&& fn) {
_pollers.push_back(std::move(fn));
}
private:
future<size_t> write_all_part(pollable_fd_state& fd, const void* buffer, size_t size, size_t completed);
@@ -617,6 +646,7 @@ public:
void wait_and_process(bool block, std::function<void()> &&pre_process) {
_backend.wait_and_process(block, std::move(pre_process));
}
future<> readable(pollable_fd_state& fd) {
return _backend.readable(fd);
}