Merge "test: raft: randomized_nemesis_test: refactors and improvements" from Kamil

A couple of improvements to prepare for the next patchset.

We move `logical_timer` and `ticker` to their own headers due to the
generality of these data structures. They are not very specific to the
test.

`logical_timer` is extended with a `schedule` function, allowing to
schedule any given function to be called at the given time point.

The interface of `network` in `randomized_nemesis_test` is extended by
`add_grudge` and `remove_grudge` functions for implementing network
partitioning nemeses.
Furthermore `network` can be now constructed with an arbitrary network
delay, which was previously hardcoded.

`with_env_and_ticker` is now generic w.r.t. return values (previously
`future<>` was assumed).

`environment` exposes a reference to the `network` through a getter.

The `not_a_leader` exception now shows the leader's ID in the exception
message. Useful for logging.

In `logical_timer::with_timeout`, when we timeout, we don't just return
`timed_out_error`. The returned exception now actually contains the
original future... well almost; in any case, the user can now do
something different to the future other than simply discarding it.

We also fix some `broken_promise` exceptions appearing in discarded
futures in certain scenarios. See the corresponding commit for detailed
explanation.

We handle `raft::dropped_entry` in the `call` function.

`persistence` is fixed to avoid creating gaps in the log when storing
snapshots and to support complex state types.

Waiting for leader was refactored into a separate function and
generalized (we wait for a set of nodes to elect a leader instead of a
single node to elect itself) to be useful in more situations.

Finally, we introduce `reconfigure`, a higher-level version of
`set_configuration` which performs error handling and supports timeouts.

* kbr/raft-nemesis-improvements-v4:
  test: raft: randomized_nemesis_test: `reconfigure` function
  test: raft: randomized_nemesis_test: refactor waiting for leader into a separate function
  test: raft: randomized_nemesis_test: persistence: avoid creating gaps in the log when storing snapshots
  test: raft: randomized_nemesis_test: persistence: handle complex state types
  test: raft: randomized_nemesis_test: `call`: handle `raft::dropped_entry`
  test: raft: randomized_nemesis_test: impure_state_machine/call: handle dropped channels
  test: raft: randomized_nemesis_test: environment: expose the network
  test: raft: randomized_nemesis_test: configurable network delay and FD convict threshold
  test: raft: randomized_nemesis_test: generalize `with_env_and_ticker`
  test: raft: randomized_nemesis_test: network: `add_grudge`, `remove_grudge` functions
  test: raft: randomized_nemesis_test: move `ticker` to its own header
  test: raft: randomized_nemesis_test: ticker: take `logger` as a constructor parameter
  test: raft: logical_timer: handle immediate timeout
  test: raft: logical_timer: on timeout, return the original future in the exception
  test: raft: logical_timer: add `schedule` member function
  test: raft: randomized_nemesis_test: move `logical_timer` to its own header
  test: raft: include the leader's ID in the `not_a_leader` exception's message
This commit is contained in:
Tomasz Grabiec
2021-07-16 00:59:48 +02:00
4 changed files with 496 additions and 261 deletions

View File

