mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-02 13:06:57 +00:00
reactor: simplify timer handling
Instead of scheduling timer processing to happen in the future, process timers in the context of the poller (signal poller for high resolution timer, lowres time poller for low resolution timers). This both reduces timer latency (not very important) but removes a use of promise/future in a repetitive task, which is error prone.
This commit is contained in:
@@ -112,10 +112,6 @@ reactor::reactor()
|
||||
sev.sigev_signo = SIGALRM;
|
||||
r = timer_create(CLOCK_REALTIME, &sev, &_timer);
|
||||
assert(r >= 0);
|
||||
handle_signal(SIGALRM, [this] {
|
||||
_timer_promise.set_value();
|
||||
_timer_promise = promise<>();
|
||||
});
|
||||
memory::set_reclaim_hook([this] (std::function<void ()> reclaim_fn) {
|
||||
// push it in the front of the queue so we reclaim memory quickly
|
||||
_pending_tasks.push_front(make_task([fn = std::move(reclaim_fn)] {
|
||||
@@ -696,24 +692,13 @@ int reactor::run() {
|
||||
smp_poller = poller(smp::poll_queues);
|
||||
}
|
||||
|
||||
complete_timers(_timers, _expired_timers,
|
||||
[this] { return timers_completed(); },
|
||||
[this] {
|
||||
handle_signal(SIGALRM, [this] {
|
||||
complete_timers(_timers, _expired_timers, [this] {
|
||||
if (!_timers.empty()) {
|
||||
enable_timer(_timers.get_next_timeout());
|
||||
}
|
||||
}
|
||||
);
|
||||
complete_timers(_lowres_timers, _expired_lowres_timers,
|
||||
[this] { return lowres_timers_completed(); },
|
||||
[this] {
|
||||
if (!_lowres_timers.empty()) {
|
||||
_lowres_next_timeout = _lowres_timers.get_next_timeout();
|
||||
} else {
|
||||
_lowres_next_timeout = lowres_clock::time_point();
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
poller drain_cross_cpu_freelist([] {
|
||||
return memory::drain_cross_cpu_freelist();
|
||||
@@ -725,8 +710,13 @@ int reactor::run() {
|
||||
}
|
||||
auto now = lowres_clock::now();
|
||||
if (now > _lowres_next_timeout) {
|
||||
_lowres_timer_promise.set_value();
|
||||
_lowres_timer_promise = promise<>();
|
||||
complete_timers(_lowres_timers, _expired_lowres_timers, [this] {
|
||||
if (!_lowres_timers.empty()) {
|
||||
_lowres_next_timeout = _lowres_timers.get_next_timeout();
|
||||
} else {
|
||||
_lowres_next_timeout = lowres_clock::time_point();
|
||||
}
|
||||
});
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
||||
@@ -630,16 +630,14 @@ private:
|
||||
// _lowres_clock will only be created on cpu 0
|
||||
std::unique_ptr<lowres_clock> _lowres_clock;
|
||||
lowres_clock::time_point _lowres_next_timeout;
|
||||
promise<> _lowres_timer_promise;
|
||||
promise<> _timer_promise;
|
||||
std::experimental::optional<poller> _epoll_poller;
|
||||
const bool _reuseport;
|
||||
circular_buffer<double> _loads;
|
||||
double _load = 0;
|
||||
private:
|
||||
void abort_on_error(int ret);
|
||||
template <typename T, typename E>
|
||||
void complete_timers(T&, E&, std::function<future<> ()>, std::function<void ()>);
|
||||
template <typename T, typename E, typename EnableFunc>
|
||||
void complete_timers(T&, E&, EnableFunc&& enable_fn);
|
||||
|
||||
/**
|
||||
* Returns TRUE if all pollers allow blocking.
|
||||
@@ -788,12 +786,6 @@ public:
|
||||
return _backend.notified(n);
|
||||
}
|
||||
void enable_timer(clock_type::time_point when);
|
||||
future<> timers_completed() {
|
||||
return _timer_promise.get_future();
|
||||
}
|
||||
future<> lowres_timers_completed() {
|
||||
return _lowres_timer_promise.get_future();
|
||||
}
|
||||
std::unique_ptr<reactor_notifier> make_reactor_notifier() {
|
||||
return _backend.make_reactor_notifier();
|
||||
}
|
||||
@@ -1133,31 +1125,25 @@ reactor::write_all(pollable_fd_state& fd, const void* buffer, size_t len) {
|
||||
return write_all_part(fd, buffer, len, 0);
|
||||
}
|
||||
|
||||
template <typename T, typename E>
|
||||
void reactor::complete_timers(T& timers, E& expired_timers,
|
||||
std::function<future<> ()> completed_fn,
|
||||
std::function<void ()> enable_fn) {
|
||||
completed_fn().then([this, &timers, &expired_timers, completed_fn,
|
||||
enable_fn = std::move(enable_fn)] () mutable {
|
||||
expired_timers = timers.expire(timers.now());
|
||||
for (auto& t : expired_timers) {
|
||||
t._expired = true;
|
||||
}
|
||||
while (!expired_timers.empty()) {
|
||||
auto t = &*expired_timers.begin();
|
||||
expired_timers.pop_front();
|
||||
t->_queued = false;
|
||||
if (t->_armed) {
|
||||
t->_armed = false;
|
||||
if (t->_period) {
|
||||
t->arm_periodic(*t->_period);
|
||||
}
|
||||
t->_callback();
|
||||
template <typename T, typename E, typename EnableFunc>
|
||||
void reactor::complete_timers(T& timers, E& expired_timers, EnableFunc&& enable_fn) {
|
||||
expired_timers = timers.expire(timers.now());
|
||||
for (auto& t : expired_timers) {
|
||||
t._expired = true;
|
||||
}
|
||||
while (!expired_timers.empty()) {
|
||||
auto t = &*expired_timers.begin();
|
||||
expired_timers.pop_front();
|
||||
t->_queued = false;
|
||||
if (t->_armed) {
|
||||
t->_armed = false;
|
||||
if (t->_period) {
|
||||
t->arm_periodic(*t->_period);
|
||||
}
|
||||
t->_callback();
|
||||
}
|
||||
enable_fn();
|
||||
complete_timers(timers, expired_timers, std::move(completed_fn), std::move(enable_fn));
|
||||
});
|
||||
}
|
||||
enable_fn();
|
||||
}
|
||||
|
||||
template <typename CharType>
|
||||
|
||||
Reference in New Issue
Block a user