Files
scylladb/test/raft/logical_timer.hh
Avi Kivity cd04ab1a4e test: avoid spaces when defining user-defined literal operator
Clang 20 complains when it sees a user-defined literal operator
defined with a space before the underscore. Assume it's adhering
to the standard and comply.

Closes scylladb/scylladb#23401
2025-03-24 10:17:12 +03:00

242 lines
8.2 KiB
C++

/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <seastar/core/timed_out_error.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/sleep.hh>
#include "raft/logical_clock.hh"
#include "raft/raft.hh"
using namespace seastar;
constexpr raft::logical_clock::duration operator ""_t(unsigned long long ticks) {
return raft::logical_clock::duration{ticks};
}
// A wrapper around `raft::logical_clock` that allows scheduling events to happen after a certain number of ticks.
class logical_timer {
struct scheduled_impl {
public:
virtual ~scheduled_impl() { }
virtual void resolve() = 0;
};
struct scheduled {
raft::logical_clock::time_point _at;
seastar::shared_ptr<scheduled_impl> _impl;
void resolve() { _impl->resolve(); }
};
raft::logical_clock _clock;
// A min-heap of `scheduled` events sorted by `_at`.
std::vector<scheduled> _scheduled;
// Comparator for the `_scheduled` min-heap.
static bool cmp(const scheduled& a, const scheduled& b) {
return a._at > b._at;
}
public:
template <typename... T>
class timed_out : public raft::error {
// lw_shared_ptr to make the exception copyable
lw_shared_ptr<seastar::future<T...>> _fut;
public:
timed_out(seastar::future<T...> f) : error("timed out"), _fut(make_lw_shared(std::move(f))) {}
timed_out(const timed_out&) = default;
timed_out(timed_out&&) = default;
seastar::future<T...>& get_future() {
return *_fut;
}
};
logical_timer() = default;
logical_timer(const logical_timer&) = delete;
logical_timer(logical_timer&&) = default;
// Returns the current logical time (number of `tick()`s since initialization).
raft::logical_clock::time_point now() {
return _clock.now();
}
// Tick the internal clock.
// Resolve all events whose scheduled times arrive after that tick.
void tick() {
_clock.advance();
while (!_scheduled.empty() && _scheduled.front()._at <= _clock.now()) {
_scheduled.front().resolve();
std::pop_heap(_scheduled.begin(), _scheduled.end(), cmp);
_scheduled.pop_back();
}
}
// Given a future `f` and a logical time point `tp`, returns a future that resolves
// when either `f` resolves or the time point `tp` arrives (according to the number of `tick()` calls),
// whichever comes first.
//
// If `tp` comes first, the returned future is resolved with the `timed_out` exception.
// The exception contains a future equivalent to the original future (i.e. when the original future
// resolves, the future inside the exception will contain the original future's result).
//
// Note: it is highly recommended to pass in futures that always resolve eventually
// since we attach continuations to these futures, allocating memory.
//
// Note: there is a possibility that the internal heap will grow unbounded if we call `with_timeout`
// more often than we `tick`, so don't do that. It is recommended to call at least one
// `tick` per one `with_timeout` call (on average in the long run).
template <typename... T>
future<T...> with_timeout(raft::logical_clock::time_point tp, future<T...> f) {
if (f.available()) {
return f;
}
if (tp <= now()) {
return make_exception_future<T...>(timed_out<T...>{std::move(f)});
}
struct sched : public scheduled_impl {
// The original future (the `f` argument), when it resolves, will set value on `_p`.
// Before timeout, `_p` is connected to the future returned to the user (`res` below).
// After timeout, `_p` is a new promise connected to the future returned inside the timed_out exception.
// Therefore:
// 1. if `f` resolves before timeout, it will resolve `res`,
// 2. otherwise it will resolve the future we return inside `timed_out` which is returned through `res`.
promise<T...> _p;
bool _resolved = false;
virtual ~sched() override { }
virtual void resolve() override {
if (!_resolved) {
promise<T...> new_p;
_p.set_exception(timed_out<T...>{new_p.get_future()});
std::swap(new_p, _p);
_resolved = true;
}
}
};
auto s = make_shared<sched>();
auto res = s->_p.get_future();
_scheduled.push_back(scheduled{
._at = tp,
._impl = s
});
std::push_heap(_scheduled.begin(), _scheduled.end(), cmp);
(void)f.then_wrapped([s = std::move(s)] (future<T...> f) mutable {
// If we're before timeout, `s->_p` is connected to `res` so we're returning the result of `f` to our caller.
// Otherwise `s->_p` is connected to a future which was previously returned to our caller inside `timed_out`.
f.forward_to(std::move(s->_p));
s->_resolved = true;
// tick() will (eventually) clear the `_scheduled` entry
// (unless we're already past timeout, in which case the entry has already been cleared).
});
return res;
}
// Returns a future that resolves at logical time point `tp` (according to this timer's clock).
// Note: analogous remark applies as for `with_timeout`, i.e. make sure to call at least one `tick`
// per one `sleep_until` call on average.
future<> sleep_until(raft::logical_clock::time_point tp) {
return sleep_until(tp, nullptr);
}
// Returns a future that resolves at logical time point `tp` (according to this timer's clock),
// or when `as` is aborted (in which case `sleep_aborted` is thrown).
// Note: see note above.
future<> sleep_until(raft::logical_clock::time_point tp, abort_source& as) {
return sleep_until(tp, &as);
}
// Returns a future that resolves after a number of `tick()`s represented by `d`.
// Example usage: `sleep(20_t)` resolves after 20 `tick()`s.
// Note: see note above.
future<> sleep(raft::logical_clock::duration d) {
return sleep_until(now() + d);
}
// Schedule `f` to be called at logical time point `tp` (according to this timer's clock).
template <typename F>
void schedule(raft::logical_clock::time_point tp, F f) {
if (tp <= now()) {
f();
return;
}
struct sched : public scheduled_impl {
sched(F f) : _f(std::move(f)) {}
virtual ~sched() override {}
virtual void resolve() override { _f(); }
F _f;
};
_scheduled.push_back(scheduled {
._at = tp,
._impl = make_shared<sched>(std::move(f))
});
std::push_heap(_scheduled.begin(), _scheduled.end(), cmp);
}
private:
future<> sleep_until(raft::logical_clock::time_point tp, abort_source* as) {
if (tp <= now()) {
co_return;
}
struct sched : public scheduled_impl {
promise<> _p;
bool _resolved = false;
virtual ~sched() override {}
virtual void resolve() override {
if (!_resolved) {
_p.set_value();
_resolved = true;
}
}
};
auto s = make_shared<sched>();
optimized_optional<abort_source::subscription> sub;
if (as) {
sub = as->subscribe([s] () noexcept {
if (!s->_resolved) {
s->_p.set_exception(std::make_exception_ptr(sleep_aborted{}));
s->_resolved = true;
}
});
if (!sub) {
throw sleep_aborted{};
}
}
auto f = s->_p.get_future();
_scheduled.push_back(scheduled{
._at = tp,
._impl = std::move(s)
});
std::push_heap(_scheduled.begin(), _scheduled.end(), cmp);
co_await std::move(f);
}
};