Merge branch 'signal' of ../seastar

Simplify signal handling.
This commit is contained in:
Avi Kivity
2015-01-29 10:08:27 +02:00
2 changed files with 43 additions and 75 deletions

View File

@@ -111,12 +111,6 @@ reactor::reactor()
sev.sigev_signo = SIGALRM;
r = timer_create(CLOCK_REALTIME, &sev, &_timer);
assert(r >= 0);
keep_doing([this] {
return receive_signal(SIGALRM).then([this] {
_timer_promise.set_value();
_timer_promise = promise<>();
});
}).or_terminate();
memory::set_reclaim_hook([this] (std::function<void ()> reclaim_fn) {
// push it in the front of the queue so we reclaim memory quickly
_pending_tasks.push_front(make_task([fn = std::move(reclaim_fn)] {
@@ -546,14 +540,10 @@ void reactor::exit(int ret) {
smp::submit_to(0, [this, ret] { _return = ret; stop(); });
}
future<>
reactor::receive_signal(int signo) {
auto i = _signal_handlers.find(signo);
if (i == _signal_handlers.end()) {
i = _signal_handlers.emplace(signo, signo).first;
}
signal_handler& sh = i->second;
return sh._promise.get_future();
void
reactor::handle_signal(int signo, std::function<void ()>&& handler) {
_signal_handlers.emplace(std::piecewise_construct,
std::make_tuple(signo), std::make_tuple(signo, std::move(handler)));
}
void sigaction(int signo, siginfo_t* siginfo, void* ignore) {
@@ -566,15 +556,15 @@ bool reactor::poll_signal() {
_pending_signals.fetch_and(~signals, std::memory_order_relaxed);
for (size_t i = 0; i < sizeof(signals)*8; i++) {
if (signals & (1ull << i)) {
_signal_handlers.at(i)._promise.set_value();
_signal_handlers.at(i)._promise = promise<>();
_signal_handlers.at(i)._handler();
}
}
}
return signals;
}
reactor::signal_handler::signal_handler(int signo) {
reactor::signal_handler::signal_handler(int signo, std::function<void ()>&& handler)
: _handler(std::move(handler)) {
auto mask = make_sigset_mask(signo);
auto r = ::sigprocmask(SIG_UNBLOCK, &mask, NULL);
throw_system_error_on(r == -1);
@@ -676,9 +666,9 @@ int reactor::run() {
if (_id == 0) {
if (_handle_sigint) {
receive_signal(SIGINT).then([this] { stop(); });
handle_signal(SIGINT, [this] { stop(); });
}
receive_signal(SIGTERM).then([this] { stop(); });
handle_signal(SIGTERM, [this] { stop(); });
}
_cpu_started.wait(smp::count).then([this] {
@@ -701,24 +691,13 @@ int reactor::run() {
smp_poller = poller(smp::poll_queues);
}
complete_timers(_timers, _expired_timers,
[this] { return timers_completed(); },
[this] {
handle_signal(SIGALRM, [this] {
complete_timers(_timers, _expired_timers, [this] {
if (!_timers.empty()) {
enable_timer(_timers.get_next_timeout());
}
}
);
complete_timers(_lowres_timers, _expired_lowres_timers,
[this] { return lowres_timers_completed(); },
[this] {
if (!_lowres_timers.empty()) {
_lowres_next_timeout = _lowres_timers.get_next_timeout();
} else {
_lowres_next_timeout = lowres_clock::time_point();
}
}
);
});
});
poller drain_cross_cpu_freelist([] {
return memory::drain_cross_cpu_freelist();
@@ -730,8 +709,13 @@ int reactor::run() {
}
auto now = lowres_clock::now();
if (now > _lowres_next_timeout) {
_lowres_timer_promise.set_value();
_lowres_timer_promise = promise<>();
complete_timers(_lowres_timers, _expired_lowres_timers, [this] {
if (!_lowres_timers.empty()) {
_lowres_next_timeout = _lowres_timers.get_next_timeout();
} else {
_lowres_next_timeout = lowres_clock::time_point();
}
});
return true;
}
return false;
@@ -1074,9 +1058,7 @@ void smp_message_queue::start(unsigned cpuid) {
/* not yet implemented for OSv. TODO: do the notification like we do class smp. */
#ifndef HAVE_OSV
thread_pool::thread_pool() : _worker_thread([this] { work(); }), _notify(pthread_self()) {
keep_doing([this] {
return engine().receive_signal(SIGUSR1).then([this] { inter_thread_wq.complete(); });
});
engine().handle_signal(SIGUSR1, [this] { inter_thread_wq.complete(); });
}
void thread_pool::work() {

View File

@@ -630,16 +630,14 @@ private:
// _lowres_clock will only be created on cpu 0
std::unique_ptr<lowres_clock> _lowres_clock;
lowres_clock::time_point _lowres_next_timeout;
promise<> _lowres_timer_promise;
promise<> _timer_promise;
std::experimental::optional<poller> _epoll_poller;
const bool _reuseport;
circular_buffer<double> _loads;
double _load = 0;
private:
void abort_on_error(int ret);
template <typename T, typename E>
void complete_timers(T&, E&, std::function<future<> ()>, std::function<void ()>);
template <typename T, typename E, typename EnableFunc>
void complete_timers(T&, E&, EnableFunc&& enable_fn);
/**
* Returns TRUE if all pollers allow blocking.
@@ -652,8 +650,8 @@ private:
static std::unique_ptr<pollfn> make_pollfn(Func&& func);
struct signal_handler {
signal_handler(int signo);
promise<> _promise;
signal_handler(int signo, std::function<void ()>&& handler);
std::function<void ()> _handler;
};
std::atomic<uint64_t> _pending_signals;
std::unordered_map<int, signal_handler> _signal_handlers;
@@ -707,7 +705,7 @@ public:
template <typename Func>
future<io_event> submit_io(Func prepare_io);
future<> receive_signal(int signo);
void handle_signal(int signo, std::function<void ()>&& handler);
int run();
void exit(int ret);
@@ -788,12 +786,6 @@ public:
return _backend.notified(n);
}
void enable_timer(clock_type::time_point when);
future<> timers_completed() {
return _timer_promise.get_future();
}
future<> lowres_timers_completed() {
return _lowres_timer_promise.get_future();
}
std::unique_ptr<reactor_notifier> make_reactor_notifier() {
return _backend.make_reactor_notifier();
}
@@ -1134,31 +1126,25 @@ reactor::write_all(pollable_fd_state& fd, const void* buffer, size_t len) {
return write_all_part(fd, buffer, len, 0);
}
template <typename T, typename E>
void reactor::complete_timers(T& timers, E& expired_timers,
std::function<future<> ()> completed_fn,
std::function<void ()> enable_fn) {
completed_fn().then([this, &timers, &expired_timers, completed_fn,
enable_fn = std::move(enable_fn)] () mutable {
expired_timers = timers.expire(timers.now());
for (auto& t : expired_timers) {
t._expired = true;
}
while (!expired_timers.empty()) {
auto t = &*expired_timers.begin();
expired_timers.pop_front();
t->_queued = false;
if (t->_armed) {
t->_armed = false;
if (t->_period) {
t->arm_periodic(*t->_period);
}
t->_callback();
template <typename T, typename E, typename EnableFunc>
void reactor::complete_timers(T& timers, E& expired_timers, EnableFunc&& enable_fn) {
expired_timers = timers.expire(timers.now());
for (auto& t : expired_timers) {
t._expired = true;
}
while (!expired_timers.empty()) {
auto t = &*expired_timers.begin();
expired_timers.pop_front();
t->_queued = false;
if (t->_armed) {
t->_armed = false;
if (t->_period) {
t->arm_periodic(*t->_period);
}
t->_callback();
}
enable_fn();
complete_timers(timers, expired_timers, std::move(completed_fn), std::move(enable_fn));
});
}
enable_fn();
}
template <typename CharType>