@@ -200,7 +200,7 @@ struct error : public std::runtime_error {
struct not_a_leader : public error {
server_id leader;
explicit not_a_leader(server_id l) : error("Not a leader"), leader(l) {}
explicit not_a_leader(server_id l) : error(format("Not a leader, leader: {}", l)), leader(l) {}
};
struct dropped_entry : public error {

224
test/raft/logical_timer.hh Normal file
View File

@@ -0,0 +1,224 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/timed_out_error.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future-util.hh>
#include "raft/logical_clock.hh"
using namespace seastar;
constexpr raft::logical_clock::duration operator "" _t(unsigned long long ticks) {
return raft::logical_clock::duration{ticks};
}
// A wrapper around `raft::logical_clock` that allows scheduling events to happen after a certain number of ticks.
class logical_timer {
struct scheduled_impl {
private:
bool _resolved = false;
virtual void do_resolve() = 0;
public:
virtual ~scheduled_impl() { }
void resolve() {
if (!_resolved) {
do_resolve();
_resolved = true;
}
}
void mark_resolved() { _resolved = true; }
bool resolved() { return _resolved; }
};
struct scheduled {
raft::logical_clock::time_point _at;
seastar::shared_ptr<scheduled_impl> _impl;
void resolve() { _impl->resolve(); }
bool resolved() { return _impl->resolved(); }
};
raft::logical_clock _clock;
// A min-heap of `scheduled` events sorted by `_at`.
std::vector<scheduled> _scheduled;
// Comparator for the `_scheduled` min-heap.
static bool cmp(const scheduled& a, const scheduled& b) {
return a._at > b._at;
}
public:
template <typename... T>
class timed_out : public raft::error {
// lw_shared_ptr to make the exception copyable
lw_shared_ptr<seastar::future<T...>> _fut;
public:
timed_out(seastar::future<T...> f) : error("timed out"), _fut(make_lw_shared(std::move(f))) {}
timed_out(const timed_out&) = default;
timed_out(timed_out&&) = default;
seastar::future<T...>& get_future() {
return *_fut;
}
};
logical_timer() = default;
logical_timer(const logical_timer&) = delete;
logical_timer(logical_timer&&) = default;
// Returns the current logical time (number of `tick()`s since initialization).
raft::logical_clock::time_point now() {
return _clock.now();
}
// Tick the internal clock.
// Resolve all events whose scheduled times arrive after that tick.
void tick() {
_clock.advance();
while (!_scheduled.empty() && _scheduled.front()._at <= _clock.now()) {
_scheduled.front().resolve();
std::pop_heap(_scheduled.begin(), _scheduled.end(), cmp);
_scheduled.pop_back();
}
}
// Given a future `f` and a logical time point `tp`, returns a future that resolves
// when either `f` resolves or the time point `tp` arrives (according to the number of `tick()` calls),
// whichever comes first.
//
// If `tp` comes first, the returned future is resolved with the `timed_out` exception.
// The exception contains a future equivalent to the original future (i.e. when the original future
// resolves, the future inside the exception will contain the original future's result).
//
// Note: it is highly recommended to pass in futures that always resolve eventually
// since we attach continuations to these futures, allocating memory.
//
// Note: there is a possibility that the internal heap will grow unbounded if we call `with_timeout`
// more often than we `tick`, so don't do that. It is recommended to call at least one
// `tick` per one `with_timeout` call (on average in the long run).
template <typename... T>
future<T...> with_timeout(raft::logical_clock::time_point tp, future<T...> f) {
if (f.available()) {
return f;
}
if (tp <= now()) {
return make_exception_future<T...>(timed_out<T...>{std::move(f)});
}
struct sched : public scheduled_impl {
// The original future (the `f` argument), when it resolves, will set value on `_p`.
// Before timeout, `_p` is connected to the future returned to the user (`res` below).
// After timeout, `_p` is a new promise connected to the future returned inside the timed_out exception.
// Therefore:
// 1. if `f` resolves before timeout, it will resolve `res`,
// 2. otherwise it will resolve the future we return inside `timed_out` which is returned through `res`.
promise<T...> _p;
virtual ~sched() override { }
virtual void do_resolve() override {
promise<T...> new_p;
_p.set_exception(timed_out<T...>{new_p.get_future()});
std::swap(new_p, _p);
}
};
auto s = make_shared<sched>();
auto res = s->_p.get_future();
_scheduled.push_back(scheduled{
._at = tp,
._impl = s
});
std::push_heap(_scheduled.begin(), _scheduled.end(), cmp);
(void)f.then_wrapped([s = std::move(s)] (future<T...> f) mutable {
// If we're before timeout, `s->_p` is connected to `res` so we're returning the result of `f` to our caller.
// Otherwise `s->_p` is connected to a future which was previously returned to our caller inside `timed_out`.
f.forward_to(std::move(s->_p));
s->mark_resolved();
// tick() will (eventually) clear the `_scheduled` entry
// (unless we're already past timeout, in which case the entry has already been cleared).
});
return res;
}
// Returns a future that resolves after a number of `tick()`s represented by `d`.
// Example usage: `sleep(20_t)` resolves after 20 `tick()`s.
// Note: analogous remark applies as for `with_timeout`, i.e. make sure to call at least one `tick`
// per one `sleep` call on average.
future<> sleep(raft::logical_clock::duration d) {
if (d == raft::logical_clock::duration{0}) {
return make_ready_future<>();
}
struct sched : public scheduled_impl {
promise<> _p;
virtual ~sched() override {}
virtual void do_resolve() override { _p.set_value(); }
};
auto s = make_shared<sched>();
auto f = s->_p.get_future();
_scheduled.push_back(scheduled{
._at = now() + d,
._impl = std::move(s)
});
std::push_heap(_scheduled.begin(), _scheduled.end(), cmp);
return f;
}
// Schedule `f` to be called at logical time point `tp` (according to this timer's clock).
template <typename F>
void schedule(raft::logical_clock::time_point tp, F f) {
if (tp <= now()) {
f();
return;
}
struct sched : public scheduled_impl {
sched(F f) : _f(std::move(f)) {}
virtual ~sched() override {}
virtual void do_resolve() override { _f(); }
F _f;
};
_scheduled.push_back(scheduled {
._at = tp,
._impl = make_shared<sched>(std::move(f))
});
std::push_heap(_scheduled.begin(), _scheduled.end(), cmp);
}
};

View File

@@ -36,6 +36,9 @@
#include "idl/uuid.dist.hh"
#include "idl/uuid.dist.impl.hh"
#include "test/raft/logical_timer.hh"
#include "test/raft/ticker.hh"
using namespace seastar;
using namespace std::chrono_literals;
@@ -66,145 +69,6 @@ requires (typename M::state_t s, typename M::input_t i) {
requires std::is_same_v<const typename M::state_t, decltype(M::init)>;
};
constexpr raft::logical_clock::duration operator "" _t(unsigned long long ticks) {
return raft::logical_clock::duration{ticks};
}
// A wrapper around `raft::logical_clock` that allows scheduling events to happen after a certain number of ticks.
class logical_timer {
struct scheduled_impl {
private:
bool _resolved = false;
virtual void do_resolve() = 0;
public:
virtual ~scheduled_impl() { }
void resolve() {
if (!_resolved) {
do_resolve();
_resolved = true;
}
}
void mark_resolved() { _resolved = true; }
bool resolved() { return _resolved; }
};
struct scheduled {
raft::logical_clock::time_point _at;
seastar::shared_ptr<scheduled_impl> _impl;
void resolve() { _impl->resolve(); }
bool resolved() { return _impl->resolved(); }
};
raft::logical_clock _clock;
// A min-heap of `scheduled` events sorted by `_at`.
std::vector<scheduled> _scheduled;
// Comparator for the `_scheduled` min-heap.
static bool cmp(const scheduled& a, const scheduled& b) {
return a._at > b._at;
}
public:
logical_timer() = default;
logical_timer(const logical_timer&) = delete;
logical_timer(logical_timer&&) = default;
// Returns the current logical time (number of `tick()`s since initialization).
raft::logical_clock::time_point now() {
return _clock.now();
}
// Tick the internal clock.
// Resolve all events whose scheduled times arrive after that tick.
void tick() {
_clock.advance();
while (!_scheduled.empty() && _scheduled.front()._at <= _clock.now()) {
_scheduled.front().resolve();
std::pop_heap(_scheduled.begin(), _scheduled.end(), cmp);
_scheduled.pop_back();
}
}
// Given a future `f` and a logical time point `tp`, returns a future that resolves
// when either `f` resolves or the time point `tp` arrives (according to the number of `tick()` calls),
// whichever comes first.
//
// Note: if `tp` comes first, that doesn't cancel any in-progress computations behind `f`.
// `f` effectively becomes a discarded future.
// Note: there is a possibility that the internal heap will grow unbounded if we call `with_timeout`
// more often than we `tick`, so don't do that. It is recommended to call at least one
// `tick` per one `with_timeout` call (on average in the long run).
template <typename... T>
future<T...> with_timeout(raft::logical_clock::time_point tp, future<T...> f) {
if (f.available()) {
return f;
}
struct sched : public scheduled_impl {
promise<T...> _p;
virtual ~sched() override { }
virtual void do_resolve() override {
_p.set_exception(std::make_exception_ptr(timed_out_error()));
}
};
auto s = make_shared<sched>();
auto res = s->_p.get_future();
_scheduled.push_back(scheduled{
._at = tp,
._impl = s
});
std::push_heap(_scheduled.begin(), _scheduled.end(), cmp);
(void)f.then_wrapped([s = std::move(s)] (auto&& f) mutable {
if (s->resolved()) {
f.ignore_ready_future();
} else {
f.forward_to(std::move(s->_p));
s->mark_resolved();
// tick() will (eventually) clear the `_scheduled` entry.
}
});
return res;
}
// Returns a future that resolves after a number of `tick()`s represented by `d`.
// Example usage: `sleep(20_t)` resolves after 20 `tick()`s.
// Note: analogous remark applies as for `with_timeout`, i.e. make sure to call at least one `tick`
// per one `sleep` call on average.
future<> sleep(raft::logical_clock::duration d) {
if (d == raft::logical_clock::duration{0}) {
return make_ready_future<>();
}
struct sched : public scheduled_impl {
promise<> _p;
virtual ~sched() override {}
virtual void do_resolve() override { _p.set_value(); }
};
auto s = make_shared<sched>();
auto f = s->_p.get_future();
_scheduled.push_back(scheduled{
._at = now() + d,
._impl = std::move(s)
});
std::push_heap(_scheduled.begin(), _scheduled.end(), cmp);
return f;
}
};
// Used to uniquely identify commands passed into `apply` in order to return
// the outputs of these commands. See `impure_state_machine` and `call`.
using cmd_id_t = utils::UUID;
@@ -271,6 +135,7 @@ public:
// We are on the leader server where the client submitted the command
// and waits for the output. Send it to them.
it->second.set_value(std::move(output));
_output_channels.erase(it);
} else {
// This is not the leader on which the command was submitted,
// or it is but the client already gave up on us and deallocated the channel.
@@ -284,7 +149,7 @@ public:
future<raft::snapshot_id> take_snapshot() override {
auto id = raft::snapshot_id::create_random_id();
_snapshots[id] = _val;
assert(_snapshots.emplace(id, _val).second);
co_return id;
}
@@ -303,6 +168,10 @@ public:
return _gate.close();
}
struct output_channel_dropped : public raft::error {
output_channel_dropped() : error("output channel dropped") {}
};
// Before sending a command to Raft, the client must obtain a command ID
// and an output channel using this function.
template <typename F>
@@ -313,7 +182,13 @@ public:
auto cmd_id = utils::make_random_uuid();
assert(_output_channels.emplace(cmd_id, std::move(p)).second);
auto guard = defer([this, cmd_id] { _output_channels.erase(cmd_id); });
auto guard = defer([this, cmd_id] {
auto it = _output_channels.find(cmd_id);
if (it != _output_channels.end()) {
it->second.set_exception(output_channel_dropped{});
_output_channels.erase(it);
}
});
return f(cmd_id, std::move(fut)).finally([guard = std::move(guard)] {});
});
}
@@ -330,7 +205,7 @@ raft::command make_command(const cmd_id_t& cmd_id, const Input& input) {
// TODO: handle other errors?
template <PureStateMachine M>
using call_result_t = std::variant<typename M::output_t, timed_out_error, raft::not_a_leader>;
using call_result_t = std::variant<typename M::output_t, timed_out_error, raft::not_a_leader, raft::dropped_entry>;
// Sends a given `input` as a command to `server`, waits until the command gets replicated
// and applied on that server and returns the produced output.
@@ -349,13 +224,20 @@ future<call_result_t<M>> call(
logical_timer& timer,
raft::server& server,
impure_state_machine<M>& sm) {
using output_channel_dropped = typename impure_state_machine<M>::output_channel_dropped;
return sm.with_output_channel([&, input = std::move(input), timeout] (cmd_id_t cmd_id, future<typename M::output_t> f) {
return timer.with_timeout(timeout, [&] (typename M::input_t input, future<typename M::output_t> f) {
return server.add_entry(
make_command(std::move(cmd_id), std::move(input)),
raft::wait_type::applied
).then([f = std::move(f)] () mutable {
return std::move(f);
).then_wrapped([output_f = std::move(f)] (future<> add_entry_f) mutable {
if (add_entry_f.failed()) {
// We need to discard `output_f`; the only expected exception is:
(void)output_f.discard_result().handle_exception_type([] (const output_channel_dropped&) {});
std::rethrow_exception(add_entry_f.get_exception());
}
return std::move(output_f);
});
}(std::move(input), std::move(f)));
}).then([] (typename M::output_t output) {
@@ -365,8 +247,13 @@ future<call_result_t<M>> call(
std::rethrow_exception(eptr);
} catch (raft::not_a_leader e) {
return make_ready_future<call_result_t<M>>(e);
} catch (timed_out_error e) {
} catch (raft::dropped_entry e) {
return make_ready_future<call_result_t<M>>(e);
} catch (logical_timer::timed_out<typename M::output_t> e) {
(void)e.get_future().discard_result()
.handle_exception_type([] (const output_channel_dropped&) {})
.handle_exception_type([] (const raft::dropped_entry&) {});
return make_ready_future<call_result_t<M>>(timed_out_error{});
}
});
}
@@ -546,7 +433,7 @@ public:
}
};
template <typename State, State init_state>
template <typename State>
class persistence : public raft::persistence {
snapshots_t<State>& _snapshots;
@@ -577,13 +464,13 @@ public:
// containing opnly this server's ID which must be also provided here as `init_config_id`.
// Otherwise it must be initialized with an empty configuration (it will be added to the cluster
// through a configuration change) and `init_config_id` must be `nullopt`.
persistence(snapshots_t<State>& snaps, std::optional<raft::server_id> init_config_id)
persistence(snapshots_t<State>& snaps, std::optional<raft::server_id> init_config_id, State init_state)
: _snapshots(snaps)
, _stored_snapshot(
raft::snapshot{
.config = init_config_id ? raft::configuration{*init_config_id} : raft::configuration{}
},
init_state)
std::move(init_state))
, _stored_term_and_vote(raft::term_t{1}, raft::server_id{})
{}
@@ -605,6 +492,12 @@ public:
assert(it != _snapshots.end());
_stored_snapshot = {snap, it->second};
if (!_stored_entries.empty() && snap.idx > _stored_entries.back()->idx) {
// Clear the log in order to not create a gap.
_stored_entries.clear();
co_return;
}
auto first_to_remain = snap.idx + 1 >= preserve_log_entries ? raft::index_t{snap.idx + 1 - preserve_log_entries} : raft::index_t{0};
_stored_entries.erase(_stored_entries.begin(), find(first_to_remain));
@@ -613,7 +506,7 @@ public:
virtual future<raft::snapshot> load_snapshot() override {
auto [snap, state] = _stored_snapshot;
_snapshots[snap.id] = std::move(state);
_snapshots.insert_or_assign(snap.id, std::move(state));
co_return snap;
}
@@ -679,11 +572,14 @@ private:
// The last time we sent a heartbeat.
raft::logical_clock::time_point _last_beat;
// How long from the last received heartbeat does it take to convict a node as dead.
const raft::logical_clock::duration _convict_threshold;
send_heartbeat_t _send_heartbeat;
public:
failure_detector(send_heartbeat_t f)
: _send_heartbeat(std::move(f))
failure_detector(raft::logical_clock::duration convict_threshold, send_heartbeat_t f)
: _convict_threshold(convict_threshold), _send_heartbeat(std::move(f))
{
send_heartbeats();
assert(_last_beat == _clock.now());
@@ -723,9 +619,6 @@ public:
}
bool is_alive(raft::server_id id) override {
// TODO: make it adjustable
static const raft::logical_clock::duration _convict_threshold = 50_t;
return _clock.now() < _last_heard[id] + _convict_threshold;
}
};
@@ -775,17 +668,19 @@ private:
raft::logical_clock _clock;
// How long does it take to deliver a message?
// TODO: use a random distribution or let the user change this dynamically
raft::logical_clock::duration _delivery_delay;
public:
network(deliver_t f)
: _deliver(std::move(f)) {}
network(raft::logical_clock::duration delivery_delay, deliver_t f)
: _deliver(std::move(f)), _delivery_delay(delivery_delay) {}
void send(raft::server_id src, raft::server_id dst, Payload payload) {
// Predict the delivery time in advance.
// Our prediction may be wrong if a grudge exists at this expected moment of delivery.
// Messages may also be reordered.
// TODO: scale with number of msgs already in transit and payload size?
// TODO: randomize the delivery time
auto delivery_time = _clock.now() + 5_t;
auto delivery_time = _clock.now() + _delivery_delay;
_events.push_back(event{delivery_time, message{src, dst, make_lw_shared<Payload>(std::move(payload))}});
std::push_heap(_events.begin(), _events.end(), cmp);
@@ -796,6 +691,14 @@ public:
deliver();
}
void add_grudge(raft::server_id src, raft::server_id dst) {
_grudges[dst].insert(src);
}
void remove_grudge(raft::server_id src, raft::server_id dst) {
_grudges[dst].erase(src);
}
private:
void deliver() {
// Deliver every message whose time has come.
@@ -897,6 +800,39 @@ private:
}
};
using reconfigure_result_t = std::variant<std::monostate,
timed_out_error, raft::not_a_leader, raft::dropped_entry, raft::commit_status_unknown, raft::conf_change_in_progress>;
future<reconfigure_result_t> reconfigure(
const std::vector<raft::server_id>& ids,
raft::logical_clock::time_point timeout,
logical_timer& timer,
raft::server& server) {
raft::server_address_set config;
for (auto id : ids) {
config.insert(raft::server_address { .id = id });
}
try {
co_await timer.with_timeout(timeout, [&server, config = std::move(config)] () {
return server.set_configuration(std::move(config));
}());
co_return std::monostate{};
} catch (raft::not_a_leader e) {
co_return e;
} catch (raft::dropped_entry e) {
co_return e;
} catch (raft::commit_status_unknown e) {
co_return e;
} catch (raft::conf_change_in_progress e) {
co_return e;
} catch (logical_timer::timed_out<void> e) {
(void)e.get_future().discard_result()
.handle_exception_type([] (const raft::dropped_entry&) {});
co_return timed_out_error{};
}
}
// Contains a `raft::server` and other facilities needed for it and the underlying
// modules (persistence, rpc, etc.) to run, and to communicate with the external environment.
template <PureStateMachine M>
@@ -939,7 +875,7 @@ public:
auto snapshots = std::make_unique<snapshots_t<state_t>>();
auto sm = std::make_unique<impure_state_machine<M>>(*snapshots);
auto rpc_ = std::make_unique<rpc<state_t>>(*snapshots, std::move(send_rpc));
auto persistence_ = std::make_unique<persistence<state_t, M::init>>(*snapshots, first_server ? std::optional{id} : std::nullopt);
auto persistence_ = std::make_unique<persistence<state_t>>(*snapshots, first_server ? std::optional{id} : std::nullopt, M::init);
auto queue = std::make_unique<delivery_queue<state_t>>(*rpc_);
auto& sm_ref = *sm;
@@ -1009,10 +945,13 @@ public:
});
}
future<> set_configuration(raft::server_address_set c) {
future<reconfigure_result_t> reconfigure(
const std::vector<raft::server_id>& ids,
raft::logical_clock::time_point timeout,
logical_timer& timer) {
assert(_started);
return with_gate(_gate, [this, c = std::move(c)] {
return _server->set_configuration(std::move(c));
return with_gate(_gate, [this, &ids, timeout, &timer] {
return ::reconfigure(ids, timeout, timer, *_server);
});
}
@@ -1059,6 +998,11 @@ static raft::server_id to_raft_id(size_t id) {
return raft::server_id{utils::UUID{0, id}};
}
struct environment_config {
raft::logical_clock::duration network_delay;
raft::logical_clock::duration fd_convict_threshold;
};
// A set of `raft_server`s connected by a `network`.
//
// The `network` is initialized with a message delivery function
@@ -1079,6 +1023,9 @@ class environment : public seastar::weakly_referencable<environment<M>> {
shared_ptr<failure_detector> _fd;
};
// Passed to newly created failure detectors.
const raft::logical_clock::duration _fd_convict_threshold;
// Used to deliver messages coming from the network to appropriate servers and their failure detectors.
// Also keeps the servers and the failure detectors alive (owns them).
// Before we show a Raft server to others we must add it to this map.
@@ -1098,8 +1045,8 @@ class environment : public seastar::weakly_referencable<environment<M>> {
seastar::gate _gate;
public:
environment()
: _network(
environment(environment_config cfg)
: _fd_convict_threshold(cfg.fd_convict_threshold), _network(cfg.network_delay,
[this] (raft::server_id src, raft::server_id dst, const message_t& m) {
auto& [s, fd] = _routes.at(dst);
fd->receive_heartbeat(src);
@@ -1145,7 +1092,7 @@ public:
// the first creating the failure detector for a node and wiring it up, the second creating a server on a given node.
// We will also possibly need to introduce some kind of ``node IDs'' which `failure_detector` (and `network`)
// will operate on (currently they operate on `raft::server_id`s, assuming a 1-1 mapping of server-to-node).
auto fd = seastar::make_shared<failure_detector>([id, this] (raft::server_id dst) {
auto fd = seastar::make_shared<failure_detector>(_fd_convict_threshold, [id, this] (raft::server_id dst) {
_network.send(id, dst, std::nullopt);
});
@@ -1176,6 +1123,10 @@ public:
return *_routes.at(id)._server;
}
network<message_t>& get_network() {
return _network;
}
// Must be called before we are destroyed unless `new_server` was never called.
future<> abort() {
// Close the gate before iterating over _routes to prevent concurrent modification by other methods.
@@ -1187,88 +1138,18 @@ public:
}
};
// Given a set of functions and associated positive natural numbers,
// calls the functions with periods defined by their corresponding numbers,
// yielding in between.
//
// Define a "tick" to be a (possibly empty) set of calls to some of the given functions
// followed by a yield. The ticker executes a sequence of ticks. Given {n, f}, where n
// is a number and f is a function, f will be called each nth tick.
//
// For example, suppose the ticker was started with the set {{2, f}, {4, g}, {4, h}}.
// Then the functions called in each tick are:
// tick 1: f, g, h tick 2: none, tick 3: f, tick 4: none, tick 5: f, g, h tick 2: none, and so on.
//
// The order of calls within a single tick is unspecified.
//
// The number of ticks can be limited. We crash if the ticker reaches the limit before it's `abort()`ed.
//
// Call `start` to provide the distribution and start the ticking.
class ticker {
bool _stop = false;
std::optional<future<>> _ticker;
std::optional<promise<>> _promise;
template <PureStateMachine M, std::invocable<environment<M>&, ticker&> F>
auto with_env_and_ticker(environment_config cfg, F f) {
return do_with(std::move(f), std::make_unique<environment<M>>(std::move(cfg)), std::make_unique<ticker>(tlogger),
[] (F& f, std::unique_ptr<environment<M>>& env, std::unique_ptr<ticker>& t) {
return f(*env, *t).finally([&env_ = env, &t_ = t] () mutable -> future<> {
// move into coroutine body so they don't get destroyed with the lambda (on first co_await)
auto& env = env_;
auto& t = t_;
public:
ticker() = default;
ticker(const ticker&) = delete;
ticker(ticker&&) = delete;
~ticker() {
assert(!_ticker);
}
using on_tick_t = noncopyable_function<void()>;
template <size_t N>
void start(std::pair<size_t, on_tick_t> (&&tick_funs)[N], uint64_t limit = std::numeric_limits<uint64_t>::max()) {
assert(!_ticker);
_ticker = tick(std::move(tick_funs), limit);
}
future<> abort() {
if (_ticker) {
_stop = true;
co_await *std::exchange(_ticker, std::nullopt);
}
}
private:
template <size_t N>
future<> tick(std::pair<size_t, on_tick_t> (&&tick_funs)[N], uint64_t limit) {
static_assert(N > 0);
auto funs = std::to_array(std::move(tick_funs));
for (uint64_t tick = 0; tick < limit; ++tick) {
if (_stop) {
tlogger.debug("ticker: finishing after {} ticks", tick);
co_return;
}
for (auto& [n, f] : funs) {
if (tick % n == 0) {
f();
}
}
co_await seastar::later();
}
tlogger.error("ticker: limit reached");
assert(false);
}
};
template <PureStateMachine M>
future<> with_env_and_ticker(noncopyable_function<future<>(environment<M>&, ticker&)> f) {
auto env = std::make_unique<environment<M>>();
auto t = std::make_unique<ticker>();
return f(*env, *t).finally([env_ = std::move(env), t_ = std::move(t)] () mutable -> future<> {
// move into coroutine body so they don't get destroyed with the lambda (on first co_await)
auto env = std::move(env_);
auto t = std::move(t_);
co_await t->abort();
co_await env->abort();
co_await t->abort();
co_await env->abort();
});
});
}
@@ -1299,9 +1180,11 @@ struct ExReg {
), input);
}
static const state_t init = 0;
static const state_t init;
};
const ExReg::state_t ExReg::init = 0;
namespace ser {
template <>
struct serializer<ExReg::exchange> {
@@ -1330,9 +1213,45 @@ namespace ser {
bool operator==(ExReg::ret a, ExReg::ret b) { return a.x == b.x; }
// Wait until either one of `nodes` in `env` becomes a leader, or duration `d` expires according to `timer` (whichever happens first).
// If the leader is found, returns it. Otherwise throws a `logical_timer::timed_out` exception.
template <PureStateMachine M>
struct wait_for_leader {
// FIXME: change into free function after clang bug #50345 is fixed
future<raft::server_id> operator()(
environment<M>& env,
std::vector<raft::server_id> nodes,
logical_timer& timer,
raft::logical_clock::duration d) {
auto l = co_await timer.with_timeout(timer.now() + d, [] (weak_ptr<environment<M>> env, std::vector<raft::server_id> nodes) -> future<raft::server_id> {
while (true) {
if (!env) {
co_return raft::server_id{};
}
auto it = std::find_if(nodes.begin(), nodes.end(), [&env] (raft::server_id id) { return env->get_server(id).is_leader(); });
if (it != nodes.end()) {
co_return *it;
}
co_await seastar::later();
}
}(env.weak_from_this(), std::move(nodes)));
assert(l != raft::server_id{});
assert(env.get_server(l).is_leader());
co_return l;
}
};
SEASTAR_TEST_CASE(basic_test) {
logical_timer timer;
co_await with_env_and_ticker<ExReg>([&timer] (environment<ExReg>& env, ticker& t) -> future<> {
environment_config cfg {
.network_delay = 5_t,
.fd_convict_threshold = 50_t,
};
co_await with_env_and_ticker<ExReg>(cfg, [&timer] (environment<ExReg>& env, ticker& t) -> future<> {
using output_t = typename ExReg::output_t;
t.start({
@@ -1348,16 +1267,7 @@ SEASTAR_TEST_CASE(basic_test) {
auto leader_id = co_await env.new_server(true);
// Wait at most 1000 ticks for the server to elect itself as a leader.
co_await timer.with_timeout(timer.now() + 1000_t, ([] (weak_ptr<environment<ExReg>> env, raft::server_id leader_id) -> future<> {
while (true) {
if (!env || env->get_server(leader_id).is_leader()) {
co_return;
}
co_await seastar::later();
}
})(env.weak_from_this(), leader_id));
assert(env.get_server(leader_id).is_leader());
assert(co_await wait_for_leader<ExReg>{}(env, {leader_id}, timer, 1000_t) == leader_id);
auto call = [&] (ExReg::input_t input, raft::logical_clock::duration timeout) {
return env.get_server(leader_id).call(std::move(input), timer.now() + timeout, timer);
@@ -1378,7 +1288,8 @@ SEASTAR_TEST_CASE(basic_test) {
tlogger.debug("Started 2 more servers, changing configuration");
co_await env.get_server(leader_id).set_configuration({{.id = leader_id}, {.id = id2}, {.id = id3}});
assert(std::holds_alternative<std::monostate>(
co_await env.get_server(leader_id).reconfigure({leader_id, id2, id3}, timer.now() + 100_t, timer)));
tlogger.debug("Configuration changed");

100
test/raft/ticker.hh Normal file
View File

@@ -0,0 +1,100 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/reactor.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future-util.hh>
using namespace seastar;
// Given a set of functions and associated positive natural numbers,
// calls the functions with periods defined by their corresponding numbers,
// yielding in between.
//
// Define a "tick" to be a (possibly empty) set of calls to some of the given functions
// followed by a yield. The ticker executes a sequence of ticks. Given {n, f}, where n
// is a number and f is a function, f will be called each nth tick.
//
// For example, suppose the ticker was started with the set {{2, f}, {4, g}, {4, h}}.
// Then the functions called in each tick are:
// tick 1: f, g, h tick 2: none, tick 3: f, tick 4: none, tick 5: f, g, h tick 2: none, and so on.
//
// The order of calls within a single tick is unspecified.
//
// The number of ticks can be limited. We crash if the ticker reaches the limit before it's `abort()`ed.
//
// Call `start` to provide the distribution and start the ticking.
class ticker {
bool _stop = false;
std::optional<future<>> _ticker;
seastar::logger& _logger;
public:
ticker(seastar::logger& l) : _logger(l) {}
ticker(const ticker&) = delete;
ticker(ticker&&) = delete;
~ticker() {
assert(!_ticker);
}
using on_tick_t = noncopyable_function<void()>;
template <size_t N>
void start(std::pair<size_t, on_tick_t> (&&tick_funs)[N], uint64_t limit = std::numeric_limits<uint64_t>::max()) {
assert(!_ticker);
_ticker = tick(std::move(tick_funs), limit);
}
future<> abort() {
if (_ticker) {
_stop = true;
co_await *std::exchange(_ticker, std::nullopt);
}
}
private:
template <size_t N>
future<> tick(std::pair<size_t, on_tick_t> (&&tick_funs)[N], uint64_t limit) {
static_assert(N > 0);
auto funs = std::to_array(std::move(tick_funs));
for (uint64_t tick = 0; tick < limit; ++tick) {
if (_stop) {
_logger.debug("ticker: finishing after {} ticks", tick);
co_return;
}
for (auto& [n, f] : funs) {
if (tick % n == 0) {
f();
}
}
co_await seastar::later();
}
_logger.error("ticker: limit reached");
assert(false);
}
};