Files
scylladb/test/raft/replication.hh
Alex Dathskovsky 5e89a78c8f raft: refactor can_vote logic and type
This PR refactors the can_vote function in the Raft algorithms for improved clarity and maintainability by providing safer strong boolean types to the raft algorithm.

Fixes: #21937

Backport: No backport required

Closes scylladb/scylladb#25787
2025-09-24 13:55:05 +02:00

1532 lines
54 KiB
C++

/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <memory>
#include <random>
#include <bit>
#include <fmt/std.h>
#include <seastar/core/app-template.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/loop.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/log.hh>
#include <seastar/util/later.hh>
#include <seastar/util/variant_utils.hh>
#include <seastar/testing/random.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/testing/test_case.hh>
#include "raft/raft.hh"
#include "raft/server.hh"
#include "serializer.hh"
#include "serializer_impl.hh"
#include "utils/assert.hh"
#include "utils/xx_hasher.hh"
#include "utils/to_string.hh"
#include "test/raft/helpers.hh"
#include "test/lib/eventually.hh"
#include "test/lib/random_utils.hh"
// Test Raft library with declarative test definitions
//
// Each test can be defined by (struct test_case):
// .nodes number of nodes
// .total_values how many entries to append to leader nodes (default 100)
// .initial_term initial term # for setup
// .initial_leader what server is leader
// .initial_states initial logs of servers
// .le log entries
// .initial_snapshots snapshots present at initial state for servers
// .updates updates to execute on these servers
// entries{x} add the following x entries to the current leader
// new_leader{x} elect x as new leader
// partition{a,b,c} Only servers a,b,c are connected
// partition{a,leader{b},c} Only servers a,b,c are connected, and make b leader
// set_config{a,b,c} Change configuration on leader
// set_config{a,{b,raft::is_voter::no}} Same, with b added as a non-voting member
// check_rpc_config{a,cfg} Check rpc config of a matches
// check_rpc_config{[],cfg} Check rpc config multiple nodes matches
//
// run_test
// - Creates the servers and initializes logs and snapshots
// with hasher/digest and tickers to advance servers
// - Processes updates one by one
// - Appends remaining values
// - Waits until all servers have logs of size of total_values entries
// - Verifies hash
// - Verifies persisted snapshots
//
// Each test is also run with 20% random packet drops: the macro
// RAFT_TEST_CASE(<test name>, <test case>) creates two test cases for
// each definition.
// Brings chrono literals (e.g. 1us, 0ms) and std::placeholders into scope for the tests.
using namespace std::chrono_literals;
using namespace std::placeholders;
// Logger shared by the Raft tests; defined elsewhere.
extern seastar::logger tlogger;
// Sentinel command value: the smallest representable int.
// NOTE(review): its users are not visible here — presumably a placeholder/no-op command; confirm.
const auto dummy_command = std::numeric_limits<int>::min();
class hasher_int {
std::variant<uint64_t, xx_hasher> _hasher;
inline static thread_local bool _commutative{false};
public:
static void set_commutative(bool commutative) {
_commutative = commutative;
}
hasher_int() {
if (_commutative) {
_hasher.emplace<uint64_t>(0);
} else {
_hasher.emplace<xx_hasher>();
}
}
void update(int val) noexcept {
if (auto* h = get_if<xx_hasher>(&_hasher); h != nullptr) {
h->update(reinterpret_cast<const char *>(&val), sizeof(val));
} else {
get<uint64_t>(_hasher) += val;
}
}
uint64_t finalize_uint64() {
if (auto* h = get_if<xx_hasher>(&_hasher); h != nullptr) {
return h->finalize_uint64();
} else {
return get<uint64_t>(_hasher);
}
}
static hasher_int hash_range(int max) {
hasher_int h;
for (int i = 0; i < max; ++i) {
h.update(i);
}
return h;
}
};
// State machine contents captured by a test snapshot: the hasher state and
// the number of entries that had been applied when it was taken.
struct snapshot_value {
hasher_int hasher;
raft::index_t idx;
};
// Initial persisted state of one test server. The test persistence layer
// hands these values back to raft when it loads term/vote, log and snapshot.
struct initial_state {
raft::config_member address{config_member_from_id({})};
raft::term_t term = raft::term_t(1);
raft::server_id vote;
std::vector<raft::log_entry> log;
raft::snapshot_descriptor snapshot;
// State machine contents matching `snapshot`; registered in the cluster's snapshot map at construction.
snapshot_value snp_value;
// Raft server configuration used when creating this server (append_request_threshold set to 200 here).
raft::server::configuration server_config = raft::server::configuration{.append_request_threshold = 200};
};
// For verbosity in test declarations (e.g. node_id{x})
struct node_id {
size_t id;
};
// Helpers converting test node indices to raft types; defined out of line.
std::vector<raft::server_id> to_raft_id_vec(std::vector<node_id> nodes) noexcept;
raft::server_address_set address_set(std::vector<node_id> nodes) noexcept;
raft::config_member_set config_set(std::vector<node_id> nodes) noexcept;
// Steps a test scenario can issue (the alternatives of the `update` variant), among them:
// - Entries
// - Leader change
// - Configuration change
// - Connectivity changes, stopping/resetting servers, waiting for logs
// Add `n` consecutive integer entries.
struct entries {
size_t n;
// If provided, use this server to add entries.
std::optional<size_t> server;
// Don't wait for previous requests to finish before issuing a new one.
bool concurrent;
entries(size_t n_arg, std::optional<size_t> server_arg = {}, bool concurrent_arg = false)
:n(n_arg), server(server_arg), concurrent(concurrent_arg) {}
};
// Elect server `id` as the new leader.
struct new_leader {
size_t id;
};
// Marks, inside a `partition`, the server that should become leader.
struct leader {
size_t id;
};
// Inclusive range of server indices (used inside a `partition`)
struct range {
size_t start;
size_t end;
};
// Only the listed servers stay connected to each other.
using partition = std::vector<std::variant<leader,range,int>>;
// Disconnect a node from the rest
struct isolate {
size_t id;
};
// Disconnect 2 servers both ways
struct two_nodes {
size_t first;
size_t second;
};
struct disconnect : public two_nodes {};
// Stop server `id`.
struct stop {
size_t id;
};
// Restart previously stopped server `id` from `state`.
struct reset {
size_t id;
initial_state state;
};
// Wait until the listed servers have caught up with the leader's log.
// Constructors are non-explicit so `wait_log{1}` or `wait_log{1, 2}` can be written directly.
struct wait_log {
std::vector<size_t> int_ids;
wait_log(size_t int_id) : int_ids({int_id}) {}
wait_log(std::initializer_list<size_t> int_ids) : int_ids(int_ids) {}
};
// One member of a `set_config` update: a server index and whether it may vote.
struct set_config_entry {
size_t node_idx;
raft::is_voter can_vote;
// Non-explicit so plain indices can be listed in a set_config; members default to voters.
set_config_entry(size_t idx, raft::is_voter can_vote = raft::is_voter::yes)
: node_idx(idx), can_vote(can_vote)
{}
};
// New configuration to apply on the leader, as a list of members.
using set_config = std::vector<set_config_entry>;
// A raft configuration written with test node ids: `curr` is the current
// member set and `prev` the previous one.
struct config {
    std::vector<node_id> curr;
    std::vector<node_id> prev;
    // Converts to raft::configuration. Const so const `config` objects
    // (and ones bound to const references) convert as well.
    operator raft::configuration() const {
        // Braced-init arguments are evaluated left to right.
        return raft::configuration{config_set(curr), config_set(prev)};
    }
};
// Expected rpc-level peer set, written with test node ids.
using rpc_address_set = std::vector<node_id>;
// Check that the rpc configuration of each of `nodes` matches `addrs`.
// Parameters are sinks: taken by value and moved into the members.
struct check_rpc_config {
    std::vector<node_id> nodes;
    rpc_address_set addrs;
    check_rpc_config(node_id node, rpc_address_set addrs) : nodes({node}), addrs(std::move(addrs)) {}
    check_rpc_config(std::vector<node_id> nodes, rpc_address_set addrs) : nodes(std::move(nodes)), addrs(std::move(addrs)) {}
};
// Check the number of servers reported as added to the rpc of each of `nodes`
// (presumably compared with rpc::servers_added()).
struct check_rpc_added {
    std::vector<node_id> nodes;
    size_t expected;
    check_rpc_added(node_id node, size_t expected) : nodes({node}), expected(expected) {}
    check_rpc_added(std::vector<node_id> nodes, size_t expected) : nodes(std::move(nodes)), expected(expected) {}
};
// Check the number of servers reported as removed from the rpc of each of `nodes`
// (presumably compared with rpc::servers_removed()).
struct check_rpc_removed {
    std::vector<node_id> nodes;
    size_t expected;
    check_rpc_removed(node_id node, size_t expected) : nodes({node}), expected(expected) {}
    check_rpc_removed(std::vector<node_id> nodes, size_t expected) : nodes(std::move(nodes)), expected(expected) {}
};
// Reset the rpc added/removed counters of the listed nodes.
using rpc_reset_counters = std::vector<node_id>;
// Run the cluster for `ticks` ticks.
struct tick {
uint64_t ticks;
};
// Perform a read on a node and check the index it reads at.
struct read_value {
size_t node_idx; // which node should read
size_t expected_index; // expected read index
};
// One step of a test scenario; steps are processed one by one.
using update = std::variant<entries, new_leader, partition, isolate, disconnect,
stop, reset, wait_log, set_config, check_rpc_config, check_rpc_added,
check_rpc_removed, rpc_reset_counters, tick, read_value>;
// Entry of an initial log in a test declaration: its term and either an
// integer command or a configuration.
struct log_entry {
unsigned term;
std::variant<int, raft::configuration> data;
};
// Initial log of one server.
struct initial_log {
std::vector<log_entry> le;
};
// Initial snapshot of one server.
struct initial_snapshot {
raft::snapshot_descriptor snap;
};
// Declarative description of a replication test (see the overview comment
// at the top of this file).
struct test_case {
const size_t nodes;
const size_t total_values = 100;
uint64_t initial_term = 1;
const size_t initial_leader = 0;
const std::vector<struct initial_log> initial_states;
const std::vector<struct initial_snapshot> initial_snapshots;
// Per-server raft configurations (presumably indexed by server; confirm in get_states).
const std::vector<raft::server::configuration> config;
const std::vector<update> updates;
// Presumably selects hasher_int's commutative (order-independent) mode; confirm at the call site.
const bool commutative_hash = false;
const bool verify_persisted_snapshots = true;
// First value to append; defined out of line.
size_t get_first_val();
};
// Random helpers shared by the tests; defined out of line.
std::mt19937 random_generator() noexcept;
int rand() noexcept;
// Let's assume one snapshot per server.
// Snapshot contents known to each server, keyed by server id, then by snapshot id.
using snapshots = std::unordered_map<raft::server_id, std::unordered_map<raft::snapshot_id, snapshot_value>>;
// Last snapshot descriptor persisted by each server, with its contents.
using persisted_snapshots = std::unordered_map<raft::server_id, std::pair<raft::snapshot_descriptor, snapshot_value>>;
// Handshake semaphore used together with the two snapshot ids below.
extern seastar::semaphore snapshot_sync;
// application of a snapshot with that id will be delayed until snapshot_sync is signaled
extern raft::snapshot_id delay_apply_snapshot;
// sending of a snapshot with that id will be delayed until snapshot_sync is signaled
extern raft::snapshot_id delay_send_snapshot;
// Test connectivity configuration
struct rpc_config {
// Drop roughly one in five of some message kinds.
bool drops = false;
// Network delay. Note implementation expects it to be smaller than tick delay
std::chrono::milliseconds network_delay = 0ms; // 0ms means no delays
// Latency within same server
std::chrono::milliseconds local_delay = 0ms;
// How many nodes per server, rounded to closest power of 2 (fast prefix check)
// NOTE(review): rpc computes the mask as (1 << bit_width(local_nodes)) - 1, which
// for an exact power of two such as 32 gives 63 (i.e. rounds up to 64) — confirm intended.
size_t local_nodes = 32;
// Delay 0...extra_delay_max us to mimic busy server
size_t extra_delay_max = 500;
};
// In-process raft cluster used by the replication tests.
//
// Holds one raft server per test node together with its state machine, rpc
// endpoint and ticker (a seastar::timer<Clock>), plus the simulated network
// (`connected`) and the snapshot maps shared by all servers.
template <typename Clock>
class raft_cluster {
// Applies a batch of committed commands for server `id`, updating `hasher`,
// and returns how many entries it counted (the state machine adds this to its total).
using apply_fn = std::function<size_t(raft::server_id id, const std::vector<raft::command_cref>& commands, lw_shared_ptr<hasher_int> hasher)>;
class state_machine;
class persistence;
class connected;
class failure_detector;
class rpc;
// Server id -> rpc endpoint; endpoints deliver messages to each other through this map.
using rpc_net = std::unordered_map<raft::server_id, rpc*>;
// One server of the cluster. `sm` and `rpc` are non-owning pointers to
// components whose ownership was handed to `server` when it was created.
struct test_server {
std::unique_ptr<raft::server> server;
state_machine* sm;
raft_cluster::rpc* rpc;
};
std::vector<test_server> _servers;
std::unique_ptr<connected> _connected;
std::unique_ptr<snapshots> _snapshots;
std::unique_ptr<persisted_snapshots> _persisted_snapshots;
// Number of entries each state machine must see before its done() resolves.
size_t _apply_entries;
// Next integer value to append as a command.
size_t _next_val;
rpc_config _rpc_config;
bool _prevote;
apply_fn _apply;
std::unordered_set<size_t> _in_configuration; // Servers in current configuration
// One periodic ticker per server, indexed like `_servers`.
std::vector<seastar::timer<Clock>> _tickers;
// Index of the server the test currently treats as leader.
size_t _leader;
// Builds the initial per-server states from `test`; defined out of line.
std::vector<initial_state> get_states(test_case test, bool prevote);
// Ticker period.
typename Clock::duration _tick_delta;
bool _verify_persisted_snapshots;
rpc_net _rpc_net;
// Tick phase delay for each node, uniformly spread across tick delta
std::vector<typename Clock::duration> _tick_delays;
public:
raft_cluster(test_case test,
apply_fn apply,
size_t apply_entries, size_t first_val, size_t first_leader,
bool prevote, typename Clock::duration tick_delta,
rpc_config rpc_config);
// No copy
raft_cluster(const raft_cluster&) = delete;
raft_cluster(raft_cluster&&) = default;
raft::server& get_server(size_t id);
future<> stop_server(size_t id, sstring reason = "");
future<> reset_server(size_t id, initial_state state); // Reset a stopped server
size_t size() {
return _servers.size();
}
future<> start_all();
future<> stop_all();
future<> wait_all();
void disconnect(size_t id, std::optional<raft::server_id> except = std::nullopt);
void connect_all();
void elapse_elections();
future<> elect_new_leader(size_t new_leader);
future<> free_election();
// Ticker management.
future<> init_raft_tickers();
void pause_tickers();
future<> restart_tickers();
void cancel_ticker(size_t id);
void set_ticker_callback(size_t id) noexcept;
void init_tick_delays(size_t n);
// Appending values.
future<> add_entry(size_t val, std::optional<size_t> server);
future<> add_entries(size_t n, std::optional<size_t> server = std::nullopt);
future<> add_entries_concurrent(size_t n, std::optional<size_t> server = std::nullopt);
future<> add_remaining_entries();
// Waiting for followers to catch up with the leader's log.
future<> wait_log(size_t follower);
future<> wait_log(::wait_log followers);
future<> wait_log_all();
// Handlers for the individual `update` steps.
future<> change_configuration(::set_config sc);
future<> check_rpc_config(::check_rpc_config cc);
void check_rpc_added(::check_rpc_added expected) const;
void check_rpc_removed(::check_rpc_removed expected) const;
void rpc_reset_counters(::rpc_reset_counters nodes);
future<> reconfigure_all();
future<> partition(::partition p);
future<> tick(::tick t);
future<> read(read_value r);
future<> stop(::stop server);
future<> reset(::reset server);
void disconnect(::disconnect nodes);
future<> isolate(::isolate node);
void verify();
private:
test_server create_server(size_t id, initial_state state);
};
// State machine of one test server. Commands are folded into `hasher` by
// the test-supplied apply function; done() resolves once `_apply_entries`
// entries have been seen (through apply() or a loaded snapshot).
template <typename Clock>
class raft_cluster<Clock>::state_machine : public raft::state_machine {
raft::server_id _id;
apply_fn _apply;
size_t _apply_entries;
// Number of entries applied so far (reset from a snapshot on load).
size_t _seen = 0;
promise<> _done;
// Non-owning; shared snapshot map of the cluster.
snapshots* _snapshots;
public:
lw_shared_ptr<hasher_int> hasher;
state_machine(raft::server_id id, apply_fn apply, size_t apply_entries,
snapshots* snapshots):
_id(id), _apply(std::move(apply)), _apply_entries(apply_entries), _snapshots(snapshots),
hasher(make_lw_shared<hasher_int>()) {}
// Applies a batch through `_apply`, adds the returned count to `_seen` and
// resolves done() when the target is reached by a non-empty batch.
// NOTE(review): both apply() and load_snapshot() may call _done.set_value() and
// nothing here prevents a second call — confirm that cannot happen or is harmless.
future<> apply(const std::vector<raft::command_cref> commands) override {
auto n = _apply(_id, commands, hasher);
_seen += n;
if (n && _seen >= _apply_entries) {
if (_seen > _apply_entries) {
// Retrying `commit_status_unknown` may lead to this. Ref: #14072
tlogger.warn("sm::apply[{}]: _seen ({}) overshot _apply_entries ({})", _id, _seen, _apply_entries);
}
_done.set_value();
}
tlogger.debug("sm::apply[{}] got {}/{} entries", _id, _seen, _apply_entries);
return make_ready_future<>();
}
// Records the current hasher and entry count under a fresh random snapshot id.
future<raft::snapshot_id> take_snapshot() override {
auto snp_id = raft::snapshot_id::create_random_id();
(*_snapshots)[_id][snp_id].hasher = *hasher;
tlogger.debug("sm[{}] takes snapshot id {} {} seen {}", _id, (*_snapshots)[_id][snp_id].hasher.finalize_uint64(), snp_id, _seen);
(*_snapshots)[_id][snp_id].idx = raft::index_t{_seen};
return make_ready_future<raft::snapshot_id>(snp_id);
}
void drop_snapshot(raft::snapshot_id snp_id) override {
(*_snapshots)[_id].erase(snp_id);
}
// Replaces the hasher and entry count with the snapshot's. For the snapshot
// id in `delay_apply_snapshot`, signals snapshot_sync and then waits on it so
// the test can hold the load at this point.
future<> load_snapshot(raft::snapshot_id snp_id) override {
hasher = make_lw_shared<hasher_int>((*_snapshots)[_id][snp_id].hasher);
tlogger.debug("sm[{}] loads snapshot {} idx={}", _id, (*_snapshots)[_id][snp_id].hasher.finalize_uint64(), (*_snapshots)[_id][snp_id].idx);
_seen = (*_snapshots)[_id][snp_id].idx.value();
if (_seen >= _apply_entries) {
_done.set_value();
}
if (snp_id == delay_apply_snapshot) {
snapshot_sync.signal();
co_await snapshot_sync.wait();
}
co_return;
};
future<> abort() override { return make_ready_future<>(); }
// Resolves once the expected number of entries has been seen.
future<> done() {
return _done.get_future();
}
};
// Test persistence backend for one server.
//
// Loads return what the server was created with (`initial_state`). Most
// stores do nothing; a couple sleep for 1us instead. Persisted snapshot
// descriptors are the exception: they are recorded in `_persisted_snapshots`
// together with the matching state machine contents, so they can be verified later.
template <typename Clock>
class raft_cluster<Clock>::persistence : public raft::persistence {
    raft::server_id _id;
    initial_state _conf;
    // Non-owning. Default member initializers keep them null for a
    // default-constructed instance instead of leaving them indeterminate.
    snapshots* _snapshots = nullptr;
    persisted_snapshots* _persisted_snapshots = nullptr;
public:
    persistence(raft::server_id id, initial_state conf, snapshots* snapshots,
            persisted_snapshots* persisted_snapshots)
        : _id(id)
        , _conf(std::move(conf))
        , _snapshots(snapshots)
        , _persisted_snapshots(persisted_snapshots) {}
    persistence() {}
    future<> store_term_and_vote(raft::term_t term, raft::server_id vote) override { return seastar::sleep(1us); }
    future<std::pair<raft::term_t, raft::server_id>> load_term_and_vote() override {
        auto term_and_vote = std::make_pair(_conf.term, _conf.vote);
        return make_ready_future<std::pair<raft::term_t, raft::server_id>>(term_and_vote);
    }
    future<> store_commit_idx(raft::index_t) override {
        co_return;
    }
    future<raft::index_t> load_commit_idx() override {
        co_return raft::index_t{0};
    }
    // Records `snap` with the current contents of the matching snapshot as this server's persisted snapshot.
    future<> store_snapshot_descriptor(const raft::snapshot_descriptor& snap, size_t preserve_log_entries) override {
        (*_persisted_snapshots)[_id] = std::make_pair(snap, (*_snapshots)[_id][snap.id]);
        tlogger.debug("sm[{}] persists snapshot {}", _id, (*_snapshots)[_id][snap.id].hasher.finalize_uint64());
        return make_ready_future<>();
    }
    future<raft::snapshot_descriptor> load_snapshot_descriptor() override {
        return make_ready_future<raft::snapshot_descriptor>(_conf.snapshot);
    }
    future<> store_log_entries(const std::vector<raft::log_entry_ptr>& entries) override { return seastar::sleep(1us); };
    // Moves the initial log out of `_conf`, so only the first call sees the entries intact.
    future<raft::log_entries> load_log() override {
        raft::log_entries log;
        for (auto&& e : _conf.log) {
            log.emplace_back(make_lw_shared(std::move(e)));
        }
        return make_ready_future<raft::log_entries>(std::move(log));
    }
    future<> truncate_log(raft::index_t idx) override { return make_ready_future<>(); }
    future<> abort() override { return make_ready_future<>(); }
};
// Simulated network connectivity between the `n` test servers.
//
// Connectivity is tracked as a set of directed (from -> to) cuts. Every
// mutator here cuts or restores links in both directions.
template <typename Clock>
struct raft_cluster<Clock>::connected {
    struct connection {
        raft::server_id from;
        raft::server_id to;
        bool operator==(const connection &o) const {
            return from == o.from && to == o.to;
        }
    };
    struct hash_connection {
        std::size_t operator() (const connection &c) const {
            // Mix both endpoints so that all links from one server do not
            // share a bucket (hash-combine in the style of boost::hash_combine).
            std::hash<utils::UUID> h;
            const std::size_t h_from = h(c.from.id);
            const std::size_t h_to = h(c.to.id);
            return h_from ^ (h_to + std::size_t(0x9e3779b9) + (h_from << 6) + (h_from >> 2));
        }
    };
    // Map of from->to disconnections
    std::unordered_set<connection, hash_connection> disconnected;
    size_t n;
    connected(size_t n) : n(n) { }
    // Cut connectivity of two servers both ways
    void cut(raft::server_id id1, raft::server_id id2) {
        disconnected.insert({id1, id2});
        disconnected.insert({id2, id1});
    }
    // Isolate a server from all others, optionally keeping its link with `except`.
    void disconnect(raft::server_id id, std::optional<raft::server_id> except = std::nullopt) {
        for (size_t other = 0; other < n; ++other) {
            auto other_id = to_raft_id(other);
            // Disconnect if not the same, and the other id is not an exception
            // disconnect(0, except=1)
            if (id != other_id && !(except && other_id == *except)) {
                cut(id, other_id);
            }
        }
    }
    // Re-connect a node to all other nodes
    void connect(raft::server_id id) {
        for (auto it = disconnected.begin(); it != disconnected.end(); ) {
            if (id == it->from || id == it->to) {
                it = disconnected.erase(it);
            } else {
                ++it;
            }
        }
    }
    void connect_all() {
        disconnected.clear();
    }
    // Two servers are connected only if neither direction has been cut.
    // (Previously the same direction {id1, id2} was checked twice.)
    bool operator()(raft::server_id id1, raft::server_id id2) const {
        return !disconnected.contains({id1, id2}) && !disconnected.contains({id2, id1});
    }
};
// Failure detector backed by the simulated network: a peer is reported alive
// exactly when the cluster's `connected` table considers it linked to this server.
template <typename Clock>
class raft_cluster<Clock>::failure_detector : public raft::failure_detector {
    raft::server_id _id;
    connected* _connected;
public:
    failure_detector(raft::server_id id, connected* connected)
        : _id(id)
        , _connected(connected)
    {}
    bool is_alive(raft::server_id server) override {
        auto& links = *_connected;
        return links(server, _id);
    }
};
// Simulated rpc endpoint of one test server.
//
// Messages are delivered by calling the destination endpoint's raft client
// (`_net[id]->_client`) directly. Sends to unknown or disconnected peers fail
// (future-returning methods) or are silently ignored (void methods). When
// network delays are enabled, append/vote messages are delivered after a
// sleep, under `_gate`, and only if the peers are still connected at that point.
template <typename Clock>
class raft_cluster<Clock>::rpc : public raft::rpc {
raft::server_id _id;
connected* _connected;
snapshots* _snapshots;
rpc_net& _net;
rpc_config _rpc_config;
raft::server_address_set _known_peers;
uint32_t _servers_added = 0;
uint32_t _servers_removed = 0;
// Used to ensure that when `abort()` returns there are
// no more in-progress methods running on this object.
seastar::gate _gate;
// prefix mask for shards in same node
uint64_t _same_node_prefix;
// True when network_delay > 0; only then are delays applied (local_delay alone does not enable them).
bool _delays;
public:
rpc(raft::server_id id, connected* connected, snapshots* snapshots,
rpc_net& net, rpc_config rpc_config)
: _id(id)
, _connected(connected)
, _snapshots(snapshots)
, _net(net)
, _rpc_config(rpc_config)
, _delays(rpc_config.network_delay > 0ms)
{
// Register this endpoint so peers can reach it.
_net[_id] = this;
// Rounds to next power of 2
// NOTE(review): for an exact power of two (e.g. 32) bit_width rounds up to the next one (mask 63).
_same_node_prefix = (1 << std::bit_width(_rpc_config.local_nodes)) - 1;
}
// True for roughly one call in five when drops are enabled.
bool drop_packet() {
return _rpc_config.drops && !(rand() % 5);
}
// NOTE(review): this compares the bits *under* the mask (the low bits), not the
// bits above it. If to_int_id yields small consecutive indices, only ids that are
// equal modulo (mask + 1) count as local. Confirm whether a high-bit (prefix)
// comparison was intended.
bool is_local_node(raft::server_id& id) {
return (to_int_id(_id.id) & _same_node_prefix) == (to_int_id(id.id) & _same_node_prefix);
}
// Base delivery delay towards `id`: local_delay for local nodes, network_delay otherwise.
typename Clock::duration get_delay(raft::server_id id) {
if (is_local_node(id)) {
return _rpc_config.local_delay;
} else {
return _rpc_config.network_delay;
}
}
// Random extra delay of 0..extra_delay_max microseconds.
auto rand_extra_delay() {
return tests::random::get_int<size_t>(0, _rpc_config.extra_delay_max) * 1us;
}
// Simulated snapshot transfer: copies the snapshot contents into the
// receiver's entry of the snapshot map, optionally performs the
// delay_send_snapshot handshake, then hands the snapshot to the receiver.
// `as` is not used here.
future<raft::snapshot_reply> send_snapshot(raft::server_id id,
const raft::install_snapshot& snap, seastar::abort_source& as) override {
if (!_net.count(id)) {
throw std::runtime_error("trying to send a message to an unknown node");
}
if (!(*_connected)(id, _id)) {
throw std::runtime_error("cannot send snapshot since nodes are disconnected");
}
auto s = snap; // snap is not always held alive by a caller
(*_snapshots)[id][s.snp.id] = (*_snapshots)[_id][s.snp.id];
if (s.snp.id == delay_send_snapshot) {
co_await snapshot_sync.wait();
snapshot_sync.signal();
}
co_return co_await _net[id]->_client->apply_snapshot(_id, std::move(s));
}
// Subject to drops and delays. With delays, the returned future resolves once
// the delayed delivery attempt has run.
future<> send_append_entries(raft::server_id id, const raft::append_request& append_request) override {
if (!_net.count(id)) {
return make_exception_future(std::runtime_error("trying to send a message to an unknown node"));
}
if (!(*_connected)(id, _id)) {
return make_exception_future<>(std::runtime_error("cannot send append since nodes are disconnected"));
}
if (!drop_packet()) {
if (_delays) {
return with_gate(_gate, [&, this] () mutable -> future<> {
return seastar::sleep(get_delay(id) + rand_extra_delay()).then(
[this, id = std::move(id), append_request = std::move(append_request)] {
if ((*_connected)(id, _id)) {
_net[id]->_client->append_entries(_id, append_request);
}
});
});
} else {
_net[id]->_client->append_entries(_id, append_request);
}
}
return make_ready_future<>();
}
// Subject to drops and delays; delayed delivery is fire-and-forget (the future is discarded).
void send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) override {
if (!_net.count(id)) {
return;
}
if (!(*rpc::_connected)(id, rpc::_id)) {
return;
}
if (!drop_packet()) {
if (_delays) {
(void)with_gate(_gate, [&, this] () mutable -> future<> {
return seastar::sleep(get_delay(id) + rand_extra_delay()).then(
[this, id = std::move(id), reply = std::move(reply)] {
if ((*_connected)(id, _id)) {
_net[id]->_client->append_entries_reply(rpc::_id, std::move(reply));
}
});
});
} else {
_net[id]->_client->append_entries_reply(rpc::_id, std::move(reply));
}
}
}
// Subject to delays (fire-and-forget) but never dropped.
void send_vote_request(raft::server_id id, const raft::vote_request& vote_request) override {
if (!_net.count(id)) {
return;
}
if (!(*rpc::_connected)(id, rpc::_id)) {
return;
}
if (_delays) {
(void)with_gate(_gate, [&, this] () mutable -> future<> {
return seastar::sleep(get_delay(id) + rand_extra_delay()).then(
[this, id = std::move(id), vote_request = std::move(vote_request)] {
if ((*_connected)(id, _id)) {
_net[id]->_client->request_vote(rpc::_id, std::move(vote_request));
}
});
});
} else {
_net[id]->_client->request_vote(rpc::_id, std::move(vote_request));
}
}
// Subject to delays (fire-and-forget) but never dropped.
void send_vote_reply(raft::server_id id, const raft::vote_reply& vote_reply) override {
if (!_net.count(id)) {
return;
}
if (!(*rpc::_connected)(id, rpc::_id)) {
return;
}
if (_delays) {
(void)with_gate(_gate, [&, this] () mutable -> future<> {
return seastar::sleep(get_delay(id) + rand_extra_delay()).then([=, this] {
if ((*_connected)(id, _id)) {
_net[id]->_client->request_vote_reply(rpc::_id, vote_reply);
}
});
});
} else {
_net[id]->_client->request_vote_reply(rpc::_id, vote_reply);
}
}
// Delivered immediately: no drops, no delays.
void send_timeout_now(raft::server_id id, const raft::timeout_now& timeout_now) override {
if (!_net.count(id)) {
return;
}
if (!(*_connected)(id, _id)) {
return;
}
_net[id]->_client->timeout_now_request(_id, std::move(timeout_now));
}
// Closes the gate, waiting for in-flight delayed deliveries.
future<> abort() override {
tlogger.debug("[{}] rpc aborting", _id);
return _gate.close();
}
// Subject to drops, not to delays.
void send_read_quorum(raft::server_id id, const raft::read_quorum& read_quorum) override {
if (!_net.count(id)) {
return;
}
if (!(*_connected)(id, _id)) {
return;
}
if (!drop_packet()) {
_net[id]->_client->read_quorum_request(_id, read_quorum);
}
}
// Subject to drops, not to delays.
void send_read_quorum_reply(raft::server_id id, const raft::read_quorum_reply& reply) override {
if (!_net.count(id)) {
return;
}
if (!(*_connected)(id, _id)) {
return;
}
if (!drop_packet()) {
_net[id]->_client->read_quorum_reply(_id, std::move(reply));
}
}
// NOTE(review): the disconnected error message below says "append" although
// this is a read barrier — likely copy-paste; left unchanged here.
future<raft::read_barrier_reply> execute_read_barrier_on_leader(raft::server_id id) override {
if (!_net.count(id)) {
return make_exception_future<raft::read_barrier_reply>(std::runtime_error("trying to send a message to an unknown node"));
}
if (!(*_connected)(id, _id)) {
return make_exception_future<raft::read_barrier_reply>(std::runtime_error("cannot send append since nodes are disconnected"));
}
return _net[id]->_client->execute_read_barrier(_id, nullptr);
}
// Throws if `id` is unknown or disconnected from this server.
void check_known_and_connected(raft::server_id id) {
if (!_net.count(id)) {
throw std::runtime_error("trying to send a message to an unknown node");
}
if (!(*_connected)(id, _id)) {
throw std::runtime_error("cannot send since nodes are disconnected");
}
}
// NOTE(review): these are not coroutines, so a failed check_known_and_connected()
// throws synchronously instead of returning an exceptional future — confirm
// that callers handle both.
future<raft::add_entry_reply> send_add_entry(raft::server_id id, const raft::command& cmd) override {
check_known_and_connected(id);
return _net[id]->_client->execute_add_entry(_id, cmd, nullptr);
}
future<raft::add_entry_reply> send_modify_config(raft::server_id id,
const std::vector<raft::config_member>& add,
const std::vector<raft::server_id>& del) override {
check_known_and_connected(id);
return _net[id]->_client->execute_modify_config(_id, add, del, nullptr);
}
// Updates the known peer set and the added/removed counters checked by tests.
void on_configuration_change(raft::server_address_set add, raft::server_address_set del) override {
_known_peers.merge(add);
_servers_added += add.size();
for (const auto& addr: del) {
_known_peers.erase(addr);
}
_servers_removed += del.size();
}
const raft::server_address_set& known_peers() const {
return _known_peers;
}
void reset_counters() {
_servers_added = 0;
_servers_removed = 0;
}
uint32_t servers_added() const {
return _servers_added;
}
uint32_t servers_removed() const {
return _servers_removed;
}
};
// Builds server `id` from `state`. Its state machine, rpc endpoint,
// persistence and failure detector all use this cluster's simulated network
// and snapshot maps. Ownership of the components moves into the raft server;
// the returned test_server also keeps raw pointers to the state machine and rpc.
template <typename Clock>
typename raft_cluster<Clock>::test_server raft_cluster<Clock>::create_server(size_t id, initial_state state) {
    auto server_id = to_raft_id(id);
    auto machine = std::make_unique<state_machine>(server_id, _apply, _apply_entries, _snapshots.get());
    state_machine* machine_ptr = machine.get();
    auto endpoint = std::make_unique<raft_cluster::rpc>(server_id, _connected.get(),
            _snapshots.get(), _rpc_net, _rpc_config);
    raft_cluster::rpc* endpoint_ptr = endpoint.get();
    auto storage = std::make_unique<persistence>(server_id, state,
            _snapshots.get(), _persisted_snapshots.get());
    auto detector = seastar::make_shared<failure_detector>(server_id, _connected.get());
    test_server result{
        raft::create_server(server_id, std::move(endpoint), std::move(machine), std::move(storage),
                std::move(detector), state.server_config),
        machine_ptr,
        endpoint_ptr,
    };
    return result;
}
// Creates one server per initial state derived from `test`. All servers start
// in the configuration, and they share a single initial configuration that is
// stored in every initial snapshot descriptor. Per-server tick phase delays
// are drawn only when a network delay is configured.
template <typename Clock>
raft_cluster<Clock>::raft_cluster(test_case test,
        apply_fn apply,
        size_t apply_entries, size_t first_val, size_t first_leader,
        bool prevote, typename Clock::duration tick_delta,
        rpc_config rpc_config)
    : _connected(std::make_unique<struct connected>(test.nodes))
    , _snapshots(std::make_unique<snapshots>())
    , _persisted_snapshots(std::make_unique<persisted_snapshots>())
    , _apply_entries(apply_entries)
    , _next_val(first_val)
    , _rpc_config(rpc_config)
    , _prevote(prevote)
    , _apply(std::move(apply)) // sink parameter: move instead of copying the std::function
    , _leader(first_leader)
    , _tick_delta(tick_delta)
    , _verify_persisted_snapshots(test.verify_persisted_snapshots) {
    auto states = get_states(test, prevote);
    // Every server is a member of the initial configuration.
    raft::configuration config;
    for (size_t i = 0; i < states.size(); i++) {
        _in_configuration.insert(i);
        states[i].address = config_member_from_id(to_raft_id(i));
        config.current.emplace(states[i].address);
    }
    if (_rpc_config.network_delay > 0ms) {
        init_tick_delays(test.nodes);
    }
    for (size_t i = 0; i < states.size(); i++) {
        auto& s = states[i].address;
        states[i].snapshot.config = config;
        // Register the initial snapshot contents so the state machine can load them.
        (*_snapshots)[s.addr.id][states[i].snapshot.id] = states[i].snp_value;
        _servers.emplace_back(create_server(i, states[i]));
    }
}
// For each of `n` servers, draws a random tick phase offset between 0 and
// `_tick_delta` (in Clock::duration ticks). restart_tickers() sleeps for this
// offset before arming the ticker, so the tickers do not all fire together.
template <typename Clock>
void raft_cluster<Clock>::init_tick_delays(size_t n) {
    using duration = typename Clock::duration;
    _tick_delays.reserve(n);
    for (size_t s = 0; s < n; s++) {
        auto delay = tests::random::get_int<size_t>(0, _tick_delta.count());
        // Build the duration straight from the drawn tick count. The equivalent
        // `delay * _tick_delta / _tick_delta.count()` divides by zero for a zero
        // tick delta and overflows (delay * count) for large deltas.
        _tick_delays.push_back(duration(static_cast<typename duration::rep>(delay)));
    }
}
// Returns the raft server with index `id` (unchecked index).
template <typename Clock>
raft::server& raft_cluster<Clock>::get_server(size_t id) {
    auto& entry = _servers[id];
    return *entry.server;
}
// Stops server `id` by cancelling its ticker and aborting it with `reason`.
// It then forgets the server's snapshots (checking that at most two were
// kept) and its persisted snapshot.
template <typename Clock>
future<> raft_cluster<Clock>::stop_server(size_t id, sstring reason) {
    cancel_ticker(id);
    auto& victim = *_servers[id].server;
    co_await victim.abort(std::move(reason));
    const bool has_snapshots = _snapshots->contains(to_raft_id(id));
    if (has_snapshots) {
        BOOST_CHECK_LE((*_snapshots)[to_raft_id(id)].size(), 2);
        _snapshots->erase(to_raft_id(id));
    }
    _persisted_snapshots->erase(to_raft_id(id));
}
// Reset previously stopped server: recreate server `id` from `state`, start it
// and reinstall its ticker callback (the ticker itself is not armed here).
template <typename Clock>
future<> raft_cluster<Clock>::reset_server(size_t id, initial_state state) {
    auto& slot = _servers[id];
    slot = create_server(id, state);
    co_await slot.server->start();
    set_ticker_callback(id);
}
// Starts every server concurrently and arms the tickers. Then makes the
// designated first leader (`_leader`) a candidate and waits for its election to finish.
template <typename Clock>
future<> raft_cluster<Clock>::start_all() {
    co_await coroutine::parallel_for_each(_servers, [] (test_server& ts) {
        return ts.server->start();
    });
    co_await init_raft_tickers();
    BOOST_TEST_MESSAGE("Electing first leader " << _leader);
    auto& first_leader = *_servers[_leader].server;
    first_leader.wait_until_candidate();
    co_await first_leader.wait_election_done();
}
// Stops, one at a time, every server currently in the configuration.
template <typename Clock>
future<> raft_cluster<Clock>::stop_all() {
    for (size_t idx : _in_configuration) {
        co_await stop_server(idx);
    }
}
// Waits, server by server, until each configured state machine reports done().
template <typename Clock>
future<> raft_cluster<Clock>::wait_all() {
    for (size_t idx : _in_configuration) {
        auto* machine = _servers[idx].sm;
        co_await machine->done();
    }
}
// Cuts server `id` off from every other server, optionally keeping its link with `except`.
template <typename Clock>
void raft_cluster<Clock>::disconnect(size_t id, std::optional<raft::server_id> except) {
    auto target = to_raft_id(id);
    _connected->disconnect(target, except);
}
// Restores every link of the simulated network.
template <typename Clock>
void raft_cluster<Clock>::connect_all() {
    auto& net = *_connected;
    net.connect_all();
}
// Add consecutive integer entries to a leader: appends `n` values starting at
// `_next_val`, one at a time (each waits for commit), through `server` if given.
template <typename Clock>
future<> raft_cluster<Clock>::add_entries(size_t n, std::optional<size_t> server) {
    for (const size_t stop_at = _next_val + n; _next_val != stop_at; ++_next_val) {
        co_await add_entry(_next_val, server);
    }
}
// Add consecutive integer entries to a leader concurrently: reserves the next
// `n` values and issues all of their add_entry() calls in parallel.
template <typename Clock>
future<> raft_cluster<Clock>::add_entries_concurrent(size_t n, std::optional<size_t> server) {
    const size_t first = _next_val;
    const size_t last = first + n;
    _next_val = last;
    return parallel_for_each(std::views::iota(first, last), [this, server] (size_t val) {
        return add_entry(val, server);
    });
}
// Adds command `val` through server `server` (or the current leader when unset)
// and waits until it is committed. Retries when raft reports the entry as
// dropped or its commit status as unknown.
template <typename Clock>
future<> raft_cluster<Clock>::add_entry(size_t val, std::optional<size_t> server) {
    while (true) {
        try {
            // Resolved on every attempt: `_leader` may change between retries.
            auto& at = _servers[server ? *server : _leader].server;
            co_await at->add_entry(create_command(val), raft::wait_type::committed, nullptr);
            break;
        } catch (const raft::commit_status_unknown&) {
            // FIXME: in some cases when we get `commit_status_unknown` the entry may have been applied.
            // Retrying it could lead to double application which causes hard to debug failures, e.g. #14029.
            // For now we leave a warning so the logs give a hint if such a failure happens and we need
            // to debug it. Ideally we would never have to handle `commit_status_unknown` but some replication
            // tests rely on retrying it during leader changes etc.
            tlogger.warn("replication_test: got `commit_status_unknown` from `add_entry`"
                    ", val: {}, server: {}", val, server);
        } catch (const raft::dropped_entry&) {
            // retry if an entry is dropped because the leader have changed after it was submitted
        }
    }
}
// Appends entries until `_next_val` reaches `_apply_entries`. Does nothing if
// it already has, because the unsigned difference would otherwise wrap around
// to a huge count and keep adding entries practically forever.
template <typename Clock>
future<> raft_cluster<Clock>::add_remaining_entries() {
    if (_next_val >= _apply_entries) {
        co_return;
    }
    co_await add_entries(_apply_entries - _next_val);
}
// Creates one ticker per server and installs a tick callback for each server
// in the configuration, then arms those tickers via restart_tickers().
template <typename Clock>
future<> raft_cluster<Clock>::init_raft_tickers() {
    _tickers.resize(_servers.size());
    // Servers outside the configuration are left with an idle ticker.
    for (size_t idx : _in_configuration) {
        _tickers[idx].set_callback([this, idx] {
            _servers[idx].server->tick();
        });
    }
    co_await restart_tickers();
}
// Cancels the tickers of all configured servers; restart_tickers() re-arms them.
template <typename Clock>
void raft_cluster<Clock>::pause_tickers() {
    for (size_t idx : _in_configuration) {
        cancel_ticker(idx);
    }
}
// (Re)arms the periodic tickers of all configured servers. Without per-server
// phase delays they are armed immediately. Otherwise each one is armed after
// sleeping for its own delay, with all servers handled concurrently.
template <typename Clock>
future<> raft_cluster<Clock>::restart_tickers() {
    if (_tick_delays.empty()) {
        for (size_t idx : _in_configuration) {
            _tickers[idx].rearm_periodic(_tick_delta);
        }
        co_return;
    }
    co_await coroutine::parallel_for_each(_in_configuration, [this] (size_t idx) -> future<> {
        co_await seastar::sleep(_tick_delays[idx]);
        _tickers[idx].rearm_periodic(_tick_delta);
    });
}
// Stops the periodic ticker of server `id`.
template <typename Clock>
void raft_cluster<Clock>::cancel_ticker(size_t id) {
    auto& ticker = _tickers[id];
    ticker.cancel();
}
// Makes the ticker of server `id` call that server's tick(); it does not arm the ticker.
template <typename Clock>
void raft_cluster<Clock>::set_ticker_callback(size_t id) noexcept {
    auto on_tick = [this, id] {
        _servers[id].server->tick();
    };
    _tickers[id].set_callback(std::move(on_tick));
}
// Builds raft log entries from test `log_entry` values, starting at `start_idx`; defined out of line.
std::vector<raft::log_entry> create_log(std::vector<log_entry> list, raft::index_t start_idx);
// An apply function with the `apply_fn` signature; defined out of line.
size_t apply_changes(raft::server_id id, const std::vector<raft::command_cref>& commands,
lw_shared_ptr<hasher_int> hasher);
// Wait for leader log to propagate to follower: waits until `follower` has the
// leader's last log index/term. This happens only when the two are connected
// and both are in the configuration; otherwise it returns immediately.
template <typename Clock>
future<> raft_cluster<Clock>::wait_log(size_t follower) {
    const bool linked = (*_connected)(to_raft_id(_leader), to_raft_id(follower));
    if (!linked || !_in_configuration.contains(_leader) || !_in_configuration.contains(follower)) {
        co_return;
    }
    auto target = _servers[_leader].server->log_last_idx_term();
    co_await _servers[follower].server->wait_log_idx_term(target);
}
// Wait for leader log to propagate to specified followers.
// The leader's last (index, term) is captured once, and then each listed
// follower is awaited in turn. Unlike wait_log(size_t), no connectivity or
// membership check is performed.
template <typename Clock>
future<> raft_cluster<Clock>::wait_log(::wait_log followers) {
    auto target = _servers[_leader].server->log_last_idx_term();
    for (auto id : followers.int_ids) {
        co_await _servers[id].server->wait_log_idx_term(target);
    }
}
// Wait for all connected followers to catch up.
// The leader's last (index, term) is captured once. The function then waits
// on every other server that is connected to the leader and belongs to the
// configuration.
template <typename Clock>
future<> raft_cluster<Clock>::wait_log_all() {
    auto target = _servers[_leader].server->log_last_idx_term();
    for (size_t id = 0; id < _servers.size(); ++id) {
        if (id == _leader) {
            continue;
        }
        if (!(*_connected)(to_raft_id(id), to_raft_id(_leader))) {
            continue;
        }
        if (!_in_configuration.contains(id)) {
            continue;
        }
        co_await _servers[id].server->wait_log_idx_term(target);
    }
}
// Call elapse_election() on every server in the current configuration.
template <typename Clock>
void raft_cluster<Clock>::elapse_elections() {
    for (const auto member : _in_configuration) {
        auto& node = _servers[member];
        node.server->elapse_election();
    }
}
// Force server `new_leader` to become leader and record it in `_leader`.
//
// Returns immediately, without touching tickers or connectivity, when
// `new_leader` is already the leader. Otherwise:
//  1. tickers are paused and the current leader is disconnected from everyone;
//  2. elapse_elections() is called on all configured servers;
//  3. once `new_leader` is a candidate, the old leader may talk only to
//     `new_leader`, so it can vote without looking alive to the others;
//  4. steps are retried until `new_leader` reports is_leader(), then the saved
//     connectivity is restored and tickers are re-armed.
template <typename Clock>
future<> raft_cluster<Clock>::elect_new_leader(size_t new_leader) {
    // NOTE(review): BOOST_CHECK_MESSAGE does not abort, so an out-of-range
    // `new_leader` still reaches the `_servers[new_leader]` accesses below.
    BOOST_CHECK_MESSAGE(new_leader < _servers.size(),
            format("Wrong next leader value {}", new_leader));
    if (new_leader == _leader) {
        co_return;
    }
    // Prevote prevents a dueling candidate from bumping up the term,
    // but corner cases still need a retry loop.
    // In the prevote branch the log is waited on once before the loop, and only
    // the candidacy/election steps are retried. In the non-prevote branch the
    // log wait and the disconnect/elapse sequence are redone on every iteration.
    if (_prevote) {
        bool both_connected = (*_connected)(to_raft_id(_leader), to_raft_id(new_leader));
        if (both_connected) {
            co_await wait_log(new_leader);
        }
        pause_tickers();
        // The leader may already be partially disconnected, so save the current
        // connectivity state (despite its name, `prev_disconnected` is a copy of
        // the whole connectivity map).
        struct connected prev_disconnected = *_connected;
        // Disconnect current leader from everyone
        _connected->disconnect(to_raft_id(_leader));
        // Move all nodes past the election threshold; this also makes the old leader a follower
        elapse_elections();
        do {
            // Consume leader output messages since a stray append might make new leader step down
            co_await yield(); // yield
            _servers[new_leader].server->wait_until_candidate();
            if (both_connected) {
                // Allow the old leader to vote for the new candidate while not looking alive to others
                // Re-connect old leader
                _connected->connect(to_raft_id(_leader));
                // Disconnect old leader from all nodes except new leader
                _connected->disconnect(to_raft_id(_leader), to_raft_id(new_leader));
            }
            co_await _servers[new_leader].server->wait_election_done();
            if (both_connected) {
                // Disconnect the old leader again in case another iteration is needed
                _connected->disconnect(to_raft_id(_leader));
            }
        } while (!_servers[new_leader].server->is_leader());
        // Restore connections to the original setting
        *_connected = prev_disconnected;
        co_await restart_tickers();
        // NOTE(review): `_leader` is only updated at the end of this function, so
        // this waits against the OLD leader's log and its connectivity — confirm
        // that this is intended.
        co_await wait_log_all();
    } else { // not prevote
        do {
            if ((*_connected)(to_raft_id(_leader), to_raft_id(new_leader))) {
                co_await wait_log(new_leader);
            }
            pause_tickers();
            // The leader may already be partially disconnected, so save the current
            // connectivity state (a copy of the whole connectivity map).
            struct connected prev_disconnected = *_connected;
            // Disconnect current leader from everyone
            _connected->disconnect(to_raft_id(_leader));
            // Move all nodes past the election threshold; this also makes the old leader a follower
            elapse_elections();
            // Consume leader output messages since a stray append might make new leader step down
            co_await yield(); // yield
            _servers[new_leader].server->wait_until_candidate();
            // Re-connect old leader
            _connected->connect(to_raft_id(_leader));
            // Disconnect old leader from all nodes except new leader
            _connected->disconnect(to_raft_id(_leader), to_raft_id(new_leader));
            // Unlike the prevote branch, tickers run while waiting for the election
            co_await restart_tickers();
            co_await _servers[new_leader].server->wait_election_done();
            // Restore connections to the original setting
            *_connected = prev_disconnected;
        } while (!_servers[new_leader].server->is_leader());
    }
    tlogger.debug("confirmed leader on {}", to_raft_id(new_leader));
    _leader = new_leader;
}
// Run a free election among the nodes in the configuration. Every
// `_tick_delta` the function checks whether some configured server reports
// is_leader(); the first one found is recorded in `_leader`.
// NOTE: there must be enough nodes able to participate. There is no iteration
// limit, so otherwise this never returns.
template <typename Clock>
future<> raft_cluster<Clock>::free_election() {
    tlogger.debug("Running free election");
    for (size_t loops = 0;; ++loops) {
        // Give the election RPC exchanges time to happen before checking.
        co_await seastar::sleep(_tick_delta);
        for (auto candidate : _in_configuration) {
            if (!_servers[candidate].server->is_leader()) {
                continue;
            }
            tlogger.debug("New leader {} (in {} loops)", to_raft_id(candidate), loops);
            _leader = candidate;
            co_return;
        }
    }
}
// Switch the cluster to the membership described by `sc`.
//
// The function performs these steps in order:
//  - builds the new member set, copying each entry's `can_vote`;
//  - if the current leader is not in it, waits for the log on every new member;
//  - (re)starts servers that join the configuration, with an empty log, and
//    arms their tickers;
//  - asks the leader to set the configuration, and runs a free election if the
//    leader is not a member;
//  - appends a dummy entry to confirm the change;
//  - stops servers that left and stores the new member set in `_in_configuration`.
template <typename Clock>
future<> raft_cluster<Clock>::change_configuration(set_config sc) {
    BOOST_CHECK_MESSAGE(sc.size() > 0, "Empty configuration change not supported");
    raft::config_member_set set;
    std::unordered_set<size_t> new_config;
    for (auto s: sc) {
        new_config.insert(s.node_idx);
        auto m = to_config_member(s.node_idx);
        m.can_vote = s.can_vote;
        set.insert(std::move(m));
        // NOTE(review): checked only after the entry was already inserted above,
        // and BOOST_CHECK_MESSAGE does not abort on failure.
        BOOST_CHECK_MESSAGE(s.node_idx < _servers.size(),
                format("Configuration element {} past node limit {}", s.node_idx, _servers.size() - 1));
    }
    // NOTE(review): this fails only when the leader is dropped AND `sc` has at
    // least a majority of all servers; the message text seems to describe the
    // opposite situation — confirm the intended condition/wording.
    BOOST_CHECK_MESSAGE(new_config.contains(_leader) || sc.size() < (_servers.size()/2 + 1),
            "New configuration without old leader and below quorum size (no election)");
    if (!new_config.contains(_leader)) {
        // Wait log on all nodes in new config before change
        for (auto s: sc) {
            co_await wait_log(s.node_idx);
        }
    }
    // Start nodes in new configuration but not in current configuration (re-added)
    for (auto s: new_config) {
        if (!_in_configuration.contains(s)) {
            tlogger.debug("Starting node being re-added to configuration {}", s);
            co_await reset_server(s, initial_state{.log = {}});
            if (_tick_delays.size()) {
                co_await seastar::sleep(_tick_delays[s]);
            }
            _tickers[s].rearm_periodic(_tick_delta);
        }
    }
    tlogger.debug("Changing configuration on leader {}", _leader);
    co_await _servers[_leader].server->set_configuration(std::move(set), nullptr);
    if (!new_config.contains(_leader)) {
        // The old leader is not part of the new configuration: find the new one
        co_await free_election();
    }
    // Now we know joint configuration was applied
    // Add a dummy entry to confirm new configuration was committed
    try {
        co_await _servers[_leader].server->add_entry(create_command(dummy_command),
                raft::wait_type::committed, nullptr);
    } catch (raft::not_a_leader& e) {
        // leader stepped down, implying config fully changed
    // commit_status_unknown is deliberately ignored as well: the dummy entry
    // only serves as a confirmation.
    } catch (raft::commit_status_unknown& e) {}
    // Stop nodes no longer in configuration
    for (auto s: _in_configuration) {
        if (!new_config.contains(s)) {
            _tickers[s].cancel();
            co_await stop_server(s);
        }
    }
    _in_configuration = new_config;
}
// For every node listed in `cc`, check that its RPC layer eventually reports
// exactly the peer set built from `cc.addrs`. The polling check runs inside
// seastar::async.
template <typename Clock>
future<> raft_cluster<Clock>::check_rpc_config(::check_rpc_config cc) {
    auto expected_peers = address_set(cc.addrs);
    for (auto& target : cc.nodes) {
        BOOST_CHECK(target.id < _servers.size());
        co_await seastar::async([&] {
            BOOST_CHECK(eventually_true([&] {
                return _servers[target.id].rpc->known_peers() == expected_peers;
            }));
        });
    }
}
// Check that, on each listed node, the RPC layer's servers_added() value
// equals `expected.expected`.
template <typename Clock>
void raft_cluster<Clock>::check_rpc_added(::check_rpc_added expected) const {
    for (const auto& target : expected.nodes) {
        const auto actual = _servers[target.id].rpc->servers_added();
        BOOST_CHECK_MESSAGE(actual == expected.expected,
                format("RPC added {} does not match expected {}", actual, expected.expected));
    }
}
// Check that, on each listed node, the RPC layer's servers_removed() value
// equals `expected.expected`.
template <typename Clock>
void raft_cluster<Clock>::check_rpc_removed(::check_rpc_removed expected) const {
    for (const auto& target : expected.nodes) {
        const auto actual = _servers[target.id].rpc->servers_removed();
        BOOST_CHECK_MESSAGE(actual == expected.expected,
                format("RPC removed {} does not match expected {}", actual, expected.expected));
    }
}
// Call reset_counters() on the RPC layer of every listed node. These are
// presumably the values read by check_rpc_added/check_rpc_removed.
template <typename Clock>
void raft_cluster<Clock>::rpc_reset_counters(::rpc_reset_counters nodes) {
    for (const auto& target : nodes) {
        _servers[target.id].rpc->reset_counters();
    }
}
// If some servers are outside the configuration, change the configuration to
// contain every server index 0 .. _servers.size()-1. Otherwise do nothing.
template <typename Clock>
future<> raft_cluster<Clock>::reconfigure_all() {
    if (_in_configuration.size() >= _servers.size()) {
        co_return;
    }
    set_config everyone;
    for (size_t id = 0; id < _servers.size(); ++id) {
        everyone.push_back(id);
    }
    co_await change_configuration(std::move(everyone));
}
// Re-partition the network so that only the servers listed in `p` can talk to
// each other; every other server is disconnected from everyone. An entry of
// `p` is one of three things:
//  - a plain server index;
//  - an inclusive `range` of indexes;
//  - a `leader` marker naming the server that must lead the partition.
// Logs are synced while links are still intact. After re-partitioning, the
// named leader is elected; if no leader was named and the current leader ended
// up outside a non-empty partition, a free election is run. In every path the
// tickers paused here are running again on return.
template <typename Clock>
future<> raft_cluster<Clock>::partition(::partition p) {
    tlogger.debug("partitioning");
    std::unordered_set<size_t> partition_servers;
    std::optional<size_t> next_leader;
    for (auto s: p) {
        if (auto* l = std::get_if<struct leader>(&s)) {
            next_leader = l->id;
            partition_servers.insert(*next_leader);
        } else if (auto* r = std::get_if<struct range>(&s)) {
            for (size_t id = r->start; id <= r->end; id++) {
                SCYLLA_ASSERT(id < _servers.size());
                partition_servers.insert(id);
            }
        } else {
            partition_servers.insert(std::get<int>(s));
        }
    }
    if (next_leader) {
        // Wait for log to propagate to next leader, before disconnections
        co_await wait_log(*next_leader);
    } else {
        // No leader specified, wait log for all connected servers, before disconnections
        for (auto s: partition_servers) {
            if (_in_configuration.contains(s)) {
                co_await wait_log(s);
            }
        }
    }
    pause_tickers();
    _connected->connect_all();
    // NOTE: connectivity is independent of configuration so it's for all servers
    for (size_t s = 0; s < _servers.size(); ++s) {
        if (!partition_servers.contains(s)) {
            // Disconnect servers not in main partition
            _connected->disconnect(to_raft_id(s));
        }
    }
    if (next_leader && *next_leader != _leader) {
        // New leader specified, elect it; elect_new_leader() re-arms the tickers.
        co_await elect_new_leader(*next_leader);
    } else if (!next_leader && !partition_servers.contains(_leader) && p.size() > 0) {
        // Old leader disconnected and not specified new, free election
        co_await restart_tickers();
        _servers[_leader].server->elapse_election(); // make old leader step down
        co_await free_election();
    } else {
        // Either nothing to elect, or the named leader already leads. In the
        // latter case elect_new_leader() would return early WITHOUT re-arming
        // the tickers paused above, so they must be restarted here.
        co_await restart_tickers();
    }
}
// Manually advance every server, regardless of configuration membership, by
// `t.ticks` ticks. The function yields after each round.
template <typename Clock>
future<> raft_cluster<Clock>::tick(::tick t) {
    for (uint64_t round = 0; round < t.ticks; ++round) {
        for (auto& node : _servers) {
            node.server->tick();
        }
        co_await yield();
    }
}
// Run a read barrier on server `r.node_idx`, then check that its state
// machine hash equals hasher_int::hash_range(r.expected_index).
template <typename Clock>
future<> raft_cluster<Clock>::read(read_value r) {
    const auto idx = r.node_idx;
    co_await _servers[idx].server->read_barrier(nullptr);
    const auto observed = _servers[idx].sm->hasher->finalize_uint64();
    const auto wanted = hasher_int::hash_range(r.expected_index).finalize_uint64();
    BOOST_CHECK_MESSAGE(observed == wanted,
            format("Read on server {} saw the wrong value {} != {}", idx, observed, wanted));
}
// Update handler: stop server `server.id` via stop_server().
template <typename Clock>
future<> raft_cluster<Clock>::stop(::stop server) {
    const auto id = server.id;
    co_await stop_server(id);
}
// Update handler: (re)start server `server.id` via reset_server(), using the
// initial state carried by the update.
template <typename Clock>
future<> raft_cluster<Clock>::reset(::reset server) {
    const auto id = server.id;
    co_await reset_server(id, server.state);
}
// Update handler: cut the link between the two servers named by `nodes`.
template <typename Clock>
void raft_cluster<Clock>::disconnect(::disconnect nodes) {
    auto one = to_raft_id(nodes.first);
    auto other = to_raft_id(nodes.second);
    _connected->cut(one, other);
}
// Update handler: disconnect server `node.id` from everyone. If it was the
// leader, push it past its election timeout and wait for free_election() to
// find a new leader.
template <typename Clock>
future<> raft_cluster<Clock>::isolate(::isolate node) {
    auto victim = to_raft_id(node.id);
    tlogger.debug("disconnecting {}", victim);
    _connected->disconnect(victim);
    if (node.id != _leader) {
        co_return;
    }
    _servers[_leader].server->elapse_election(); // make old leader step down
    co_await free_election();
}
// Final consistency check. It has two parts:
//  - every server still in the configuration must have a state machine hash
//    equal to hasher_int::hash_range(_apply_entries);
//  - when `_verify_persisted_snapshots` is set, each persisted snapshot's hash
//    must equal hasher_int::hash_range(<its own index>).
template <typename Clock>
void raft_cluster<Clock>::verify() {
    BOOST_TEST_MESSAGE("Verifying hashes match expected (snapshot and apply calls)");
    const auto want_all = hasher_int::hash_range(_apply_entries).finalize_uint64();
    for (auto member : _in_configuration) {
        const auto digest = _servers[member].sm->hasher->finalize_uint64();
        BOOST_CHECK_MESSAGE(digest == want_all,
                format("Digest doesn't match for server [{}]: {} != {}", member, digest, want_all));
    }
    if (!_verify_persisted_snapshots) {
        return;
    }
    BOOST_TEST_MESSAGE("Verifying persisted snapshots");
    // TODO: check that snapshot is taken when it should be
    for (auto& item : *_persisted_snapshots) {
        auto& [snp, val] = item.second;
        const auto digest = val.hasher.finalize_uint64();
        const auto want = hasher_int::hash_range(val.idx.value()).finalize_uint64();
        BOOST_CHECK_MESSAGE(digest == want,
                format("Persisted snapshot {} doesn't match {} != {}", snp.id, digest, want));
    }
}
// Build the per-server initial states described by `test`:
//  - the initial leader gets term `test.initial_term`;
//  - server i gets `test.initial_snapshots[i]` (if present), with its snapshot
//    value hash computed by hasher_int::hash_range(snapshot index);
//  - its log is built from `test.initial_states[i]` (if present), starting
//    right after the snapshot index, or at 1 without a snapshot;
//  - its server config is `test.config[i]` (if present), otherwise a default
//    config with `enable_prevoting = prevote`.
template <typename Clock>
std::vector<initial_state> raft_cluster<Clock>::get_states(test_case test, bool prevote) {
    std::vector<initial_state> states(test.nodes); // Server initial states
    size_t leader = test.initial_leader;
    // `states[leader]` below is unchecked; a bad test definition would be UB.
    SCYLLA_ASSERT(leader < states.size());
    states[leader].term = raft::term_t{test.initial_term};
    // Server initial logs, etc
    for (size_t i = 0; i < states.size(); ++i) {
        raft::index_t start_idx{1};
        if (i < test.initial_snapshots.size()) {
            const auto& snap = test.initial_snapshots[i].snap;
            states[i].snapshot = snap;
            states[i].snp_value.hasher = hasher_int::hash_range(snap.idx.value());
            states[i].snp_value.idx = snap.idx;
            start_idx = states[i].snapshot.idx + raft::index_t{1};
        }
        if (i < test.initial_states.size()) {
            // Bind by reference: create_log() takes its list by value anyway,
            // so copying the whole initial state first was wasted work.
            const auto& state = test.initial_states[i];
            states[i].log = create_log(state.le, start_idx);
        } else {
            states[i].log = {};
        }
        if (i < test.config.size()) {
            states[i].server_config = test.config[i];
        } else {
            states[i].server_config = { .enable_prevoting = prevote };
        }
    }
    return states;
}
// Functor that runs one declarative `test_case` end to end. It builds a
// raft_cluster and starts it, then applies every scripted update in order.
// After that it reconnects everything and restores the full configuration,
// appends the remaining values (when `test.total_values > 0`), stops all
// servers and verifies the hashes.
// `prevote`, `tick_delta` and `rpc_config` are forwarded to the cluster.
template <typename Clock>
struct run_test {
    future<> operator() (test_case test, bool prevote, typename Clock::duration tick_delta,
            rpc_config rpc_config) {
        hasher_int::set_commutative(test.commutative_hash);
        const char* delay_mode = rpc_config.network_delay > 0ms ? "delays" : "no delays";
        tlogger.debug("starting test with {}", delay_mode);
        raft_cluster<Clock> cluster(test, ::apply_changes, test.total_values,
                test.get_first_val(), test.initial_leader, prevote,
                tick_delta, rpc_config);
        co_await cluster.start_all();

        BOOST_TEST_MESSAGE("Processing updates");
        // Dispatch each scripted step, strictly in order, to the matching cluster operation
        for (auto step: test.updates) {
            co_await std::visit(make_visitor(
                [&cluster] (entries op) -> future<> {
                    if (op.concurrent) {
                        co_await cluster.add_entries_concurrent(op.n, op.server);
                    } else {
                        co_await cluster.add_entries(op.n, op.server);
                    }
                },
                [&cluster] (new_leader op) -> future<> {
                    co_await cluster.elect_new_leader(op.id);
                },
                [&cluster] (disconnect op) -> future<> {
                    cluster.disconnect(op);
                    co_return;
                },
                [&cluster] (::isolate op) -> future<> {
                    co_await cluster.isolate(op);
                },
                [&cluster] (partition op) -> future<> {
                    co_await cluster.partition(op);
                },
                [&cluster] (stop op) -> future<> {
                    co_await cluster.stop(op);
                },
                [&cluster] (reset op) -> future<> {
                    co_await cluster.reset(op);
                },
                [&cluster] (wait_log op) -> future<> {
                    co_await cluster.wait_log(op);
                },
                [&cluster] (set_config op) -> future<> {
                    co_await cluster.change_configuration(op);
                },
                [&cluster] (check_rpc_config op) -> future<> {
                    co_await cluster.check_rpc_config(op);
                },
                [&cluster] (check_rpc_added op) -> future<> {
                    cluster.check_rpc_added(op);
                    co_return;
                },
                [&cluster] (check_rpc_removed op) -> future<> {
                    cluster.check_rpc_removed(op);
                    co_return;
                },
                [&cluster] (rpc_reset_counters op) -> future<> {
                    cluster.rpc_reset_counters(op);
                    co_return;
                },
                [&cluster] (tick op) -> future<> {
                    co_await cluster.tick(op);
                },
                [&cluster] (read_value op) -> future<> {
                    co_await cluster.read(op);
                }
            ), std::move(step));
        }

        // Heal the network and bring every node back into the configuration, if needed
        cluster.connect_all();
        co_await cluster.reconfigure_all();
        if (test.total_values > 0) {
            BOOST_TEST_MESSAGE("Appending remaining values");
            co_await cluster.add_remaining_entries();
            co_await cluster.wait_all();
        }
        co_await cluster.stop_all();
        if (test.total_values > 0) {
            cluster.verify();
        }
    }
};
template <typename Clock>
void replication_test(struct test_case test, bool prevote,
typename Clock::duration tick_delta,
rpc_config rpc_config = {}) {
run_test<Clock>{}(std::move(test), prevote, tick_delta, rpc_config).get();
}