raft: replication test: wait log on next leader only

When there's a defined next leader, only wait for log propagation for
this follower.

Splits wait_log() to waiting for one follower with wait_log() and
waiting for all followers with wait_log().

Signed-off-by: Alejo Sanchez <alejo.sanchez@scylladb.com>
This commit is contained in:
Alejo Sanchez
2021-04-26 13:53:35 -04:00
parent 2aa1646e35
commit ccb85bce02

View File

@@ -530,10 +530,22 @@ struct test_case {
const std::vector<update> updates;
};
// Wait for leader log to propagate to follower
future<> wait_log(std::vector<test_server>& rafts,
lw_shared_ptr<connected> connected, std::unordered_set<size_t>& in_configuration,
size_t leader) {
// Wait for leader log to propagate
lw_shared_ptr<connected> connected, std::unordered_set<size_t>& in_configuration,
size_t leader, size_t follower) {
if ((*connected)(to_raft_id(leader), to_raft_id(follower)) &&
in_configuration.contains(leader) && in_configuration.contains(follower)) {
auto leader_log_idx = rafts[leader].server->log_last_idx();
co_await rafts[follower].server->wait_log_idx(leader_log_idx);
}
}
// Wait for all connected followers to catch up
future<> wait_log_all(std::vector<test_server>& rafts,
lw_shared_ptr<connected> connected, std::unordered_set<size_t>& in_configuration,
size_t leader) {
auto leader_log_idx = rafts[leader].server->log_last_idx();
for (size_t s = 0; s < rafts.size(); ++s) {
if (s != leader && (*connected)(to_raft_id(s), to_raft_id(leader)) &&
@@ -768,16 +780,12 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) {
next_val += n;
} else if (std::holds_alternative<new_leader>(update)) {
unsigned next_leader = std::get<new_leader>(update).id;
auto leader_log_idx = rafts[leader].server->log_last_idx();
co_await rafts[next_leader].server->wait_log_idx(leader_log_idx);
co_await wait_log(rafts, connected, in_configuration, leader, next_leader);
pause_tickers(tickers);
leader = co_await elect_new_leader(rafts, connected, leader, next_leader);
restart_tickers(tickers);
} else if (std::holds_alternative<partition>(update)) {
co_await wait_log(rafts, connected, in_configuration, leader);
pause_tickers(tickers);
auto p = std::get<partition>(update);
connected->connect_all();
std::unordered_set<size_t> partition_servers;
struct leader new_leader;
bool have_new_leader = false;
@@ -792,6 +800,17 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) {
}
partition_servers.insert(id);
}
if (have_new_leader) {
// Wait for log to propagate to next leader, before disconnections
co_await wait_log(rafts, connected, in_configuration, leader, new_leader.id);
} else {
// No leader specified, wait log for all connected servers, before disconnections
for (auto s: partition_servers) {
co_await wait_log(rafts, connected, in_configuration, leader, s);
}
}
pause_tickers(tickers);
connected->connect_all();
for (size_t s = 0; s < test.nodes; ++s) {
if (partition_servers.find(s) == partition_servers.end()) {
// Disconnect servers not in main partition
@@ -807,7 +826,7 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) {
}
restart_tickers(tickers);
} else if (std::holds_alternative<set_config>(update)) {
co_await wait_log(rafts, connected, in_configuration, leader);
co_await wait_log_all(rafts, connected, in_configuration, leader);
auto sc = std::get<set_config>(update);
in_configuration = co_await change_configuration(rafts, test.total_values, connected,
in_configuration, snaps, persisted_snaps, packet_drops, std::move(sc), leader, tickers, apply_changes);
@@ -929,7 +948,7 @@ future<> rpc_test(size_t nodes, test_func test_case_body) {
constexpr size_t initial_leader = 0;
rafts[initial_leader].server->wait_until_candidate();
co_await rafts[initial_leader].server->wait_election_done();
co_await wait_log(rafts, conn, in_configuration, initial_leader);
co_await wait_log_all(rafts, conn, in_configuration, initial_leader);
try {
// Execute the test
co_await test_case_body(rafts, conn, tickers, initial_leader, in_configuration);
@@ -1359,7 +1378,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_snp) {
connected->connect_all();
// wait to synchronize logs between current leader (B) and the rest of the cluster
co_await wait_log(rafts, connected, in_configuration, new_leader);
co_await wait_log_all(rafts, connected, in_configuration, initial_leader);
// A should have truncated an offending configuration entry and revert its RPC configuration.
//
// Since B's log is effectively empty (does not contain any configuration
@@ -1468,7 +1487,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) {
connected->connect_all();
// wait to synchronize logs between current leader (B) and the rest of the cluster
co_await wait_log(rafts, connected, in_configuration, new_leader);
co_await wait_log_all(rafts, connected, in_configuration, new_leader);
// Again, A's RPC configuration is the same as before despite the
// real cfg being reverted to the committed state as it is the union
@@ -1489,7 +1508,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) {
co_await elect_new_leader(rafts, connected, new_leader, initial_leader);
restart_tickers(tickers);
co_await wait_log(rafts, connected, in_configuration, initial_leader);
co_await wait_log_all(rafts, connected, in_configuration, new_leader);
// Disconnect A from the rest of the cluster.
connected->disconnect(to_raft_id(0));
@@ -1531,7 +1550,7 @@ SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) {
connected->connect_all();
// wait to synchronize logs between current leader (B) and the rest of the cluster
co_await wait_log(rafts, connected, in_configuration, new_leader);
co_await wait_log_all(rafts, connected, in_configuration, new_leader);
// A's RPC configuration is reverted to committed configuration {A, B, C}.
co_await seastar::async([&] {
CHECK_EVENTUALLY_EQUAL(rafts[0].rpc->known_peers(), committed_conf);