diff --git a/core/reactor.cc b/core/reactor.cc index 8ce4dff5fc..f449e08f6a 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -118,14 +118,69 @@ reactor_backend_epoll::reactor_backend_epoll() : _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC)) { } +reactor::signals::signals() : _pending_signals(0) { +} + +reactor::signals::~signals() { + sigset_t mask; + sigfillset(&mask); + ::sigprocmask(SIG_BLOCK, &mask, NULL); +} + +reactor::signals::signal_handler::signal_handler(int signo, std::function&& handler) + : _handler(std::move(handler)) { + auto mask = make_sigset_mask(signo); + auto r = ::sigprocmask(SIG_UNBLOCK, &mask, NULL); + throw_system_error_on(r == -1); + struct sigaction sa; + sa.sa_sigaction = action; + sa.sa_mask = make_empty_sigset_mask(); + sa.sa_flags = SA_SIGINFO | SA_RESTART; + r = ::sigaction(signo, &sa, nullptr); + throw_system_error_on(r == -1); +} + +void +reactor::signals::handle_signal(int signo, std::function&& handler) { + _signal_handlers.emplace(std::piecewise_construct, + std::make_tuple(signo), std::make_tuple(signo, std::move(handler))); +} + +void +reactor::signals::handle_signal_once(int signo, std::function&& handler) { + return handle_signal(signo, [fired = false, handler = std::move(handler)] () mutable { + if (!fired) { + fired = true; + handler(); + } + }); +} + +bool reactor::signals::poll_signal() { + auto signals = _pending_signals.load(std::memory_order_relaxed); + if (signals) { + _pending_signals.fetch_and(~signals, std::memory_order_relaxed); + for (size_t i = 0; i < sizeof(signals)*8; i++) { + if (signals & (1ull << i)) { + _signal_handlers.at(i)._handler(); + } + } + } + return signals; +} + +void reactor::signals::action(int signo, siginfo_t* siginfo, void* ignore) { + engine()._signals._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed); +} + reactor::reactor() : _backend() , _exit_future(_exit_promise.get_future()) , _cpu_started(0) , _io_context(0) , _io_context_available(max_aio) - , _reuseport(posix_reuseport_detect()) - , _pending_signals(0) { + , _reuseport(posix_reuseport_detect()) { + auto r = ::io_setup(max_aio, &_io_context); assert(r >= 0); struct sigevent sev; @@ -575,51 +630,6 @@ void reactor::exit(int ret) { smp::submit_to(0, [this, ret] { _return = ret; stop(); }); } -void -reactor::handle_signal(int signo, std::function&& handler) { - _signal_handlers.emplace(std::piecewise_construct, - std::make_tuple(signo), std::make_tuple(signo, std::move(handler))); -} - -void -reactor::handle_signal_once(int signo, std::function&& handler) { - return handle_signal(signo, [fired = false, handler = std::move(handler)] () mutable { - if (!fired) { - fired = true; - handler(); - } - }); -} - -void sigaction(int signo, siginfo_t* siginfo, void* ignore) { - engine()._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed); -} - -bool reactor::poll_signal() { - auto signals = _pending_signals.load(std::memory_order_relaxed); - if (signals) { - _pending_signals.fetch_and(~signals, std::memory_order_relaxed); - for (size_t i = 0; i < sizeof(signals)*8; i++) { - if (signals & (1ull << i)) { - _signal_handlers.at(i)._handler(); - } - } - } - return signals; -} - -reactor::signal_handler::signal_handler(int signo, std::function&& handler) - : _handler(std::move(handler)) { - auto mask = make_sigset_mask(signo); - auto r = ::sigprocmask(SIG_UNBLOCK, &mask, NULL); - throw_system_error_on(r == -1); - struct sigaction sa; - sa.sa_sigaction = sigaction; - sa.sa_mask = make_empty_sigset_mask(); - sa.sa_flags = SA_SIGINFO | SA_RESTART; - r = ::sigaction(signo, &sa, nullptr); - throw_system_error_on(r == -1); -} struct reactor::collectd_registrations { std::vector regs; @@ -707,13 +717,13 @@ int reactor::run() { poller io_poller([&] { return process_io(); }); #endif - poller sig_poller([&] { return poll_signal(); } ); + poller sig_poller([&] { return _signals.poll_signal(); } ); if (_id == 0) { if (_handle_sigint) { - handle_signal_once(SIGINT, [this] { stop(); }); + _signals.handle_signal_once(SIGINT, [this] { stop(); }); } - handle_signal_once(SIGTERM, [this] { stop(); }); + _signals.handle_signal_once(SIGTERM, [this] { stop(); }); } _cpu_started.wait(smp::count).then([this] { @@ -736,7 +746,7 @@ int reactor::run() { smp_poller = poller(smp::poll_queues); } - handle_signal(SIGALRM, [this] { + _signals.handle_signal(SIGALRM, [this] { complete_timers(_timers, _expired_timers, [this] { if (!_timers.empty()) { enable_timer(_timers.get_next_timeout()); @@ -1106,7 +1116,7 @@ void smp_message_queue::start(unsigned cpuid) { /* not yet implemented for OSv. TODO: do the notification like we do class smp. */ #ifndef HAVE_OSV thread_pool::thread_pool() : _worker_thread([this] { work(); }), _notify(pthread_self()) { - engine().handle_signal(SIGUSR1, [this] { inter_thread_wq.complete(); }); + engine()._signals.handle_signal(SIGUSR1, [this] { inter_thread_wq.complete(); }); } void thread_pool::work() { diff --git a/core/reactor.hh b/core/reactor.hh index ee80dd2ff6..e041333471 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -688,16 +688,28 @@ private: template // signature: bool () static std::unique_ptr make_pollfn(Func&& func); - struct signal_handler { - signal_handler(int signo, std::function&& handler); - std::function _handler; - }; - std::atomic _pending_signals; - std::unordered_map _signal_handlers; - bool poll_signal(); - friend void sigaction(int signo, siginfo_t* siginfo, void* ignore); + class signals { + public: + signals(); + ~signals(); + bool poll_signal(); + void handle_signal(int signo, std::function&& handler); + void handle_signal_once(int signo, std::function&& handler); + static void action(int signo, siginfo_t* siginfo, void* ignore); + + private: + struct signal_handler { + signal_handler(int signo, std::function&& handler); + std::function _handler; + }; + std::atomic _pending_signals; + std::unordered_map _signal_handlers; + }; + + signals _signals; thread_pool _thread_pool; + friend thread_pool; void run_tasks(circular_buffer>& tasks, size_t task_quota); bool posix_reuseport_detect(); @@ -744,9 +756,6 @@ public: template future submit_io(Func prepare_io); - void handle_signal(int signo, std::function&& handler); - void handle_signal_once(int signo, std::function&& handler); - int run(); void exit(int ret); future<> when_started() { return _start_promise.get_future(); }