diff --git a/core/reactor.cc b/core/reactor.cc index dff8eb758d..a7b7eda265 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -111,12 +111,6 @@ reactor::reactor() sev.sigev_signo = SIGALRM; r = timer_create(CLOCK_REALTIME, &sev, &_timer); assert(r >= 0); - keep_doing([this] { - return receive_signal(SIGALRM).then([this] { - _timer_promise.set_value(); - _timer_promise = promise<>(); - }); - }).or_terminate(); memory::set_reclaim_hook([this] (std::function reclaim_fn) { // push it in the front of the queue so we reclaim memory quickly _pending_tasks.push_front(make_task([fn = std::move(reclaim_fn)] { @@ -546,14 +540,10 @@ void reactor::exit(int ret) { smp::submit_to(0, [this, ret] { _return = ret; stop(); }); } -future<> -reactor::receive_signal(int signo) { - auto i = _signal_handlers.find(signo); - if (i == _signal_handlers.end()) { - i = _signal_handlers.emplace(signo, signo).first; - } - signal_handler& sh = i->second; - return sh._promise.get_future(); +void +reactor::handle_signal(int signo, std::function&& handler) { + _signal_handlers.emplace(std::piecewise_construct, + std::make_tuple(signo), std::make_tuple(signo, std::move(handler))); } void sigaction(int signo, siginfo_t* siginfo, void* ignore) { @@ -566,15 +556,15 @@ bool reactor::poll_signal() { _pending_signals.fetch_and(~signals, std::memory_order_relaxed); for (size_t i = 0; i < sizeof(signals)*8; i++) { if (signals & (1ull << i)) { - _signal_handlers.at(i)._promise.set_value(); - _signal_handlers.at(i)._promise = promise<>(); + _signal_handlers.at(i)._handler(); } } } return signals; } -reactor::signal_handler::signal_handler(int signo) { +reactor::signal_handler::signal_handler(int signo, std::function&& handler) + : _handler(std::move(handler)) { auto mask = make_sigset_mask(signo); auto r = ::sigprocmask(SIG_UNBLOCK, &mask, NULL); throw_system_error_on(r == -1); @@ -676,9 +666,9 @@ int reactor::run() { if (_id == 0) { if (_handle_sigint) { - receive_signal(SIGINT).then([this] { stop(); }); + handle_signal(SIGINT, [this] { stop(); }); } - receive_signal(SIGTERM).then([this] { stop(); }); + handle_signal(SIGTERM, [this] { stop(); }); } _cpu_started.wait(smp::count).then([this] { @@ -701,24 +691,13 @@ int reactor::run() { smp_poller = poller(smp::poll_queues); } - complete_timers(_timers, _expired_timers, - [this] { return timers_completed(); }, - [this] { + handle_signal(SIGALRM, [this] { + complete_timers(_timers, _expired_timers, [this] { if (!_timers.empty()) { enable_timer(_timers.get_next_timeout()); } - } - ); - complete_timers(_lowres_timers, _expired_lowres_timers, - [this] { return lowres_timers_completed(); }, - [this] { - if (!_lowres_timers.empty()) { - _lowres_next_timeout = _lowres_timers.get_next_timeout(); - } else { - _lowres_next_timeout = lowres_clock::time_point(); - } - } - ); + }); + }); poller drain_cross_cpu_freelist([] { return memory::drain_cross_cpu_freelist(); @@ -730,8 +709,13 @@ int reactor::run() { } auto now = lowres_clock::now(); if (now > _lowres_next_timeout) { - _lowres_timer_promise.set_value(); - _lowres_timer_promise = promise<>(); + complete_timers(_lowres_timers, _expired_lowres_timers, [this] { + if (!_lowres_timers.empty()) { + _lowres_next_timeout = _lowres_timers.get_next_timeout(); + } else { + _lowres_next_timeout = lowres_clock::time_point(); + } + }); return true; } return false; @@ -1074,9 +1058,7 @@ void smp_message_queue::start(unsigned cpuid) { /* not yet implemented for OSv. TODO: do the notification like we do class smp. */ #ifndef HAVE_OSV thread_pool::thread_pool() : _worker_thread([this] { work(); }), _notify(pthread_self()) { - keep_doing([this] { - return engine().receive_signal(SIGUSR1).then([this] { inter_thread_wq.complete(); }); - }); + engine().handle_signal(SIGUSR1, [this] { inter_thread_wq.complete(); }); } void thread_pool::work() { diff --git a/core/reactor.hh b/core/reactor.hh index ee427f612d..9ba266afa4 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -630,16 +630,14 @@ private: // _lowres_clock will only be created on cpu 0 std::unique_ptr _lowres_clock; lowres_clock::time_point _lowres_next_timeout; - promise<> _lowres_timer_promise; - promise<> _timer_promise; std::experimental::optional _epoll_poller; const bool _reuseport; circular_buffer _loads; double _load = 0; private: void abort_on_error(int ret); - template - void complete_timers(T&, E&, std::function ()>, std::function); + template + void complete_timers(T&, E&, EnableFunc&& enable_fn); /** * Returns TRUE if all pollers allow blocking. @@ -652,8 +650,8 @@ private: static std::unique_ptr make_pollfn(Func&& func); struct signal_handler { - signal_handler(int signo); - promise<> _promise; + signal_handler(int signo, std::function&& handler); + std::function _handler; }; std::atomic _pending_signals; std::unordered_map _signal_handlers; @@ -707,7 +705,7 @@ public: template future submit_io(Func prepare_io); - future<> receive_signal(int signo); + void handle_signal(int signo, std::function&& handler); int run(); void exit(int ret); @@ -788,12 +786,6 @@ public: return _backend.notified(n); } void enable_timer(clock_type::time_point when); - future<> timers_completed() { - return _timer_promise.get_future(); - } - future<> lowres_timers_completed() { - return _lowres_timer_promise.get_future(); - } std::unique_ptr make_reactor_notifier() { return _backend.make_reactor_notifier(); } @@ -1134,31 +1126,25 @@ reactor::write_all(pollable_fd_state& fd, const void* buffer, size_t len) { return write_all_part(fd, buffer, len, 0); } -template -void reactor::complete_timers(T& timers, E& expired_timers, - std::function ()> completed_fn, - std::function enable_fn) { - completed_fn().then([this, &timers, &expired_timers, completed_fn, - enable_fn = std::move(enable_fn)] () mutable { - expired_timers = timers.expire(timers.now()); - for (auto& t : expired_timers) { - t._expired = true; - } - while (!expired_timers.empty()) { - auto t = &*expired_timers.begin(); - expired_timers.pop_front(); - t->_queued = false; - if (t->_armed) { - t->_armed = false; - if (t->_period) { - t->arm_periodic(*t->_period); - } - t->_callback(); +template +void reactor::complete_timers(T& timers, E& expired_timers, EnableFunc&& enable_fn) { + expired_timers = timers.expire(timers.now()); + for (auto& t : expired_timers) { + t._expired = true; + } + while (!expired_timers.empty()) { + auto t = &*expired_timers.begin(); + expired_timers.pop_front(); + t->_queued = false; + if (t->_armed) { + t->_armed = false; + if (t->_period) { + t->arm_periodic(*t->_period); } + t->_callback(); } - enable_fn(); - complete_timers(timers, expired_timers, std::move(completed_fn), std::move(enable_fn)); - }); + } + enable_fn(); } template