Block all signals when reactor is deleted
1. Moved all signal handler functions/members into a wrapping class 2. Block all signals on desctuctor call (resetting of signal handler is not enough since some signals will arive even if we reset signal handler to SIG_DFL e.g. timer) Fixes use-after-free when a signal is caught after the reactor is destroyed. Signed-off-by: Shlomi Livne <shlomi@cloudius-systems.com>
This commit is contained in:
114
core/reactor.cc
114
core/reactor.cc
@@ -118,14 +118,69 @@ reactor_backend_epoll::reactor_backend_epoll()
|
||||
: _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC)) {
|
||||
}
|
||||
|
||||
reactor::signals::signals() : _pending_signals(0) {
|
||||
}
|
||||
|
||||
reactor::signals::~signals() {
|
||||
sigset_t mask;
|
||||
sigfillset(&mask);
|
||||
::sigprocmask(SIG_BLOCK, &mask, NULL);
|
||||
}
|
||||
|
||||
reactor::signals::signal_handler::signal_handler(int signo, std::function<void ()>&& handler)
|
||||
: _handler(std::move(handler)) {
|
||||
auto mask = make_sigset_mask(signo);
|
||||
auto r = ::sigprocmask(SIG_UNBLOCK, &mask, NULL);
|
||||
throw_system_error_on(r == -1);
|
||||
struct sigaction sa;
|
||||
sa.sa_sigaction = action;
|
||||
sa.sa_mask = make_empty_sigset_mask();
|
||||
sa.sa_flags = SA_SIGINFO | SA_RESTART;
|
||||
r = ::sigaction(signo, &sa, nullptr);
|
||||
throw_system_error_on(r == -1);
|
||||
}
|
||||
|
||||
void
|
||||
reactor::signals::handle_signal(int signo, std::function<void ()>&& handler) {
|
||||
_signal_handlers.emplace(std::piecewise_construct,
|
||||
std::make_tuple(signo), std::make_tuple(signo, std::move(handler)));
|
||||
}
|
||||
|
||||
void
|
||||
reactor::signals::handle_signal_once(int signo, std::function<void ()>&& handler) {
|
||||
return handle_signal(signo, [fired = false, handler = std::move(handler)] () mutable {
|
||||
if (!fired) {
|
||||
fired = true;
|
||||
handler();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
bool reactor::signals::poll_signal() {
|
||||
auto signals = _pending_signals.load(std::memory_order_relaxed);
|
||||
if (signals) {
|
||||
_pending_signals.fetch_and(~signals, std::memory_order_relaxed);
|
||||
for (size_t i = 0; i < sizeof(signals)*8; i++) {
|
||||
if (signals & (1ull << i)) {
|
||||
_signal_handlers.at(i)._handler();
|
||||
}
|
||||
}
|
||||
}
|
||||
return signals;
|
||||
}
|
||||
|
||||
void reactor::signals::action(int signo, siginfo_t* siginfo, void* ignore) {
|
||||
engine()._signals._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
reactor::reactor()
|
||||
: _backend()
|
||||
, _exit_future(_exit_promise.get_future())
|
||||
, _cpu_started(0)
|
||||
, _io_context(0)
|
||||
, _io_context_available(max_aio)
|
||||
, _reuseport(posix_reuseport_detect())
|
||||
, _pending_signals(0) {
|
||||
, _reuseport(posix_reuseport_detect()) {
|
||||
|
||||
auto r = ::io_setup(max_aio, &_io_context);
|
||||
assert(r >= 0);
|
||||
struct sigevent sev;
|
||||
@@ -575,51 +630,6 @@ void reactor::exit(int ret) {
|
||||
smp::submit_to(0, [this, ret] { _return = ret; stop(); });
|
||||
}
|
||||
|
||||
void
|
||||
reactor::handle_signal(int signo, std::function<void ()>&& handler) {
|
||||
_signal_handlers.emplace(std::piecewise_construct,
|
||||
std::make_tuple(signo), std::make_tuple(signo, std::move(handler)));
|
||||
}
|
||||
|
||||
void
|
||||
reactor::handle_signal_once(int signo, std::function<void ()>&& handler) {
|
||||
return handle_signal(signo, [fired = false, handler = std::move(handler)] () mutable {
|
||||
if (!fired) {
|
||||
fired = true;
|
||||
handler();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void sigaction(int signo, siginfo_t* siginfo, void* ignore) {
|
||||
engine()._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
bool reactor::poll_signal() {
|
||||
auto signals = _pending_signals.load(std::memory_order_relaxed);
|
||||
if (signals) {
|
||||
_pending_signals.fetch_and(~signals, std::memory_order_relaxed);
|
||||
for (size_t i = 0; i < sizeof(signals)*8; i++) {
|
||||
if (signals & (1ull << i)) {
|
||||
_signal_handlers.at(i)._handler();
|
||||
}
|
||||
}
|
||||
}
|
||||
return signals;
|
||||
}
|
||||
|
||||
reactor::signal_handler::signal_handler(int signo, std::function<void ()>&& handler)
|
||||
: _handler(std::move(handler)) {
|
||||
auto mask = make_sigset_mask(signo);
|
||||
auto r = ::sigprocmask(SIG_UNBLOCK, &mask, NULL);
|
||||
throw_system_error_on(r == -1);
|
||||
struct sigaction sa;
|
||||
sa.sa_sigaction = sigaction;
|
||||
sa.sa_mask = make_empty_sigset_mask();
|
||||
sa.sa_flags = SA_SIGINFO | SA_RESTART;
|
||||
r = ::sigaction(signo, &sa, nullptr);
|
||||
throw_system_error_on(r == -1);
|
||||
}
|
||||
|
||||
struct reactor::collectd_registrations {
|
||||
std::vector<scollectd::registration> regs;
|
||||
@@ -707,13 +717,13 @@ int reactor::run() {
|
||||
poller io_poller([&] { return process_io(); });
|
||||
#endif
|
||||
|
||||
poller sig_poller([&] { return poll_signal(); } );
|
||||
poller sig_poller([&] { return _signals.poll_signal(); } );
|
||||
|
||||
if (_id == 0) {
|
||||
if (_handle_sigint) {
|
||||
handle_signal_once(SIGINT, [this] { stop(); });
|
||||
_signals.handle_signal_once(SIGINT, [this] { stop(); });
|
||||
}
|
||||
handle_signal_once(SIGTERM, [this] { stop(); });
|
||||
_signals.handle_signal_once(SIGTERM, [this] { stop(); });
|
||||
}
|
||||
|
||||
_cpu_started.wait(smp::count).then([this] {
|
||||
@@ -736,7 +746,7 @@ int reactor::run() {
|
||||
smp_poller = poller(smp::poll_queues);
|
||||
}
|
||||
|
||||
handle_signal(SIGALRM, [this] {
|
||||
_signals.handle_signal(SIGALRM, [this] {
|
||||
complete_timers(_timers, _expired_timers, [this] {
|
||||
if (!_timers.empty()) {
|
||||
enable_timer(_timers.get_next_timeout());
|
||||
@@ -1106,7 +1116,7 @@ void smp_message_queue::start(unsigned cpuid) {
|
||||
/* not yet implemented for OSv. TODO: do the notification like we do class smp. */
|
||||
#ifndef HAVE_OSV
|
||||
thread_pool::thread_pool() : _worker_thread([this] { work(); }), _notify(pthread_self()) {
|
||||
engine().handle_signal(SIGUSR1, [this] { inter_thread_wq.complete(); });
|
||||
engine()._signals.handle_signal(SIGUSR1, [this] { inter_thread_wq.complete(); });
|
||||
}
|
||||
|
||||
void thread_pool::work() {
|
||||
|
||||
@@ -688,16 +688,28 @@ private:
|
||||
template <typename Func> // signature: bool ()
|
||||
static std::unique_ptr<pollfn> make_pollfn(Func&& func);
|
||||
|
||||
struct signal_handler {
|
||||
signal_handler(int signo, std::function<void ()>&& handler);
|
||||
std::function<void ()> _handler;
|
||||
};
|
||||
std::atomic<uint64_t> _pending_signals;
|
||||
std::unordered_map<int, signal_handler> _signal_handlers;
|
||||
bool poll_signal();
|
||||
friend void sigaction(int signo, siginfo_t* siginfo, void* ignore);
|
||||
class signals {
|
||||
public:
|
||||
signals();
|
||||
~signals();
|
||||
|
||||
bool poll_signal();
|
||||
void handle_signal(int signo, std::function<void ()>&& handler);
|
||||
void handle_signal_once(int signo, std::function<void ()>&& handler);
|
||||
static void action(int signo, siginfo_t* siginfo, void* ignore);
|
||||
|
||||
private:
|
||||
struct signal_handler {
|
||||
signal_handler(int signo, std::function<void ()>&& handler);
|
||||
std::function<void ()> _handler;
|
||||
};
|
||||
std::atomic<uint64_t> _pending_signals;
|
||||
std::unordered_map<int, signal_handler> _signal_handlers;
|
||||
};
|
||||
|
||||
signals _signals;
|
||||
thread_pool _thread_pool;
|
||||
friend thread_pool;
|
||||
|
||||
void run_tasks(circular_buffer<std::unique_ptr<task>>& tasks, size_t task_quota);
|
||||
bool posix_reuseport_detect();
|
||||
@@ -744,9 +756,6 @@ public:
|
||||
template <typename Func>
|
||||
future<io_event> submit_io(Func prepare_io);
|
||||
|
||||
void handle_signal(int signo, std::function<void ()>&& handler);
|
||||
void handle_signal_once(int signo, std::function<void ()>&& handler);
|
||||
|
||||
int run();
|
||||
void exit(int ret);
|
||||
future<> when_started() { return _start_promise.get_future(); }
|
||||
|
||||
Reference in New Issue
Block a user