raft: replication test: reset server helper

Add a helper to reset a server in raft_cluster.

Besides simplifying code and preventing errors, this will help move
create_raft_server logic to raft_cluster.

Signed-off-by: Alejo Sanchez <alejo.sanchez@scylladb.com>
This commit is contained in:
Alejo Sanchez
2021-05-08 15:26:20 -04:00
parent d3b7f21b88
commit 453f19cf0e

View File

@@ -492,6 +492,7 @@ public:
raft_cluster(const raft_cluster&) = delete;
raft_cluster(raft_cluster&&) = default;
future<> stop_server(size_t id);
future<> reset_server(size_t id, initial_state state); // Reset a stopped server
size_t size() {
return _servers.size();
}
@@ -585,6 +586,14 @@ future<> raft_cluster::stop_server(size_t id) {
co_await _servers[id].server->abort();
}
// Reset previously stopped server
future<> raft_cluster::reset_server(size_t id, initial_state state) {
_servers[id] = create_raft_server(to_raft_id(id), _apply, state, _apply_entries,
_connected, _snapshots, _persisted_snapshots, _packet_drops);
co_await _servers[id].server->start();
set_ticker_callback(id);
}
future<> raft_cluster::start_all() {
co_await parallel_for_each(_servers, [] (auto& r) {
return r.server->start();
@@ -814,10 +823,7 @@ future<> raft_cluster::change_configuration(size_t total_values, set_config sc)
for (auto s: _in_configuration) {
if (!new_config.contains(s)) {
co_await stop_server(s);
_servers[s] = create_raft_server(to_raft_id(s), _apply, initial_state{.log = {}},
total_values, _connected, _snapshots, _persisted_snapshots, _packet_drops);
co_await _servers[s].server->start();
set_ticker_callback(s);
co_await reset_server(s, initial_state{.log = {}});
}
}
restart_tickers();
@@ -1405,7 +1411,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_snp) {
// Emulate a failed configuration change on A (add node D) by
// restarting A with a modified initial log containing one extraneous
// configuration entry.
co_await rafts[initial_leader].server->abort();
co_await rafts.stop_server(initial_leader);
// Restart A with a synthetic initial state representing
// the same initial snapshot config (A, B, C) as before,
// but with the following things in mind:
@@ -1427,10 +1433,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_snp) {
},
.snapshot = {.config = all_nodes}
};
rafts[initial_leader] = create_raft_server(to_raft_id(initial_leader), dummy_apply_fn, restart_state, 1,
connected, make_lw_shared<snapshots>(), make_lw_shared<persisted_snapshots>(), false);
co_await rafts[initial_leader].server->start();
rafts.set_ticker_callback(initial_leader);
co_await rafts.reset_server(initial_leader, restart_state);
rafts.restart_tickers();
// A should see {A, B, C, D} as RPC config since
@@ -1515,7 +1518,6 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) {
// `set_configuration` call will fail on A because
// it's cut off the other nodes and it will be waiting for them,
// but A is terminated before the network is allowed to heal the partition.
rafts.cancel_ticker(0);
co_await rafts.stop_server(initial_leader);
// Restart A with a synthetic initial state that contains two entries
// in the log:
@@ -1536,10 +1538,8 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) {
},
.snapshot = {.config = all_nodes}
};
rafts[initial_leader] = create_raft_server(to_raft_id(initial_leader), dummy_apply_fn, restart_state, 1,
connected, make_lw_shared<snapshots>(), make_lw_shared<persisted_snapshots>(), false);
co_await rafts[initial_leader].server->start();
rafts.set_ticker_callback(initial_leader);
co_await rafts.reset_server(initial_leader, restart_state);
rafts.restart_tickers();
// A's RPC configuration should stay the same because
@@ -1583,8 +1583,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) {
rafts.disconnect(0);
// Try to add D back.
rafts.cancel_ticker(0);
co_await rafts[initial_leader].server->abort();
co_await rafts.stop_server(initial_leader);
initial_state restart_state_2{
.log = {
raft::log_entry{raft::term_t(1), raft::index_t(1),
@@ -1599,10 +1598,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) {
},
.snapshot = {.config = all_nodes}
};
rafts[initial_leader] = create_raft_server(to_raft_id(initial_leader), dummy_apply_fn, restart_state_2, 1,
connected, make_lw_shared<snapshots>(), make_lw_shared<persisted_snapshots>(), false);
co_await rafts[initial_leader].server->start();
rafts.set_ticker_callback(initial_leader);
co_await rafts.reset_server(initial_leader, restart_state_2);
rafts.restart_tickers();
// A should observe RPC configuration = {A, B, C, D} since it's the union