Files
scylladb/test/raft/generator.hh
Avi Kivity f3eade2f62 treewide: relicense to ScyllaDB-Source-Available-1.0
Drop the AGPL license in favor of a source-available license.
See the blog post [1] for details.

[1] https://www.scylladb.com/2024/12/18/why-were-moving-to-a-source-available-license/
2024-12-18 17:45:13 +02:00

860 lines
33 KiB
C++

/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <unordered_set>
#include <random>
#include <boost/mp11/algorithm.hpp>
#include <boost/implicit_cast.hpp>
#include <fmt/std.h>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/variant_utils.hh>
#include "utils/assert.hh"
#include "utils/chunked_vector.hh"
#include "test/raft/future_set.hh"
namespace operation {
// Operation execution thread identifier.
// See `Executable` context.
struct thread_id {
    size_t id;
    bool operator==(const thread_id&) const = default;
};
} // namespace operation
namespace std {
// Allow `operation::thread_id` to be used as a key of unordered containers
// (e.g. `operation::thread_set` below); hashes the underlying numeric id.
template<> struct hash<operation::thread_id> {
    size_t operator()(const operation::thread_id& t) const {
        return hash<size_t>{}(t.id);
    }
};
} // namespace std
namespace operation {
using thread_set = std::unordered_set<thread_id>;
// Return an arbitrary element of a non-empty thread set.
//
// Marked `inline`: this is a non-template function defined in a header,
// so without `inline` including the header in more than one translation
// unit would violate the ODR (multiple definitions at link time).
inline thread_id some(const thread_set& s) {
    SCYLLA_ASSERT(!s.empty());
    return *s.begin();
}
// Helper for `take`: expands `v[0], v[1], ..., v[N-1]` into a tuple.
template <size_t... I>
auto take_impl(const std::vector<thread_id>& v, std::index_sequence<I...>) {
    return std::make_tuple(v[I]...);
}
template <size_t N>
auto take(const thread_set& s) {
SCYLLA_ASSERT(N <= s.size());
auto end = s.begin();
std::advance(end, N);
std::vector<thread_id> vec{s.begin(), end};
return take_impl(vec, std::make_index_sequence<N>{});
}
// Make a set of `n` threads with ids `0, 1, ..., n-1`.
//
// Marked `inline`: this is a non-template function defined in a header,
// so without `inline` including the header in more than one translation
// unit would violate the ODR (multiple definitions at link time).
inline thread_set make_thread_set(size_t n) {
    thread_set s;
    s.reserve(n); // avoid rehashing while inserting `n` known elements
    for (size_t i = 0; i < n; ++i) {
        s.insert(thread_id{i});
    }
    return s;
}
// Operation execution context.
// See `Executable` concept.
struct context {
    // The logical thread on which the operation executes.
    thread_id thread;
    // Logical time point at which the execution started.
    raft::logical_clock::time_point start;
};
// An executable operation.
// Represents a computation which may cause side effects.
//
// The computation is performed through the `execute` function. It returns a result
// of type `result_type` (for computations that don't have a result, use `std::monostate` or equivalent).
//
// Different computations of the same type may share state of type `state_type`; for example,
// the state may contain database handles. A reference to the state is passed to `execute`.
//
// Each execution is performed on an abstract `thread' (represented by a `thread_id`)
// and has a logical start time point. The thread and start point together form
// the execution's `context` which is also passed as a reference to `execute`.
//
// Two operations may be called in parallel only if they are on different threads.
template <typename Op>
concept Executable = requires (Op o, typename Op::state_type& s, const context& ctx) {
    // The result of the computation (use `std::monostate` or equivalent if there is none).
    typename Op::result_type;
    // State shared between computations of this operation type.
    typename Op::state_type;
    // The computation itself; may have side effects.
    { o.execute(s, ctx) } -> std::same_as<future<typename Op::result_type>>;
};
// An operation which can specify a logical point in time when it's ready.
// We will say that such operations `understand time'.
// A `nullopt` ready time means "ready as soon as possible".
template <typename Op>
concept HasReadyTime = requires(Op o) {
    { o.ready } -> std::convertible_to<std::optional<raft::logical_clock::time_point>>;
};
// An operation which can specify a specific thread on which it can be executed.
// We will say that such operations `understand threads`.
// If the operation specifies a thread, it won't be executed on any other thread.
// A `nullopt` thread means "any free thread will do".
template <typename Op>
concept HasThread = requires(Op o) {
    { o.thread } -> std::convertible_to<std::optional<thread_id>>;
};
// An operation which finished with an unexpected / unhandled exception.
// Generally it is recommended to handle any failures in the operation's `execute`
// function and return them in the result.
template <typename Op>
struct exceptional_result {
    // The operation whose execution failed.
    Op op;
    // The exception that escaped `execute`.
    std::exception_ptr eptr;
};
// Either the operation's regular result or the exception that escaped it.
template <Executable Op>
using execute_result = std::variant<typename Op::result_type, exceptional_result<Op>>;
// A completion of an operation's execution.
// Contains the result, the logical time point when the operation finished
// and the thread on which the operation was executed.
template <Executable Op>
struct completion {
    execute_result<Op> result;
    raft::logical_clock::time_point time;
    thread_id thread;
};
// An executable operation which understands both time and threads (see above).
// This is what the interpreter below can run.
template <typename Op>
concept Invocable = Executable<Op> && HasReadyTime<Op> && HasThread<Op>;
} // namespace operation
namespace generator {
// A generator run context.
// See `Generator` concept.
struct context {
    // Current logical time; non-decreasing across subsequent `fetch_op` calls.
    raft::logical_clock::time_point now;
    // free_threads must be a subset of all_threads
    const operation::thread_set& all_threads;
    const operation::thread_set& free_threads;
};
// Returned by `fetch_op` when no operation is ready yet, but one may become ready later.
struct pending {};
// Returned by `fetch_op` when the generator has no more operations to produce.
struct finished {};
// Possible results for requesting an operation from a generator.
// See `Generator` concept.
//
// A successful fetch results in an operation (`Op`).
// The generator may also return `pending`, denoting that it doesn't have an operation
// ready yet but may in the future (e.g. when the time advances or the set of free threads changes),
// or `finished`, denoting that it has no more operations to return.
//
// Note: the order of types in the variant is part of the interface, i.e. the user may depend on it.
template <typename Op>
using op_ret = std::variant<Op, pending, finished>;
// The result of a fetch together with the generator's successor
// (generators are purely functional; see comment below).
template <typename Op, typename G>
using op_and_gen = std::pair<op_ret<Op>, G>;
// A generator is a data structure that produces a sequence of operations.
// Fetch an operation using the `fetch_op` function.
// The type of produced operations is `operation_type`.
//
// The returned operation depends on the generator's state and a context provided
// when fetching the operation. The context contains:
// - A logical time point representing the current time; the time point should
// be non-decreasing in subsequent `fetch_op` calls.
// - A set of threads used to execute the fetched operations.
// - A subset of that set containing the free threads, i.e. threads that are currently
// not executing any operations.
//
// `fetch_op` may assume that there's at least one free thread;
// the user should not call `fetch_op` otherwise.
//
// Other than an operation, the `fetch_op` function may also return `pending` or `finished`.
// See comment on `op_ret` above for their meaning.
//
// Generators are purely functional data structures. When an operation is fetched,
// it doesn't change the generator's state; instead, it returns a new generator
// whose state represents the modified state after fetching the operation.
// This allows one to easily compose generators. The user of a generator may
// also decide not to use a fetched operation, but instead call `fetch_op` again later
// on the same generator when the situation changes (e.g. there's a new free thread).
//
// TODO: consider using `seastar::lazy_eval` or equivalent for the returned generator.
template <typename G>
concept Generator = requires(const G g, const context& ctx) {
    // The type of operations this generator produces.
    typename G::operation_type;
    // Fetch an operation.
    //
    // The implementation should be deterministic
    // (it should produce the same operation given the same generator and context).
    { g.fetch_op(ctx) } -> std::same_as<op_and_gen<typename G::operation_type, G>>;
};
// A generator which produces operations of exactly type `Op`.
template <typename G, typename Op>
concept GeneratorOf = Generator<G> && std::is_same_v<typename G::operation_type, Op>;
// Runs the provided generator, fetching operations from it and executing them on the provided
// set of threads until the generator is exhausted (returns `finished`).
//
// The interpreter assumes an externally ticked `logical_timer`. The time returned from the timer
// can be used by the interpreted generator. For example, the generator may decide not to return
// an operation until a later time point.
//
// The operation type is an `Invocable` meaning that the generator can specify through the operation's
// fields the logical time point when the operation is ready (see `HasReadyTime`) or the thread
// on which the operation can execute (see `HasThread`).
//
// The interpreter guarantees the generator the following:
// - Subsequent calls to `fetch_op` have non-decreasing values of `context::now`.
// - `context::all_threads` is a constant non-empty set.
// - `context::free_threads` is a non-empty subset of `context::all_threads`.
//
// The interpreter requires the following from the generator:
// - If the generator returns `pending`, then given a context satisfying `free_threads = all_threads`
// and large enough `now` it must return an operation.
// - If the returned operation specifies a thread (the `thread` field is not `nullopt`),
// the thread must be an element of `free_threads` that was provided in the context passed to `fetch_op`.
// - If the returned operation specifies a ready time (the `ready` field is not `nullopt`),
// the time must be greater than `now` that was provided in the context passed to `fetch_op`.
//
// Given an operation `Op op`, interpreter guarantees the following to `op.execute(s, ctx)`
// for `Op::state_type& s` and `operation::context ctx`:
// - `op.execute` is called only once.
// - `s` refers to the same object for all `execute` calls (for different operations).
// - If `op.thread` was not `nullopt`, then `ctx.thread = *op.thread`.
// - If `op.ready` was not `nullopt`, then `ctx.start >= *op.ready`.
// - Two executions with the same `ctx.thread` are sequential
// (the future returned by one of them resolved before the other has started).
//
// The interpreter requires `op.execute` to finish eventually.
//
// If the generator returns an operation which does not specify `ready` or the specified `ready`
// satisfies `*ready <= ctx.now`, the interpreter calls `execute` immediately
// (there is no reactor yield in between). Note: by the above requirements, since the operation
// was fetched, at least one thread is free; if the operation specified a `thread`,
// it belongs to the set of free threads; thus it is possible to execute the operation.
//
// If the returned `pending` or an operation which specified a `*ready > ctx.now`,
// the interpreter will retry the fetch later.
//
// Given an operation `op`, before calling `execute`, the interpreter records the operation
// by calling the given `Recorder`. After `execute` finishes, the interpreter records
// the corresponding completion.
//
// When the interpreter run finishes, it is guaranteed that:
// - all operation executions have completed,
// - the generator has returned `finished`,
// - every recorded operation has a corresponding recorded completion.
template <operation::Invocable Op, GeneratorOf<Op> G, typename Recorder>
requires std::invocable<Recorder, Op> && std::invocable<Recorder, operation::completion<Op>>
class interpreter {
    // The (purely functional) generator providing the sequence of operations.
    G _generator;
    // All execution threads; constant for the interpreter's lifetime.
    const operation::thread_set _all_threads;
    operation::thread_set _free_threads; // a subset of _all_threads
    // How long the main loop waits for a completion before consulting the generator again.
    raft::logical_clock::duration _poll_timeout{0};
    const raft::logical_clock::duration _max_pending_interval;
    // Futures of currently executing operations, paired with the thread each runs on.
    future_set<std::pair<operation::execute_result<Op>, operation::thread_id>> _invocations;
    // State shared by all operation executions (passed by reference to each `execute`).
    typename Op::state_type _op_state;
    // Source of logical time.
    logical_timer& _timer;
    // Callback recording operation invocations and completions.
    Recorder _record;
public:
    // Create an interpreter for the given generator, set of threads, and initial state.
    // Use `timer` as a source of logical time.
    //
    // If the generator returns `pending`, the interpreter will wait for operation completions
    // for at most `max_pending_interval` before trying to fetch an operation again.
    // It may retry the fetch earlier if it manages to get a completion in the meantime.
    //
    // Operation invocations and completions are recorded using the `record` function.
    interpreter(G gen, operation::thread_set threads, raft::logical_clock::duration max_pending_interval,
            typename Op::state_type init_op_state, logical_timer& timer, Recorder record)
        : _generator(std::move(gen))
        // Members are initialized in declaration order, so `_all_threads` copies
        // `threads` before `_free_threads` moves from it.
        , _all_threads(threads)
        , _free_threads(std::move(threads))
        , _max_pending_interval(max_pending_interval)
        , _op_state(std::move(init_op_state))
        , _timer(timer)
        , _record(std::move(record))
    {
        SCYLLA_ASSERT(!_all_threads.empty());
        SCYLLA_ASSERT(_max_pending_interval > raft::logical_clock::duration{0});
    }
    // Run the interpreter and record all operation invocations and completions.
    // Can only be executed once.
    future<> run() {
        while (true) {
            // If there are any completions we want to record them as soon as possible
            // so we don't introduce false concurrency for anomaly analyzers
            // (concurrency makes the problem of looking for anomalies harder).
            if (auto r = co_await _invocations.poll(_timer, _poll_timeout)) {
                auto [res, tid] = std::move(*r);
                SCYLLA_ASSERT(_all_threads.contains(tid));
                SCYLLA_ASSERT(!_free_threads.contains(tid));
                // The operation finished; its thread becomes free again.
                _free_threads.insert(tid);
                _record(operation::completion<Op> {
                    .result = std::move(res),
                    .time = _timer.now(),
                    .thread = tid
                });
                // In the next iteration poll synchronously so if there will be no more completions
                // to immediately fetch, we move on to the next generator operation.
                _poll_timeout = raft::logical_clock::duration{0};
                continue;
            }
            // No completions to handle at this time.
            if (_free_threads.empty()) {
                // No point in fetching an operation from the generator even if there is any;
                // there is no thread to execute the operation at this time.
                _poll_timeout = _max_pending_interval;
                continue;
            }
            // There is a free thread, check the generator.
            auto now = _timer.now();
            auto [op, next_gen] = _generator.fetch_op(context {
                .now = now,
                .all_threads = _all_threads,
                .free_threads = _free_threads,
            });
            bool stop = false;
            std::visit(make_visitor(
                [&stop] (finished) {
                    stop = true;
                },
                [this] (pending) {
                    // Nothing ready; wait for a completion (or the timeout) and retry the fetch.
                    _poll_timeout = _max_pending_interval;
                },
                [this, now, next_gen = std::move(next_gen)] (Op op) mutable {
                    if (!op.ready) {
                        op.ready = now;
                    }
                    if (now < *op.ready) {
                        // Operation not ready yet: discard it (the generator is purely
                        // functional, we'll fetch it again) and sleep until its ready time.
                        _poll_timeout = *op.ready - now;
                    } else {
                        if (!op.thread) {
                            // We ensured !_free_threads.empty() before fetching the operation.
                            op.thread = some(_free_threads);
                        }
                        SCYLLA_ASSERT(_free_threads.contains(*op.thread));
                        _free_threads.erase(*op.thread);
                        // Record the invocation before it starts executing.
                        _record(op);
                        operation::context ctx { .thread = *op.thread, .start = now};
                        _invocations.add(invoke(std::move(op), _op_state, std::move(ctx)));
                        // Advance the generator only when the operation actually starts.
                        _generator = std::move(next_gen);
                        _poll_timeout = raft::logical_clock::duration{0};
                    }
                }
            ), std::move(op));
            if (stop) {
                co_await exit();
                co_return;
            }
        }
    }
    ~interpreter() {
        // Ensured by `exit()`.
        SCYLLA_ASSERT(_invocations.empty());
    }
private:
    // Wait for all in-flight operations to finish and record their completions.
    future<> exit() {
        auto fs = _invocations.release();
        co_await coroutine::parallel_for_each(fs, [this] (future<std::pair<operation::execute_result<Op>, operation::thread_id>>& f) -> future<> {
            auto [res, tid] = co_await std::move(f);
            _record(operation::completion<Op>{
                .result = std::move(res),
                .time = _timer.now(),
                .thread = tid
            });
        });
    }
    // Execute `op`, converting an escaped exception into an `exceptional_result`.
    // Not a static method nor free function due to clang bug #50345.
    future<std::pair<operation::execute_result<Op>, operation::thread_id>>
    invoke(Op op, typename Op::state_type& op_state, operation::context ctx) {
        try {
            co_return std::pair{operation::execute_result<Op>{co_await op.execute(op_state, ctx)}, ctx.thread};
        // TODO: consider failing in case of unknown/unexpected exception instead of storing it
        } catch (...) {
            co_return std::pair{operation::execute_result<Op>{
                operation::exceptional_result<Op>{std::move(op), std::current_exception()}},
                ctx.thread};
        }
    }
};
} // namespace generator
namespace operation {
// Given an Executable operation type create an Invocable operation type.
template <Executable Op>
struct invocable : public Op {
    using typename Op::result_type;
    using typename Op::state_type;
    using Op::execute;
    using Op::Op;
    // Logical time point when the operation becomes ready to execute
    // (`nullopt` = as soon as possible). See `HasReadyTime`.
    std::optional<raft::logical_clock::time_point> ready;
    // Thread the operation must run on (`nullopt` = any free thread). See `HasThread`.
    std::optional<thread_id> thread;
};
// Given a non-empty set of Executable operation types, create an Executable operation type representing their union (sum type).
//
// The state type of the union is a product of the original state types,
// i.e. the state of the union contains the states of each operation.
// The result type is a union of the original result types.
//
// An operation of this type is an operation of one of the original types.
// The state provided to `execute` is a projection of the product state
// onto the state type corresponding to the original operation type.
template <Executable... Ops>
struct either_of {
    static_assert((std::is_same_v<Ops, Ops> || ...)); // pack is non-empty
    // The operation currently held by this union.
    std::variant<Ops...> op;
    using result_type = std::variant<typename Ops::result_type...>;
    using state_type = std::tuple<typename Ops::state_type...>;
    future<result_type> execute(state_type& s, const context& ctx) {
        // `mp_with_index` turns the runtime index `op.index()` into a compile-time
        // constant `I`, which selects both the variant alternative in `op`
        // and the matching component of the product state `s`.
        // `co_return co_await` instead of simply `return` to keep the lambda alive during `execute`
        co_return co_await boost::mp11::mp_with_index<std::tuple_size_v<state_type>>(op.index(),
            [&s, &ctx, this] (auto I) -> future<result_type> {
                co_return result_type{co_await std::get<I>(op).execute(std::get<I>(s), ctx)};
            });
    }
    // Implicit conversion from any of the member types `Ops`.
    template <Executable Op>
    requires (std::is_same_v<Op, Ops> || ...) // Ops contain Op
    either_of(Op o) : op(std::move(o)) {
        static_assert(Executable<either_of<Ops...>>);
    }
};
} // namespace operation
// Convert a copy/move constructible type to a copy/move assignable type.
template <typename T>
requires std::is_nothrow_move_constructible_v<T>
class assignable {
    // Invariant: always engaged. `optional` is used only for its `emplace`
    // member function, with which assignment is implemented.
    std::optional<T> _value;
public:
    assignable(T v) : _value(std::move(v)) {}
    assignable(const assignable&) = default;
    assignable(assignable&&) = default;
    assignable& operator=(assignable other) {
        // Cannot throw thanks to `is_nothrow_move_constructible_v<T>`.
        _value.emplace(std::move(*other._value));
        return *this;
    }
    operator T&() { return *_value; }
    operator const T&() const { return *_value; }
};
namespace generator {
// A generator producing an infinite sequence of operations obtained by
// calling the factory `_f` with a pseudo-random number engine.
// `fetch_op` stays pure: it advances a copy of the engine and hands the
// advanced copy to the successor generator.
template <std::invocable<std::mt19937&> F>
requires std::is_nothrow_move_constructible_v<F>
struct random_gen {
    using operation_type = std::invoke_result_t<F, std::mt19937&>;
    op_and_gen<operation_type, random_gen> fetch_op(const context&) const {
        auto engine = _engine;
        auto op = boost::implicit_cast<F>(_f)(engine);
        return {std::move(op), random_gen{std::move(engine), _f}};
    }
    std::mt19937 _engine;
    assignable<F> _f;
};
// Given an operation factory that uses a pseudo-random number generator to produce an operation,
// creates a generator that produces an infinite random sequence of operations from the factory.
// The sequence is fully determined by `seed`.
template <std::invocable<std::mt19937&> F>
requires std::is_nothrow_move_constructible_v<F>
random_gen<F> random(int seed, F f) {
    static_assert(GeneratorOf<random_gen<F>, std::invoke_result_t<F, std::mt19937&>>);
    return {std::mt19937{seed}, std::move(f)};
}
// A generator producing an infinite sequence of operations obtained by
// calling the nullary factory `_f`. Its state never changes, so the
// successor generator is a copy of itself.
template <std::invocable<> F>
requires std::is_nothrow_move_constructible_v<F>
struct constant_gen {
    using operation_type = std::invoke_result_t<F>;
    op_and_gen<operation_type, constant_gen> fetch_op(const context&) const {
        auto op = boost::implicit_cast<F>(_f)();
        return {std::move(op), *this};
    }
    assignable<F> _f;
};
// Given an operation factory of no arguments,
// creates a generator that produces an infinite constant sequence of operations.
template <std::invocable<> F>
requires std::is_nothrow_move_constructible_v<F>
constant_gen<F> constant(F f) {
    static_assert(GeneratorOf<constant_gen<F>, std::invoke_result_t<F>>);
    return {std::move(f)};
}
// A generator producing an infinite sequence of operations obtained by
// calling the factory `_f` with consecutive integers.
template <std::invocable<int32_t> F>
requires std::is_nothrow_move_constructible_v<F>
struct sequence_gen {
    using operation_type = std::invoke_result_t<F, int32_t>;
    op_and_gen<operation_type, sequence_gen> fetch_op(const context&) const {
        auto op = boost::implicit_cast<F>(_f)(_next);
        return {std::move(op), sequence_gen{_next + 1, _f}};
    }
    // The integer passed to `_f` on the next fetch.
    int32_t _next;
    assignable<F> _f;
};
// Given an operation factory using an integer to produce an operation,
// creates a generator that produces an infinite sequence of operations from the factory
// by passing consecutive integers starting from `start`.
template <std::invocable<int32_t> F>
requires std::is_nothrow_move_constructible_v<F>
sequence_gen<F> sequence(int32_t start, F f) {
    static_assert(GeneratorOf<sequence_gen<F>, std::invoke_result_t<F, int32_t>>);
    return {start, std::move(f)};
}
// Wraps a generator, truncating its sequence of operations to a fixed count.
template <Generator G>
struct op_limit_gen {
    using operation_type = typename G::operation_type;
    op_and_gen<operation_type, op_limit_gen> fetch_op(const context& ctx) const {
        if (_remaining == 0) {
            return {finished{}, *this};
        }
        auto [op, rest] = _g.fetch_op(ctx);
        return {std::move(op), op_limit_gen{_remaining - 1, std::move(rest)}};
    }
    // How many more operations may be produced before we report `finished`.
    size_t _remaining;
    G _g;
};
// Given a generator and an operation number `limit`,
// creates a generator which truncates the sequence of operations returned by the original generator
// to the first `limit` operations.
// If the original generator finishes earlier, `op_limit` has no observable effect.
template <Generator G>
op_limit_gen<G> op_limit(size_t limit, G g) {
    static_assert(GeneratorOf<op_limit_gen<G>, typename G::operation_type>);
    return {limit, std::move(g)};
}
// Restricts a generator to the threads satisfying the predicate `_p`;
// see `on_threads` below for the full contract.
template <Generator G, std::predicate<operation::thread_id> P>
requires operation::HasThread<typename G::operation_type>
struct on_threads_gen {
    using operation_type = typename G::operation_type;
    op_and_gen<operation_type, on_threads_gen> fetch_op(const context& ctx) const {
        // Keep only the threads satisfying the predicate.
        auto filter = [this] (const operation::thread_set& ts) {
            operation::thread_set result;
            result.reserve(ts.size());
            for (const auto& t : ts) {
                if (_p(t)) {
                    result.insert(t);
                }
            }
            return result;
        };
        auto masked_all = filter(ctx.all_threads);
        if (masked_all.empty()) {
            // No thread will ever satisfy the predicate; nothing more to produce.
            return {finished{}, *this};
        }
        auto masked_free = filter(ctx.free_threads);
        if (masked_free.empty()) {
            // Some thread satisfies the predicate, but none of them is free right now.
            return {pending{}, *this};
        }
        auto [op, new_g] = _g.fetch_op(context {
            .now = ctx.now,
            .all_threads = masked_all,
            .free_threads = masked_free
        });
        if (auto fetched = std::get_if<operation_type>(&op)) {
            if (fetched->thread) {
                SCYLLA_ASSERT(masked_free.contains(*fetched->thread));
            } else {
                // The underlying generator didn't assign a thread so we do it.
                fetched->thread = some(masked_free);
            }
        }
        return {std::move(op), on_threads_gen{_p, std::move(new_g)}};
    }
    P _p;
    G _g;
};
// Given a generator whose operations understand threads and a predicate on threads,
// creates a generator which produces operations which always specify a thread
// and the specified threads always satisfy the predicate.
//
// Finishes when the original generator finishes or no thread satisfies the predicate.
// If some thread satisfies the predicate but no thread satisfying the predicate
// is currently free, returns `pending`.
//
// The underlying generator must respect `free_threads`, i.e. it must satisfy the following:
// if the returned operation specifies a thread, the thread must be one of the `free_threads` passed in the context.
template <Generator G, std::predicate<operation::thread_id> P>
requires operation::HasThread<typename G::operation_type>
on_threads_gen<G, P> on_threads(P p, G g) {
    static_assert(GeneratorOf<on_threads_gen<G, P>, typename G::operation_type>);
    return {std::move(p), std::move(g)};
}
// Compare two generator return values for which one is ``sooner''.
// Operations are sooner than `pending`, `pending` is sooner than `finished`.
// For two operations which both specify a ready time, their ready times are compared.
// If one of them doesn't specify a ready time, we assume it is to be executed ``as soon as possible'',
// so we assume that its ready time is the current time for comparison purposes (passed in `now`).
//
// The return type is `weak_ordering` where smaller represents sooner.
template <typename Op>
requires operation::HasReadyTime<Op>
std::weak_ordering sooner(const op_ret<Op>& r1, const op_ret<Op>& r2, raft::logical_clock::time_point now) {
    auto op1 = std::get_if<Op>(&r1);
    auto op2 = std::get_if<Op>(&r2);
    if (op1 && op2) {
        // A missing `ready` time means "as soon as possible", i.e. `now`.
        return op1->ready.value_or(now) <=> op2->ready.value_or(now);
    }
    // one or both of them is not an operation (it's pending or finished)
    // operation is sooner than pending which is sooner than finished
    // since operation index = 0, pending index = 1, finished index = 2, this works:
    return r1.index() <=> r2.index();
}
// Interleaves two generators; see `either` below for the full contract.
template <Generator G1, Generator G2>
requires std::is_same_v<typename G1::operation_type, typename G2::operation_type> && operation::HasReadyTime<typename G1::operation_type>
struct either_gen {
    using operation_type = typename G1::operation_type;
    op_and_gen<operation_type, either_gen> fetch_op(const context& ctx) const {
        auto [op1, new_g1] = _g1.fetch_op(ctx);
        auto [op2, new_g2] = _g2.fetch_op(ctx);
        auto ord = sooner(op1, op2, ctx.now);
        bool tie = ord == std::weak_ordering::equivalent;
        // Alternate the generator preferred on ties.
        bool next_use_g1_on_tie = tie ? !_use_g1_on_tie : _use_g1_on_tie;
        if (ord == std::weak_ordering::less || (tie && _use_g1_on_tie)) {
            return {std::move(op1), either_gen{next_use_g1_on_tie, std::move(new_g1), _g2}};
        }
        return {std::move(op2), either_gen{next_use_g1_on_tie, _g1, std::move(new_g2)}};
    }
    // Which generator to use when a tie is detected
    bool _use_g1_on_tie = true;
    G1 _g1;
    G2 _g2;
};
// Given two generators producing the same type of operations which understand time,
// creates a generator whose each produced operation comes from one of the underlying generators.
//
// Between operations produced by both generators, chooses the one which is `sooner`.
// In particular finishes when both generators finish.
//
// On the first tie takes an operation from `g1`. On the second and subsequent ties
// takes an operation from the generator which was not used on the previous tie.
template <Generator G1, Generator G2>
requires std::is_same_v<typename G1::operation_type, typename G2::operation_type>
&& operation::HasReadyTime<typename G1::operation_type>
either_gen<G1, G2> either(G1 g1, G2 g2) {
    static_assert(GeneratorOf<either_gen<G1, G2>, typename G1::operation_type>);
    return either_gen<G1, G2>{._g1 = std::move(g1), ._g2 = std::move(g2)};
}
// Given a thread and two generators producing the same type of operations which understand threads and time,
// creates a generator whose each produced operation comes from one of the underlying generators;
// furthermore, ensures that all operations from `g1` are assigned to the given thread while all operations
// from `g2` are assigned to other threads.
//
// Ties are resolved as in `either`.
//
// Assumes that underlying generators respect `free_threads` (as in `on_threads`).
template <Generator G1, Generator G2>
requires std::is_same_v<typename G1::operation_type, typename G2::operation_type>
&& operation::HasReadyTime<typename G1::operation_type>
&& operation::HasThread<typename G1::operation_type>
GeneratorOf<typename G1::operation_type> auto pin(operation::thread_id to, G1 g1, G2 g2) {
    // Named structs rather than lambdas because we need copy constructors
    // (and std::not_fn is avoided for the same reason).
    struct is_pinned_t {
        operation::thread_id _to;
        bool operator()(const operation::thread_id& tid) const { return tid == _to; }
    };
    struct is_not_pinned_t {
        operation::thread_id _to;
        bool operator()(const operation::thread_id& tid) const { return tid != _to; }
    };
    auto gen = either(
        on_threads(is_pinned_t{to}, std::move(g1)),
        on_threads(is_not_pinned_t{to}, std::move(g2))
    );
    static_assert(GeneratorOf<decltype(gen), typename G1::operation_type>);
    return gen;
}
// Delays the operations of a wrapped generator so that consecutive `ready`
// times are spaced by random intervals; see `stagger` below for the contract.
template <Generator G>
requires operation::HasReadyTime<typename G::operation_type>
struct stagger_gen {
    using operation_type = typename G::operation_type;
    using distribution_type = std::uniform_int_distribution<raft::logical_clock::rep>;
    op_and_gen<operation_type, stagger_gen> fetch_op(const context& ctx) const {
        auto [res, new_g] = _g.fetch_op(ctx);
        auto op = std::get_if<operation_type>(&res);
        if (!op) {
            // pending or finished, nothing to do
            return {std::move(res), *this};
        }
        if (!op->ready.has_value() || *op->ready < _next_ready) {
            // Need to delay the operation.
            op->ready = _next_ready;
        }
        // `fetch_op` is pure: draw the next delay on copies of the engine
        // and distribution, handing the advanced copies to the successor.
        auto engine = _engine;
        distribution_type dist{_delta.param()};
        auto next_ready = _next_ready + raft::logical_clock::duration{dist(engine)};
        return {std::move(res), stagger_gen{
            std::move(engine), dist,
            next_ready, std::move(new_g)}
        };
    }
    std::mt19937 _engine;
    distribution_type _delta;
    raft::logical_clock::time_point _next_ready;
    G _g;
};
// Given a generator whose operations understand time,
// a logical `start` time point, and an interval of logical time durations [`delta_low`, `delta_high`],
// produces a sequence of time points where each pair of subsequent time points differs by a time duration
// chosen in random uniformly from the [`delta_low`, `delta_high`] interval;
// then, for each operation in the sequence of operations produced by the underlying generator, modifies its `ready`
// time to be at least as late as the corresponding time point in the above sequence.
//
// For example, if we randomly choose the sequence of time points [10, 20, 30], and the `ready` times specified
// by operations from the underlying generator are [5, `nullopt`, 34], the sequence of operations produced
// by the new generator will be [10, 20, 34].
//
// In particular if `delta = delta_low = delta_high` and the underlying operations never specify `ready`,
// the produced operations are scheduled to execute every `delta` ticks.
template <Generator G>
requires operation::HasReadyTime<typename G::operation_type>
stagger_gen<G> stagger(
        int seed, raft::logical_clock::time_point start,
        raft::logical_clock::duration delta_low, raft::logical_clock::duration delta_high,
        G g) {
    static_assert(GeneratorOf<stagger_gen<G>, typename G::operation_type>);
    return stagger_gen<G>{
        ._engine{seed},
        ._delta{delta_low.count(), delta_high.count()},
        ._next_ready{start},
        ._g{std::move(g)}
    };
}
} // namespace generator
// Formats an `either_of` as the operation it currently holds (via the variant formatter).
template <operation::Executable... Ops>
struct fmt::formatter<operation::either_of<Ops...>> : fmt::formatter<string_view> {
    // Take the argument by const reference: every sibling formatter in this file
    // does so, and a non-const parameter would make const `either_of` objects
    // unformattable. Const access is all that's needed to print `e.op`.
    auto format(const operation::either_of<Ops...>& e, fmt::format_context& ctx) const {
        return fmt::format_to(ctx.out(), "{}", e.op);
    }
};
// Formats an `exceptional_result` as `exceptional{i:<op>, ex:<what>}`.
template <typename Op>
struct fmt::formatter<operation::exceptional_result<Op>> : fmt::formatter<string_view> {
    auto format(const operation::exceptional_result<Op>& r, fmt::format_context& ctx) const {
        // Rethrow to recover the exception object held by `eptr`.
        try {
            std::rethrow_exception(r.eptr);
        } catch (const std::exception& e) {
            return fmt::format_to(ctx.out(), "exceptional{{i:{}, ex:{}}}", r.op, e);
        } catch (...) {
            // Previously an exception not derived from `std::exception` would
            // propagate out of the formatter; report it instead of throwing.
            return fmt::format_to(ctx.out(), "exceptional{{i:{}, ex:<unknown exception>}}", r.op);
        }
    }
};
// Formats a `completion` as `c{r:<result>, t:<time>, tid:<thread>}`.
template <operation::Executable Op>
struct fmt::formatter<operation::completion<Op>> : fmt::formatter<string_view> {
    auto format(const operation::completion<Op>& c, fmt::format_context& ctx) const {
        return fmt::format_to(ctx.out(), "c{{r:{}, t:{}, tid:{}}}", c.result, c.time, c.thread);
    }
};
// Formats an `invocable<Op>` as its underlying `Op`
// (the `ready`/`thread` fields are not printed).
template <operation::Executable Op>
struct fmt::formatter<operation::invocable<Op>> : fmt::formatter<string_view> {
    auto format(const operation::invocable<Op>& op, fmt::format_context& ctx) const {
        // Cast to a const reference to the base subobject: `static_cast<Op>`
        // would copy (slice) the whole operation just to print it.
        return fmt::format_to(ctx.out(), "{}", static_cast<const Op&>(op));
    }
};