diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc index cba6fbaad7..0753cba071 100644 --- a/test/raft/replication_test.cc +++ b/test/raft/replication_test.cc @@ -476,13 +476,15 @@ class raft_cluster { lw_shared_ptr _snapshots; lw_shared_ptr _persisted_snapshots; size_t _apply_entries; + size_t _next_val; bool _packet_drops; state_machine::apply_fn _apply; public: raft_cluster(std::vector states, state_machine::apply_fn apply, size_t apply_entries, lw_shared_ptr connected, lw_shared_ptr snapshots, - lw_shared_ptr persisted_snapshots, bool packet_drops); + lw_shared_ptr persisted_snapshots, size_t first_val, + bool packet_drops); // No copy raft_cluster(const raft_cluster&) = delete; raft_cluster(raft_cluster&&) = default; @@ -498,6 +500,8 @@ public: void tick_all(); void disconnect(size_t id, std::optional except = std::nullopt); void connect_all(); + future<> add_entries(size_t n, size_t& leader); + future<> add_remaining_entries(size_t& leader); }; test_server @@ -524,11 +528,13 @@ create_raft_server(raft::server_id uuid, state_machine::apply_fn apply, initial_ raft_cluster::raft_cluster(std::vector states, state_machine::apply_fn apply, size_t apply_entries, lw_shared_ptr connected, lw_shared_ptr snapshots, - lw_shared_ptr persisted_snapshots, bool packet_drops) : + lw_shared_ptr persisted_snapshots, size_t first_val, + bool packet_drops) : _connected(connected), _snapshots(snapshots), _persisted_snapshots(persisted_snapshots), _apply_entries(apply_entries), + _next_val(first_val), _packet_drops(packet_drops), _apply(apply) { raft::configuration config; @@ -751,13 +757,12 @@ future> change_configuration(raft_cluster& rafts, } // Add consecutive integer entries to a leader -future<> add_entries(raft_cluster& rafts, - size_t start, size_t end, size_t& leader) { - size_t value = start; - for (size_t value = start; value != end;) { +future<> raft_cluster::add_entries(size_t n, size_t& leader) { + size_t end = _next_val + n; + while (_next_val != end) { try { - co_await rafts[leader].server->add_entry(create_command(value), raft::wait_type::committed); - value++; + co_await _servers[leader].server->add_entry(create_command(_next_val), raft::wait_type::committed); + _next_val++; } catch (raft::not_a_leader& e) { // leader stepped down, update with new leader if present if (e.leader != raft::server_id{}) { @@ -770,6 +775,10 @@ future<> add_entries(raft_cluster& rafts, } } +future<> raft_cluster::add_remaining_entries(size_t& leader) { + co_await add_entries(_apply_entries - _next_val, leader); +} + using raft_ticker_type = seastar::timer; std::vector init_raft_tickers(raft_cluster& rafts) { @@ -824,7 +833,7 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) { auto connected = make_lw_shared(test.nodes); raft_cluster rafts(get_states(test, prevote), apply_changes, test.total_values, connected, - snaps, persisted_snaps, packet_drops); + snaps, persisted_snaps, test.get_first_val(), packet_drops); co_await rafts.start_all(); // Tickers for servers @@ -841,17 +850,13 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) { co_await rafts[leader].server->wait_election_done(); BOOST_TEST_MESSAGE("Processing updates"); - // Count existing leader snap index and entries, if present - size_t next_val = test.get_first_val(); - // Process all updates in order for (auto update: test.updates) { if (std::holds_alternative(update)) { auto n = std::get(update).n; BOOST_CHECK_MESSAGE(in_configuration.contains(leader), format("Current leader {} is not in configuration", leader)); - co_await add_entries(rafts, next_val, next_val + n, leader); - next_val += n; + co_await rafts.add_entries(n, leader); } else if (std::holds_alternative(update)) { unsigned next_leader = std::get(update).id; co_await wait_log(rafts, connected, in_configuration, leader, next_leader); @@ -926,9 +931,7 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) { } BOOST_TEST_MESSAGE("Appending remaining values"); - if (next_val < test.total_values) { - co_await add_entries(rafts, next_val, test.total_values, leader); - } + co_await rafts.add_remaining_entries(leader); co_await rafts.wait_all(); co_await rafts.stop_all(); @@ -1001,7 +1004,7 @@ future<> rpc_test(size_t nodes, test_func test_case_body) { rpc::reset_network(); // Initialize and start the cluster with corresponding tickers raft_cluster rafts(states, dummy_apply_fn, 1, conn, - make_lw_shared(), make_lw_shared(), false); + make_lw_shared(), make_lw_shared(), 0, false); co_await rafts.start_all(); auto tickers = init_raft_tickers(rafts); // Keep track of what servers are in the current configuration @@ -1248,7 +1251,7 @@ SEASTAR_TEST_CASE(rpc_load_conf_from_snapshot) { raft_cluster rafts(states, dummy_apply_fn, 0, make_lw_shared(1), make_lw_shared(), - make_lw_shared(), false); + make_lw_shared(), 0, false); co_await rafts.start_all(); BOOST_CHECK(rafts[0].rpc->known_peers() == address_set({sid})); @@ -1269,7 +1272,7 @@ SEASTAR_TEST_CASE(rpc_load_conf_from_log) { raft_cluster rafts(states, dummy_apply_fn, 0, make_lw_shared(1), make_lw_shared(), - make_lw_shared(), false); + make_lw_shared(), 0, false); co_await rafts.start_all(); BOOST_CHECK(rafts[0].rpc->known_peers() == address_set({sid}));