raft: replication test: raft_cluster add_entries

Move add_entries() to raft_cluster and provide a helper to add remaining
entries.

Signed-off-by: Alejo Sanchez <alejo.sanchez@scylladb.com>
This commit is contained in:
Alejo Sanchez
2021-05-06 22:13:39 -04:00
parent 2a1e7a15a6
commit 2d51d1bbc5

View File

@@ -476,13 +476,15 @@ class raft_cluster {
lw_shared_ptr<snapshots> _snapshots;
lw_shared_ptr<persisted_snapshots> _persisted_snapshots;
size_t _apply_entries;
size_t _next_val;
bool _packet_drops;
state_machine::apply_fn _apply;
public:
raft_cluster(std::vector<initial_state> states, state_machine::apply_fn apply,
size_t apply_entries, lw_shared_ptr<connected> connected,
lw_shared_ptr<snapshots> snapshots,
lw_shared_ptr<persisted_snapshots> persisted_snapshots, bool packet_drops);
lw_shared_ptr<persisted_snapshots> persisted_snapshots, size_t first_val,
bool packet_drops);
// No copy
raft_cluster(const raft_cluster&) = delete;
raft_cluster(raft_cluster&&) = default;
@@ -498,6 +500,8 @@ public:
void tick_all();
void disconnect(size_t id, std::optional<raft::server_id> except = std::nullopt);
void connect_all();
future<> add_entries(size_t n, size_t& leader);
future<> add_remaining_entries(size_t& leader);
};
test_server
@@ -524,11 +528,13 @@ create_raft_server(raft::server_id uuid, state_machine::apply_fn apply, initial_
raft_cluster::raft_cluster(std::vector<initial_state> states, state_machine::apply_fn apply,
size_t apply_entries, lw_shared_ptr<connected> connected, lw_shared_ptr<snapshots> snapshots,
lw_shared_ptr<persisted_snapshots> persisted_snapshots, bool packet_drops) :
lw_shared_ptr<persisted_snapshots> persisted_snapshots, size_t first_val,
bool packet_drops) :
_connected(connected),
_snapshots(snapshots),
_persisted_snapshots(persisted_snapshots),
_apply_entries(apply_entries),
_next_val(first_val),
_packet_drops(packet_drops),
_apply(apply) {
raft::configuration config;
@@ -751,13 +757,12 @@ future<std::unordered_set<size_t>> change_configuration(raft_cluster& rafts,
}
// Add consecutive integer entries to a leader
future<> add_entries(raft_cluster& rafts,
size_t start, size_t end, size_t& leader) {
size_t value = start;
for (size_t value = start; value != end;) {
future<> raft_cluster::add_entries(size_t n, size_t& leader) {
size_t end = _next_val + n;
while (_next_val != end) {
try {
co_await rafts[leader].server->add_entry(create_command(value), raft::wait_type::committed);
value++;
co_await _servers[leader].server->add_entry(create_command(_next_val), raft::wait_type::committed);
_next_val++;
} catch (raft::not_a_leader& e) {
// leader stepped down, update with new leader if present
if (e.leader != raft::server_id{}) {
@@ -770,6 +775,10 @@ future<> add_entries(raft_cluster& rafts,
}
}
future<> raft_cluster::add_remaining_entries(size_t& leader) {
co_await add_entries(_apply_entries - _next_val, leader);
}
using raft_ticker_type = seastar::timer<lowres_clock>;
std::vector<raft_ticker_type> init_raft_tickers(raft_cluster& rafts) {
@@ -824,7 +833,7 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) {
auto connected = make_lw_shared<struct connected>(test.nodes);
raft_cluster rafts(get_states(test, prevote), apply_changes, test.total_values, connected,
snaps, persisted_snaps, packet_drops);
snaps, persisted_snaps, test.get_first_val(), packet_drops);
co_await rafts.start_all();
// Tickers for servers
@@ -841,17 +850,13 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) {
co_await rafts[leader].server->wait_election_done();
BOOST_TEST_MESSAGE("Processing updates");
// Count existing leader snap index and entries, if present
size_t next_val = test.get_first_val();
// Process all updates in order
for (auto update: test.updates) {
if (std::holds_alternative<entries>(update)) {
auto n = std::get<entries>(update).n;
BOOST_CHECK_MESSAGE(in_configuration.contains(leader),
format("Current leader {} is not in configuration", leader));
co_await add_entries(rafts, next_val, next_val + n, leader);
next_val += n;
co_await rafts.add_entries(n, leader);
} else if (std::holds_alternative<new_leader>(update)) {
unsigned next_leader = std::get<new_leader>(update).id;
co_await wait_log(rafts, connected, in_configuration, leader, next_leader);
@@ -926,9 +931,7 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) {
}
BOOST_TEST_MESSAGE("Appending remaining values");
if (next_val < test.total_values) {
co_await add_entries(rafts, next_val, test.total_values, leader);
}
co_await rafts.add_remaining_entries(leader);
co_await rafts.wait_all();
co_await rafts.stop_all();
@@ -1001,7 +1004,7 @@ future<> rpc_test(size_t nodes, test_func test_case_body) {
rpc::reset_network();
// Initialize and start the cluster with corresponding tickers
raft_cluster rafts(states, dummy_apply_fn, 1, conn,
make_lw_shared<snapshots>(), make_lw_shared<persisted_snapshots>(), false);
make_lw_shared<snapshots>(), make_lw_shared<persisted_snapshots>(), 0, false);
co_await rafts.start_all();
auto tickers = init_raft_tickers(rafts);
// Keep track of what servers are in the current configuration
@@ -1248,7 +1251,7 @@ SEASTAR_TEST_CASE(rpc_load_conf_from_snapshot) {
raft_cluster rafts(states, dummy_apply_fn, 0,
make_lw_shared<connected>(1), make_lw_shared<snapshots>(),
make_lw_shared<persisted_snapshots>(), false);
make_lw_shared<persisted_snapshots>(), 0, false);
co_await rafts.start_all();
BOOST_CHECK(rafts[0].rpc->known_peers() == address_set({sid}));
@@ -1269,7 +1272,7 @@ SEASTAR_TEST_CASE(rpc_load_conf_from_log) {
raft_cluster rafts(states, dummy_apply_fn, 0,
make_lw_shared<connected>(1), make_lw_shared<snapshots>(),
make_lw_shared<persisted_snapshots>(), false);
make_lw_shared<persisted_snapshots>(), 0, false);
co_await rafts.start_all();
BOOST_CHECK(rafts[0].rpc->known_peers() == address_set({sid}));