reactor: remove non polling mode

This commit is contained in:
Gleb Natapov
2014-12-28 12:39:51 +02:00
parent 7d3fb282c5
commit 889cc69a28
2 changed files with 10 additions and 92 deletions

View File

@@ -77,7 +77,6 @@ reactor_backend_epoll::reactor_backend_epoll()
reactor::reactor()
: _backend()
, _exit_future(_exit_promise.get_future())
, _idle(false)
, _cpu_started(0)
, _io_context(0)
, _io_context_available(max_aio) {
@@ -113,7 +112,6 @@ void reactor::configure(boost::program_options::variables_map vm) {
_handle_sigint = !vm.count("no-handle-interrupt");
_task_quota = vm["task-quota"].as<int>();
_poll = vm.count("poll");
}
future<> reactor_backend_epoll::get_epoll_future(pollable_fd_state& pfd,
@@ -521,14 +519,6 @@ int reactor::run() {
smp_poller = poller(smp::poll_queues);
}
std::function<void ()> update_idle = [this] {
if (_pending_tasks.empty()) {
_idle.store(false, std::memory_order_relaxed);
}
// else, _idle already false, no need to set it again and dirty
// cache line.
};
complete_timers(_timers, _expired_timers,
[this] { return timers_completed(); },
[this] {
@@ -577,19 +567,8 @@ int reactor::run() {
break;
}
if (!poll_once() && !_poll) {
if (_pending_tasks.empty()) {
_idle.store(true, std::memory_order_seq_cst);
if (poll_once()) {
_idle.store(false, std::memory_order_relaxed);
} else {
assert(_pending_tasks.empty());
}
}
}
wait_and_process(_idle.load(std::memory_order_relaxed), update_idle);
poll_once();
wait_and_process();
}
return _return;
}
@@ -697,11 +676,9 @@ reactor::poller::~poller() {
}
void
reactor_backend_epoll::wait_and_process(bool block, std::function<void()>& pre_process) {
reactor_backend_epoll::wait_and_process() {
std::array<epoll_event, 128> eevt;
int nr = ::epoll_wait(_epollfd.get(), eevt.data(), eevt.size(),
block ? -1 : 0);
pre_process();
int nr = ::epoll_wait(_epollfd.get(), eevt.data(), eevt.size(), 0);
if (nr == -1 && errno == EINTR) {
return; // gdb can cause this
}
@@ -710,10 +687,6 @@ reactor_backend_epoll::wait_and_process(bool block, std::function<void()>& pre_p
auto& evt = eevt[i];
auto pfd = reinterpret_cast<pollable_fd_state*>(evt.data.ptr);
auto events = evt.events & (EPOLLIN | EPOLLOUT);
// FIXME: it is enough to check that pfd's task is not in _pending_tasks here
if (block) {
pfd->events_known |= events;
}
auto events_to_remove = events & ~pfd->events_requested;
complete_epoll_event(*pfd, &pollable_fd_state::pollin, events, EPOLLIN);
complete_epoll_event(*pfd, &pollable_fd_state::pollout, events, EPOLLOUT);
@@ -750,23 +723,9 @@ void syscall_work_queue::complete() {
smp_message_queue::smp_message_queue()
: _pending()
, _completed()
, _start_event(engine.make_reactor_notifier())
, _complete_event(engine.make_reactor_notifier())
{
}
void smp_message_queue::submit_kick() {
if (!_complete_peer->_poll && _pending_peer->idle()) {
_start_event->signal();
}
}
void smp_message_queue::complete_kick() {
if (!_pending_peer->_poll && _complete_peer->idle()) {
_complete_event->signal();
}
}
void smp_message_queue::move_pending() {
auto queue_room = queue_length - _current_queue_length;
auto nr = std::min(queue_room, _tx.a.pending_fifo.size());
@@ -778,7 +737,6 @@ void smp_message_queue::move_pending() {
_pending.push(begin, end);
_tx.a.pending_fifo.erase(begin, end);
_current_queue_length += nr;
submit_kick();
}
void smp_message_queue::submit_item(smp_message_queue::work_item* item) {
@@ -798,7 +756,6 @@ void smp_message_queue::respond(work_item* item) {
void smp_message_queue::flush_response_batch() {
_completed.push(_completed_fifo.begin(), _completed_fifo.end());
_completed_fifo.clear();
complete_kick();
}
size_t smp_message_queue::process_completions() {
@@ -820,13 +777,6 @@ void smp_message_queue::flush_request_batch() {
move_pending();
}
void smp_message_queue::complete() {
_complete_event->wait().then([this] {
process_completions();
complete();
});
}
size_t smp_message_queue::process_incoming() {
work_item* items[queue_length];
auto nr = _pending.pop(items);
@@ -842,14 +792,6 @@ size_t smp_message_queue::process_incoming() {
void smp_message_queue::start() {
_tx.init();
_complete_peer = &engine;
complete();
}
void smp_message_queue::listen() {
_start_event->wait().then([this] () mutable {
process_incoming();
listen();
});
}
/* not yet implemented for OSv. TODO: do the notification like we do class smp. */
@@ -964,7 +906,6 @@ reactor::get_options_description() {
format_separated(net_stack_names.begin(), net_stack_names.end(), ", ")).c_str())
("no-handle-interrupt", "ignore SIGINT (for gdb)")
("task-quota", bpo::value<int>()->default_value(200), "Max number of tasks executed between polls and in loops")
("poll", "never sleep, poll for the next event")
;
opts.add(network_stack_registry::options_description());
return opts;
@@ -994,7 +935,6 @@ void smp::listen_all(smp_message_queue* qs)
{
for (unsigned i = 0; i < smp::count; i++) {
qs[i]._pending_peer = &engine;
qs[i].listen();
}
}
@@ -1194,11 +1134,7 @@ reactor_backend_osv::reactor_backend_osv() {
}
void
reactor_backend_osv::wait_and_process(bool block, std::function<void()>& pre_process) {
if (block) {
_poller.wait();
}
pre_process();
reactor_backend_osv::wait_and_process() {
_poller.process();
// osv::poller::process runs pollable's callbacks, but does not currently
// have a timer expiration callback - instead if gives us an expired()

View File

@@ -385,8 +385,6 @@ class smp_message_queue {
boost::lockfree::capacity<queue_length>>;
lf_queue _pending;
lf_queue _completed;
std::unique_ptr<reactor_notifier> _start_event;
std::unique_ptr<reactor_notifier> _complete_event;
size_t _current_queue_length = 0;
reactor* _pending_peer;
reactor* _complete_peer;
@@ -448,16 +446,12 @@ public:
return fut;
}
void start();
void listen();
size_t process_incoming();
size_t process_completions();
private:
void work();
void complete();
void submit_item(work_item* wi);
void respond(work_item* wi);
void submit_kick();
void complete_kick();
void move_pending();
void flush_request_batch();
void flush_response_batch();
@@ -499,8 +493,7 @@ public:
// and just processes events that have already happened, if any.
// After the optional wait, just before processing the events, the
// pre_process() function is called.
virtual void wait_and_process(bool block,
std::function<void()>& pre_process) = 0;
virtual void wait_and_process() = 0;
// Methods that allow polling on file descriptors. This will only work on
// reactor_backend_epoll. Other reactor_backend will probably abort if
// they are called (which is fine if no file descriptors are waited on):
@@ -529,7 +522,7 @@ private:
public:
reactor_backend_epoll();
virtual ~reactor_backend_epoll() override { }
virtual void wait_and_process(bool block, std::function<void()>& pre_process) override;
virtual void wait_and_process() override;
virtual future<> readable(pollable_fd_state& fd) override;
virtual future<> writeable(pollable_fd_state& fd) override;
virtual void forget(pollable_fd_state& fd) override;
@@ -551,8 +544,7 @@ private:
public:
reactor_backend_osv();
virtual ~reactor_backend_osv() override { }
virtual void wait_and_process(bool block,
std::function<void()> &&pre_process) override;
virtual void wait_and_process() override;
virtual future<> readable(pollable_fd_state& fd) override;
virtual future<> writeable(pollable_fd_state& fd) override;
virtual void forget(pollable_fd_state& fd) override;
@@ -580,11 +572,9 @@ private:
static constexpr size_t max_aio = 128;
promise<> _exit_promise;
future<> _exit_future;
std::atomic<bool> _idle;
unsigned _id = 0;
bool _stopped = false;
bool _handle_sigint = true;
bool _poll = false;
promise<std::unique_ptr<network_stack>> _network_stack_ready_promise;
int _return = 0;
timer_t _timer;
@@ -681,14 +671,6 @@ public:
network_stack& net() { return *_network_stack; }
unsigned cpu_id() const { return _id; }
bool idle() {
if (_poll) {
return false;
} else {
std::atomic_thread_fence(std::memory_order_seq_cst);
return _idle.load(std::memory_order_relaxed);
}
}
private:
/**
@@ -726,8 +708,8 @@ private:
friend class smp_message_queue;
friend class poller;
public:
void wait_and_process(bool block, std::function<void()>& pre_process) {
_backend.wait_and_process(block, pre_process);
void wait_and_process() {
_backend.wait_and_process();
}
future<> readable(pollable_fd_state& fd) {