diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc index fa4e1372e6..a71677fe19 100644 --- a/test/raft/replication_test.cc +++ b/test/raft/replication_test.cc @@ -492,6 +492,7 @@ public: raft_cluster(const raft_cluster&) = delete; raft_cluster(raft_cluster&&) = default; future<> stop_server(size_t id); + future<> reset_server(size_t id, initial_state state); // Reset a stopped server size_t size() { return _servers.size(); } @@ -585,6 +586,14 @@ future<> raft_cluster::stop_server(size_t id) { co_await _servers[id].server->abort(); } +// Reset previously stopped server +future<> raft_cluster::reset_server(size_t id, initial_state state) { + _servers[id] = create_raft_server(to_raft_id(id), _apply, state, _apply_entries, + _connected, _snapshots, _persisted_snapshots, _packet_drops); + co_await _servers[id].server->start(); + set_ticker_callback(id); +} + future<> raft_cluster::start_all() { co_await parallel_for_each(_servers, [] (auto& r) { return r.server->start(); @@ -814,10 +823,7 @@ future<> raft_cluster::change_configuration(size_t total_values, set_config sc) for (auto s: _in_configuration) { if (!new_config.contains(s)) { co_await stop_server(s); - _servers[s] = create_raft_server(to_raft_id(s), _apply, initial_state{.log = {}}, - total_values, _connected, _snapshots, _persisted_snapshots, _packet_drops); - co_await _servers[s].server->start(); - set_ticker_callback(s); + co_await reset_server(s, initial_state{.log = {}}); } } restart_tickers(); @@ -1405,7 +1411,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_snp) { // Emulate a failed configuration change on A (add node D) by // restarting A with a modified initial log containing one extraneous // configuration entry. - co_await rafts[initial_leader].server->abort(); + co_await rafts.stop_server(initial_leader); // Restart A with a synthetic initial state representing // the same initial snapshot config (A, B, C) as before, // but with the following things in mind: @@ -1427,10 +1433,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_snp) { }, .snapshot = {.config = all_nodes} }; - rafts[initial_leader] = create_raft_server(to_raft_id(initial_leader), dummy_apply_fn, restart_state, 1, - connected, make_lw_shared(), make_lw_shared(), false); - co_await rafts[initial_leader].server->start(); - rafts.set_ticker_callback(initial_leader); + co_await rafts.reset_server(initial_leader, restart_state); rafts.restart_tickers(); // A should see {A, B, C, D} as RPC config since @@ -1515,7 +1518,6 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) { // `set_configuration` call will fail on A because // it's cut off the other nodes and it will be waiting for them, // but A is terminated before the network is allowed to heal the partition. - rafts.cancel_ticker(0); co_await rafts.stop_server(initial_leader); // Restart A with a synthetic initial state that contains two entries // in the log: @@ -1536,10 +1538,8 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) { }, .snapshot = {.config = all_nodes} }; - rafts[initial_leader] = create_raft_server(to_raft_id(initial_leader), dummy_apply_fn, restart_state, 1, - connected, make_lw_shared(), make_lw_shared(), false); - co_await rafts[initial_leader].server->start(); - rafts.set_ticker_callback(initial_leader); + + co_await rafts.reset_server(initial_leader, restart_state); rafts.restart_tickers(); // A's RPC configuration should stay the same because @@ -1583,8 +1583,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) { rafts.disconnect(0); // Try to add D back. - rafts.cancel_ticker(0); - co_await rafts[initial_leader].server->abort(); + co_await rafts.stop_server(initial_leader); initial_state restart_state_2{ .log = { raft::log_entry{raft::term_t(1), raft::index_t(1), @@ -1599,10 +1598,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) { }, .snapshot = {.config = all_nodes} }; - rafts[initial_leader] = create_raft_server(to_raft_id(initial_leader), dummy_apply_fn, restart_state_2, 1, - connected, make_lw_shared(), make_lw_shared(), false); - co_await rafts[initial_leader].server->start(); - rafts.set_ticker_callback(initial_leader); + co_await rafts.reset_server(initial_leader, restart_state_2); rafts.restart_tickers(); // A should observe RPC configuration = {A, B, C, D} since it's the union