From ccb85bce0262dd3768c29b591e67d1126822c03b Mon Sep 17 00:00:00 2001 From: Alejo Sanchez Date: Mon, 26 Apr 2021 13:53:35 -0400 Subject: [PATCH] raft: replication test: wait log on next leader only When there's a defined next leader, only wait for log propagation for this follower. Splits wait_log() into waiting for one follower with wait_log() and waiting for all followers with wait_log_all(). Signed-off-by: Alejo Sanchez --- test/raft/replication_test.cc | 47 ++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc index 4d2e81a794..78fff23dc1 100644 --- a/test/raft/replication_test.cc +++ b/test/raft/replication_test.cc @@ -530,10 +530,22 @@ struct test_case { const std::vector updates; }; +// Wait for leader log to propagate to follower future<> wait_log(std::vector& rafts, - lw_shared_ptr connected, std::unordered_set& in_configuration, - size_t leader) { - // Wait for leader log to propagate + lw_shared_ptr connected, std::unordered_set& in_configuration, + size_t leader, size_t follower) { + + if ((*connected)(to_raft_id(leader), to_raft_id(follower)) && + in_configuration.contains(leader) && in_configuration.contains(follower)) { + auto leader_log_idx = rafts[leader].server->log_last_idx(); + co_await rafts[follower].server->wait_log_idx(leader_log_idx); + } +} + +// Wait for all connected followers to catch up +future<> wait_log_all(std::vector& rafts, + lw_shared_ptr connected, std::unordered_set& in_configuration, + size_t leader) { auto leader_log_idx = rafts[leader].server->log_last_idx(); for (size_t s = 0; s < rafts.size(); ++s) { if (s != leader && (*connected)(to_raft_id(s), to_raft_id(leader)) && @@ -768,16 +780,12 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) { next_val += n; } else if (std::holds_alternative(update)) { unsigned next_leader = std::get(update).id; - auto leader_log_idx = rafts[leader].server->log_last_idx(); - 
co_await rafts[next_leader].server->wait_log_idx(leader_log_idx); + co_await wait_log(rafts, connected, in_configuration, leader, next_leader); pause_tickers(tickers); leader = co_await elect_new_leader(rafts, connected, leader, next_leader); restart_tickers(tickers); } else if (std::holds_alternative(update)) { - co_await wait_log(rafts, connected, in_configuration, leader); - pause_tickers(tickers); auto p = std::get(update); - connected->connect_all(); std::unordered_set partition_servers; struct leader new_leader; bool have_new_leader = false; @@ -792,6 +800,17 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) { } partition_servers.insert(id); } + if (have_new_leader) { + // Wait for log to propagate to next leader, before disconnections + co_await wait_log(rafts, connected, in_configuration, leader, new_leader.id); + } else { + // No leader specified, wait log for all connected servers, before disconnections + for (auto s: partition_servers) { + co_await wait_log(rafts, connected, in_configuration, leader, s); + } + } + pause_tickers(tickers); + connected->connect_all(); for (size_t s = 0; s < test.nodes; ++s) { if (partition_servers.find(s) == partition_servers.end()) { // Disconnect servers not in main partition @@ -807,7 +826,7 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) { } restart_tickers(tickers); } else if (std::holds_alternative(update)) { - co_await wait_log(rafts, connected, in_configuration, leader); + co_await wait_log_all(rafts, connected, in_configuration, leader); auto sc = std::get(update); in_configuration = co_await change_configuration(rafts, test.total_values, connected, in_configuration, snaps, persisted_snaps, packet_drops, std::move(sc), leader, tickers, apply_changes); @@ -929,7 +948,7 @@ future<> rpc_test(size_t nodes, test_func test_case_body) { constexpr size_t initial_leader = 0; rafts[initial_leader].server->wait_until_candidate(); co_await 
rafts[initial_leader].server->wait_election_done(); - co_await wait_log(rafts, conn, in_configuration, initial_leader); + co_await wait_log_all(rafts, conn, in_configuration, initial_leader); try { // Execute the test co_await test_case_body(rafts, conn, tickers, initial_leader, in_configuration); @@ -1359,7 +1378,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_snp) { connected->connect_all(); // wait to synchronize logs between current leader (B) and the rest of the cluster - co_await wait_log(rafts, connected, in_configuration, new_leader); + co_await wait_log_all(rafts, connected, in_configuration, new_leader); // A should have truncated an offending configuration entry and revert its RPC configuration. // // Since B's log is effectively empty (does not contain any configuration @@ -1468,7 +1487,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) { connected->connect_all(); // wait to synchronize logs between current leader (B) and the rest of the cluster - co_await wait_log(rafts, connected, in_configuration, new_leader); + co_await wait_log_all(rafts, connected, in_configuration, new_leader); // Again, A's RPC configuration is the same as before despite the // real cfg being reverted to the committed state as it is the union @@ -1489,7 +1508,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) { co_await elect_new_leader(rafts, connected, new_leader, initial_leader); restart_tickers(tickers); - co_await wait_log(rafts, connected, in_configuration, initial_leader); + co_await wait_log_all(rafts, connected, in_configuration, initial_leader); // Disconnect A from the rest of the cluster. 
connected->disconnect(to_raft_id(0)); @@ -1531,7 +1550,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) { connected->connect_all(); // wait to synchronize logs between current leader (B) and the rest of the cluster - co_await wait_log(rafts, connected, in_configuration, new_leader); + co_await wait_log_all(rafts, connected, in_configuration, new_leader); // A's RPC configuration is reverted to committed configuration {A, B, C}. co_await seastar::async([&] { CHECK_EVENTUALLY_EQUAL(rafts[0].rpc->known_peers(), committed_conf);