From 770214ea63378c805487fe91ddd65f22da141194 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 28 Jan 2015 11:07:14 +0200 Subject: [PATCH 1/2] reactor: simplify signal handling Signals can be repetitive, and therefore are not a good match for promise/future, which are single shot. Replace with plain callbacks. --- core/reactor.cc | 36 ++++++++++++++---------------------- core/reactor.hh | 6 +++--- 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index 8014d227c4..8d167296e1 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -112,12 +112,10 @@ reactor::reactor() sev.sigev_signo = SIGALRM; r = timer_create(CLOCK_REALTIME, &sev, &_timer); assert(r >= 0); - keep_doing([this] { - return receive_signal(SIGALRM).then([this] { - _timer_promise.set_value(); - _timer_promise = promise<>(); - }); - }).or_terminate(); + handle_signal(SIGALRM, [this] { + _timer_promise.set_value(); + _timer_promise = promise<>(); + }); memory::set_reclaim_hook([this] (std::function reclaim_fn) { // push it in the front of the queue so we reclaim memory quickly _pending_tasks.push_front(make_task([fn = std::move(reclaim_fn)] { @@ -547,14 +545,10 @@ void reactor::exit(int ret) { smp::submit_to(0, [this, ret] { _return = ret; stop(); }); } -future<> -reactor::receive_signal(int signo) { - auto i = _signal_handlers.find(signo); - if (i == _signal_handlers.end()) { - i = _signal_handlers.emplace(signo, signo).first; - } - signal_handler& sh = i->second; - return sh._promise.get_future(); +void +reactor::handle_signal(int signo, std::function&& handler) { + _signal_handlers.emplace(std::piecewise_construct, + std::make_tuple(signo), std::make_tuple(signo, std::move(handler))); } void sigaction(int signo, siginfo_t* siginfo, void* ignore) { @@ -567,15 +561,15 @@ bool reactor::poll_signal() { _pending_signals.fetch_and(~signals, std::memory_order_relaxed); for (size_t i = 0; i < sizeof(signals)*8; i++) { if (signals & (1ull << i)) { - _signal_handlers.at(i)._promise.set_value(); - _signal_handlers.at(i)._promise = promise<>(); + _signal_handlers.at(i)._handler(); } } } return signals; } -reactor::signal_handler::signal_handler(int signo) { +reactor::signal_handler::signal_handler(int signo, std::function&& handler) + : _handler(std::move(handler)) { auto mask = make_sigset_mask(signo); auto r = ::sigprocmask(SIG_UNBLOCK, &mask, NULL); throw_system_error_on(r == -1); @@ -677,9 +671,9 @@ int reactor::run() { if (_id == 0) { if (_handle_sigint) { - receive_signal(SIGINT).then([this] { stop(); }); + handle_signal(SIGINT, [this] { stop(); }); } - receive_signal(SIGTERM).then([this] { stop(); }); + handle_signal(SIGTERM, [this] { stop(); }); } _cpu_started.wait(smp::count).then([this] { @@ -1075,9 +1069,7 @@ void smp_message_queue::start(unsigned cpuid) { /* not yet implemented for OSv. TODO: do the notification like we do class smp. */ #ifndef HAVE_OSV thread_pool::thread_pool() : _worker_thread([this] { work(); }), _notify(pthread_self()) { - keep_doing([this] { - return engine().receive_signal(SIGUSR1).then([this] { inter_thread_wq.complete(); }); - }); + engine().handle_signal(SIGUSR1, [this] { inter_thread_wq.complete(); }); } void thread_pool::work() { diff --git a/core/reactor.hh b/core/reactor.hh index 855add3a58..89f359ed3e 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -652,8 +652,8 @@ private: static std::unique_ptr make_pollfn(Func&& func); struct signal_handler { - signal_handler(int signo); - promise<> _promise; + signal_handler(int signo, std::function&& handler); + std::function _handler; }; std::atomic _pending_signals; std::unordered_map _signal_handlers; @@ -707,7 +707,7 @@ public: template future submit_io(Func prepare_io); - future<> receive_signal(int signo); + void handle_signal(int signo, std::function&& handler); int run(); void exit(int ret); From 38839293f92bb2b9922cc24085ca10a384a8abbd Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 28 Jan 2015 11:29:16 +0200 Subject: [PATCH 2/2] reactor: simplify timer handling Instead of scheduling timer processing to happen in the future, process timers in the context of the poller (signal poller for high resolution timer, lowres time poller for low resolution timers). This both reduces timer latency (not very important) but removes a use of promise/future in a repetitive task, which is error prone. --- core/reactor.cc | 32 +++++++++++------------------- core/reactor.hh | 52 ++++++++++++++++++------------------------------- 2 files changed, 30 insertions(+), 54 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index 8d167296e1..4c61f6053d 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -112,10 +112,6 @@ reactor::reactor() sev.sigev_signo = SIGALRM; r = timer_create(CLOCK_REALTIME, &sev, &_timer); assert(r >= 0); - handle_signal(SIGALRM, [this] { - _timer_promise.set_value(); - _timer_promise = promise<>(); - }); memory::set_reclaim_hook([this] (std::function reclaim_fn) { // push it in the front of the queue so we reclaim memory quickly _pending_tasks.push_front(make_task([fn = std::move(reclaim_fn)] { @@ -696,24 +692,13 @@ int reactor::run() { smp_poller = poller(smp::poll_queues); } - complete_timers(_timers, _expired_timers, - [this] { return timers_completed(); }, - [this] { + handle_signal(SIGALRM, [this] { + complete_timers(_timers, _expired_timers, [this] { if (!_timers.empty()) { enable_timer(_timers.get_next_timeout()); } - } - ); - complete_timers(_lowres_timers, _expired_lowres_timers, - [this] { return lowres_timers_completed(); }, - [this] { - if (!_lowres_timers.empty()) { - _lowres_next_timeout = _lowres_timers.get_next_timeout(); - } else { - _lowres_next_timeout = lowres_clock::time_point(); - } - } - ); + }); + }); poller drain_cross_cpu_freelist([] { return memory::drain_cross_cpu_freelist(); @@ -725,8 +710,13 @@ int reactor::run() { } auto now = lowres_clock::now(); if (now > _lowres_next_timeout) { - _lowres_timer_promise.set_value(); - _lowres_timer_promise = promise<>(); + complete_timers(_lowres_timers, _expired_lowres_timers, [this] { + if (!_lowres_timers.empty()) { + _lowres_next_timeout = _lowres_timers.get_next_timeout(); + } else { + _lowres_next_timeout = lowres_clock::time_point(); + } + }); return true; } return false; diff --git a/core/reactor.hh b/core/reactor.hh index 89f359ed3e..283af88c0a 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -630,16 +630,14 @@ private: // _lowres_clock will only be created on cpu 0 std::unique_ptr _lowres_clock; lowres_clock::time_point _lowres_next_timeout; - promise<> _lowres_timer_promise; - promise<> _timer_promise; std::experimental::optional _epoll_poller; const bool _reuseport; circular_buffer _loads; double _load = 0; private: void abort_on_error(int ret); - template - void complete_timers(T&, E&, std::function ()>, std::function); + template + void complete_timers(T&, E&, EnableFunc&& enable_fn); /** * Returns TRUE if all pollers allow blocking. @@ -788,12 +786,6 @@ public: return _backend.notified(n); } void enable_timer(clock_type::time_point when); - future<> timers_completed() { - return _timer_promise.get_future(); - } - future<> lowres_timers_completed() { - return _lowres_timer_promise.get_future(); - } std::unique_ptr make_reactor_notifier() { return _backend.make_reactor_notifier(); } @@ -1133,31 +1125,25 @@ reactor::write_all(pollable_fd_state& fd, const void* buffer, size_t len) { return write_all_part(fd, buffer, len, 0); } -template -void reactor::complete_timers(T& timers, E& expired_timers, - std::function ()> completed_fn, - std::function enable_fn) { - completed_fn().then([this, &timers, &expired_timers, completed_fn, - enable_fn = std::move(enable_fn)] () mutable { - expired_timers = timers.expire(timers.now()); - for (auto& t : expired_timers) { - t._expired = true; - } - while (!expired_timers.empty()) { - auto t = &*expired_timers.begin(); - expired_timers.pop_front(); - t->_queued = false; - if (t->_armed) { - t->_armed = false; - if (t->_period) { - t->arm_periodic(*t->_period); - } - t->_callback(); +template +void reactor::complete_timers(T& timers, E& expired_timers, EnableFunc&& enable_fn) { + expired_timers = timers.expire(timers.now()); + for (auto& t : expired_timers) { + t._expired = true; + } + while (!expired_timers.empty()) { + auto t = &*expired_timers.begin(); + expired_timers.pop_front(); + t->_queued = false; + if (t->_armed) { + t->_armed = false; + if (t->_period) { + t->arm_periodic(*t->_period); } + t->_callback(); } - enable_fn(); - complete_timers(timers, expired_timers, std::move(completed_fn), std::move(enable_fn)); - }); + } + enable_fn(); } template