diff --git a/core/reactor.cc b/core/reactor.cc index 2c13e330dd..2ca247552c 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -77,7 +77,6 @@ reactor_backend_epoll::reactor_backend_epoll() reactor::reactor() : _backend() , _exit_future(_exit_promise.get_future()) - , _idle(false) , _cpu_started(0) , _io_context(0) , _io_context_available(max_aio) { @@ -113,7 +112,6 @@ void reactor::configure(boost::program_options::variables_map vm) { _handle_sigint = !vm.count("no-handle-interrupt"); _task_quota = vm["task-quota"].as(); - _poll = vm.count("poll"); } future<> reactor_backend_epoll::get_epoll_future(pollable_fd_state& pfd, @@ -521,14 +519,6 @@ int reactor::run() { smp_poller = poller(smp::poll_queues); } - std::function update_idle = [this] { - if (_pending_tasks.empty()) { - _idle.store(false, std::memory_order_relaxed); - } - // else, _idle already false, no need to set it again and dirty - // cache line. - }; - complete_timers(_timers, _expired_timers, [this] { return timers_completed(); }, [this] { @@ -577,19 +567,8 @@ int reactor::run() { break; } - if (!poll_once() && !_poll) { - if (_pending_tasks.empty()) { - _idle.store(true, std::memory_order_seq_cst); - - if (poll_once()) { - _idle.store(false, std::memory_order_relaxed); - } else { - assert(_pending_tasks.empty()); - } - } - } - - wait_and_process(_idle.load(std::memory_order_relaxed), update_idle); + poll_once(); + wait_and_process(); } return _return; } @@ -697,11 +676,9 @@ reactor::poller::~poller() { } void -reactor_backend_epoll::wait_and_process(bool block, std::function& pre_process) { +reactor_backend_epoll::wait_and_process() { std::array eevt; - int nr = ::epoll_wait(_epollfd.get(), eevt.data(), eevt.size(), - block ? -1 : 0); - pre_process(); + int nr = ::epoll_wait(_epollfd.get(), eevt.data(), eevt.size(), 0); if (nr == -1 && errno == EINTR) { return; // gdb can cause this } @@ -710,10 +687,6 @@ reactor_backend_epoll::wait_and_process(bool block, std::function& pre_p auto& evt = eevt[i]; auto pfd = reinterpret_cast(evt.data.ptr); auto events = evt.events & (EPOLLIN | EPOLLOUT); - // FIXME: it is enough to check that pfd's task is not in _pending_tasks here - if (block) { - pfd->events_known |= events; - } auto events_to_remove = events & ~pfd->events_requested; complete_epoll_event(*pfd, &pollable_fd_state::pollin, events, EPOLLIN); complete_epoll_event(*pfd, &pollable_fd_state::pollout, events, EPOLLOUT); @@ -750,23 +723,9 @@ void syscall_work_queue::complete() { smp_message_queue::smp_message_queue() : _pending() , _completed() - , _start_event(engine.make_reactor_notifier()) - , _complete_event(engine.make_reactor_notifier()) { } -void smp_message_queue::submit_kick() { - if (!_complete_peer->_poll && _pending_peer->idle()) { - _start_event->signal(); - } -} - -void smp_message_queue::complete_kick() { - if (!_pending_peer->_poll && _complete_peer->idle()) { - _complete_event->signal(); - } -} - void smp_message_queue::move_pending() { auto queue_room = queue_length - _current_queue_length; auto nr = std::min(queue_room, _tx.a.pending_fifo.size()); @@ -778,7 +737,6 @@ void smp_message_queue::move_pending() { _pending.push(begin, end); _tx.a.pending_fifo.erase(begin, end); _current_queue_length += nr; - submit_kick(); } void smp_message_queue::submit_item(smp_message_queue::work_item* item) { @@ -798,7 +756,6 @@ void smp_message_queue::respond(work_item* item) { void smp_message_queue::flush_response_batch() { _completed.push(_completed_fifo.begin(), _completed_fifo.end()); _completed_fifo.clear(); - complete_kick(); } size_t smp_message_queue::process_completions() { @@ -820,13 +777,6 @@ void smp_message_queue::flush_request_batch() { move_pending(); } -void smp_message_queue::complete() { - _complete_event->wait().then([this] { - process_completions(); - complete(); - }); -} - size_t smp_message_queue::process_incoming() { work_item* items[queue_length]; auto nr = _pending.pop(items); @@ -842,14 +792,6 @@ size_t smp_message_queue::process_incoming() { void smp_message_queue::start() { _tx.init(); _complete_peer = &engine; - complete(); -} - -void smp_message_queue::listen() { - _start_event->wait().then([this] () mutable { - process_incoming(); - listen(); - }); } /* not yet implemented for OSv. TODO: do the notification like we do class smp. */ @@ -964,7 +906,6 @@ reactor::get_options_description() { format_separated(net_stack_names.begin(), net_stack_names.end(), ", ")).c_str()) ("no-handle-interrupt", "ignore SIGINT (for gdb)") ("task-quota", bpo::value()->default_value(200), "Max number of tasks executed between polls and in loops") - ("poll", "never sleep, poll for the next event") ; opts.add(network_stack_registry::options_description()); return opts; @@ -994,7 +935,6 @@ void smp::listen_all(smp_message_queue* qs) { for (unsigned i = 0; i < smp::count; i++) { qs[i]._pending_peer = &engine; - qs[i].listen(); } } @@ -1194,11 +1134,7 @@ reactor_backend_osv::reactor_backend_osv() { } void -reactor_backend_osv::wait_and_process(bool block, std::function& pre_process) { - if (block) { - _poller.wait(); - } - pre_process(); +reactor_backend_osv::wait_and_process() { _poller.process(); // osv::poller::process runs pollable's callbacks, but does not currently // have a timer expiration callback - instead if gives us an expired() diff --git a/core/reactor.hh b/core/reactor.hh index 50a1ec7dec..872f4ea07f 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -385,8 +385,6 @@ class smp_message_queue { boost::lockfree::capacity>; lf_queue _pending; lf_queue _completed; - std::unique_ptr _start_event; - std::unique_ptr _complete_event; size_t _current_queue_length = 0; reactor* _pending_peer; reactor* _complete_peer; @@ -448,16 +446,12 @@ public: return fut; } void start(); - void listen(); size_t process_incoming(); size_t process_completions(); private: void work(); - void complete(); void submit_item(work_item* wi); void respond(work_item* wi); - void submit_kick(); - void complete_kick(); void move_pending(); void flush_request_batch(); void flush_response_batch(); @@ -499,8 +493,7 @@ public: // and just processes events that have already happened, if any. // After the optional wait, just before processing the events, the // pre_process() function is called. - virtual void wait_and_process(bool block, - std::function& pre_process) = 0; + virtual void wait_and_process() = 0; // Methods that allow polling on file descriptors. This will only work on // reactor_backend_epoll. Other reactor_backend will probably abort if // they are called (which is fine if no file descriptors are waited on): @@ -529,7 +522,7 @@ private: public: reactor_backend_epoll(); virtual ~reactor_backend_epoll() override { } - virtual void wait_and_process(bool block, std::function& pre_process) override; + virtual void wait_and_process() override; virtual future<> readable(pollable_fd_state& fd) override; virtual future<> writeable(pollable_fd_state& fd) override; virtual void forget(pollable_fd_state& fd) override; @@ -551,8 +544,7 @@ private: public: reactor_backend_osv(); virtual ~reactor_backend_osv() override { } - virtual void wait_and_process(bool block, - std::function &&pre_process) override; + virtual void wait_and_process() override; virtual future<> readable(pollable_fd_state& fd) override; virtual future<> writeable(pollable_fd_state& fd) override; virtual void forget(pollable_fd_state& fd) override; @@ -580,11 +572,9 @@ private: static constexpr size_t max_aio = 128; promise<> _exit_promise; future<> _exit_future; - std::atomic _idle; unsigned _id = 0; bool _stopped = false; bool _handle_sigint = true; - bool _poll = false; promise> _network_stack_ready_promise; int _return = 0; timer_t _timer; @@ -681,14 +671,6 @@ public: network_stack& net() { return *_network_stack; } unsigned cpu_id() const { return _id; } - bool idle() { - if (_poll) { - return false; - } else { - std::atomic_thread_fence(std::memory_order_seq_cst); - return _idle.load(std::memory_order_relaxed); - } - } private: /** @@ -726,8 +708,8 @@ private: friend class smp_message_queue; friend class poller; public: - void wait_and_process(bool block, std::function& pre_process) { - _backend.wait_and_process(block, pre_process); + void wait_and_process() { + _backend.wait_and_process(); } future<> readable(pollable_fd_state& fd) {