diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc index de49d625bb..e3d17d7983 100644 --- a/test/raft/replication_test.cc +++ b/test/raft/replication_test.cc @@ -28,11 +28,13 @@ #include #include #include +#include #include "raft/server.hh" #include "serializer.hh" #include "serializer_impl.hh" #include "xx_hasher.hh" #include "test/raft/helpers.hh" +#include "test/lib/eventually.hh" // Test Raft library with declarative test definitions // @@ -301,6 +303,9 @@ class rpc : public raft::rpc { lw_shared_ptr _connected; lw_shared_ptr _snapshots; bool _packet_drops; + raft::server_address_set _known_peers; + uint32_t _servers_added = 0; + uint32_t _servers_removed = 0; public: rpc(raft::server_id id, lw_shared_ptr connected, lw_shared_ptr snapshots, bool packet_drops) : _id(id), _connected(connected), _snapshots(snapshots), @@ -376,12 +381,32 @@ public: net[id]->_client->timeout_now_request(_id, std::move(timeout_now)); return make_ready_future<>(); } - virtual void add_server(raft::server_id id, bytes node_info) {} - virtual void remove_server(raft::server_id id) {} + virtual void add_server(raft::server_id id, bytes node_info) { + _known_peers.insert(raft::server_address{id}); + ++_servers_added; + } + virtual void remove_server(raft::server_id id) { + _known_peers.erase(raft::server_address{id}); + ++_servers_removed; + } virtual future<> abort() { return make_ready_future<>(); } static void reset_network() { net.clear(); } + + const raft::server_address_set& known_peers() const { + return _known_peers; + } + void reset_counters() { + _servers_added = 0; + _servers_removed = 0; + } + uint32_t servers_added() const { + return _servers_added; + } + uint32_t servers_removed() const { + return _servers_removed; + } }; std::unordered_map rpc::net; @@ -389,6 +414,7 @@ std::unordered_map rpc::net; struct test_server { std::unique_ptr server; state_machine* sm; + rpc* rpc; }; test_server @@ -399,6 +425,7 @@ create_raft_server(raft::server_id uuid, state_machine::apply_fn apply, initial_ auto sm = std::make_unique(uuid, std::move(apply), apply_entries, snapshots); auto& rsm = *sm; auto mrpc = std::make_unique(uuid, connected, snapshots, packet_drops); + auto& rpc_ref = *mrpc; auto mpersistence = std::make_unique(uuid, state, snapshots, persisted_snapshots); auto fd = seastar::make_shared(uuid, connected); @@ -407,7 +434,8 @@ create_raft_server(raft::server_id uuid, state_machine::apply_fn apply, initial_ return { std::move(raft), - &rsm + &rsm, + &rpc_ref }; } @@ -870,6 +898,67 @@ void replication_test(struct test_case test, bool prevote, bool packet_drops) { SEASTAR_THREAD_TEST_CASE(test_name ## _prevote_drops) { \ replication_test(test_body, true, true); } +raft::server_address_set full_cluster_address_set(size_t nodes) { + raft::server_address_set res; + for (size_t i = 0; i < nodes; ++i) { + res.emplace(to_server_address(i)); + } + return res; +} + +using test_func = seastar::noncopyable_function< + future<>(std::vector&, lw_shared_ptr, std::vector&, + size_t, std::unordered_set&)>; + +size_t dummy_apply_fn(raft::server_id id, const std::vector& commands, + lw_shared_ptr hasher) { + return 0; +} + +future> rpc_test_change_configuration(std::vector& rafts, + lw_shared_ptr connected, std::unordered_set& in_configuration, + set_config sc, size_t& leader, + std::vector>& tickers) { + return change_configuration(rafts, 1, connected, in_configuration, + make_lw_shared(), make_lw_shared(), + false, sc, leader, tickers, dummy_apply_fn); +} + +// Wrapper function for running RPC tests that provides a convenient +// automatic initialization and de-initialization of a raft cluster. +future<> rpc_test(size_t nodes, test_func test_case_body) { + std::vector states(nodes); + auto conn = make_lw_shared(nodes); + + rpc::reset_network(); + // Initialize and start the cluster with corresponding tickers + auto rafts = co_await create_cluster(states, dummy_apply_fn, 1, conn, + make_lw_shared(), make_lw_shared(), false); + auto tickers = init_raft_tickers(rafts); + // Keep track of what servers are in the current configuration + std::unordered_set in_configuration; + for (size_t s = 0; s < rafts.size(); ++s) { + in_configuration.insert(s); + } + // Elect first node a leader + constexpr size_t initial_leader = 0; + rafts[initial_leader].server->wait_until_candidate(); + co_await rafts[initial_leader].server->wait_election_done(); + co_await wait_log(rafts, conn, in_configuration, initial_leader); + try { + // Execute the test + co_await test_case_body(rafts, conn, tickers, initial_leader, in_configuration); + } catch (...) { + BOOST_ERROR(format("RPC test failed unexpectedly with error: {}", std::current_exception())); + } + // Stop tickers + pause_tickers(tickers); + // Stop raft servers + for (auto& raft : rafts) { + co_await raft.server->abort(); + } +} + // 1 nodes, simple replication, empty, no updates RAFT_TEST_CASE(simple_replication, (test_case{ .nodes = 1})) @@ -1077,3 +1166,396 @@ RAFT_TEST_CASE(etcd_test_leader_cycle, (test_case{ new_leader{1},new_leader{0} }})); +/// +/// RPC-related tests +/// + +SEASTAR_TEST_CASE(rpc_load_conf_from_snapshot) { + // 1 node cluster with an initial configuration from a snapshot. + // Test that RPC configuration is set up correctly when the raft server + // instance is started. + constexpr size_t nodes = 1; + + raft::server_id sid = id(); + initial_state state; + state.snapshot.config = raft::configuration{sid}; + + auto raft = create_raft_server(sid, dummy_apply_fn, state, 1, + make_lw_shared(nodes), make_lw_shared(), + make_lw_shared(), false); + co_await raft.server->start(); + + BOOST_CHECK(raft.rpc->known_peers() == address_set({sid})); + + co_await raft.server->abort(); +} + +SEASTAR_TEST_CASE(rpc_load_conf_from_log) { + // 1 node cluster. + // Initial configuration is taken from the persisted log. + constexpr size_t nodes = 1; + + raft::server_id sid = id(); + initial_state state; + raft::log_entry conf_entry{.idx = raft::index_t{1}, .data = raft::configuration{sid}}; + state.log.emplace_back(std::move(conf_entry)); + + auto raft = create_raft_server(sid, dummy_apply_fn, state, 1, + make_lw_shared(nodes), make_lw_shared(), + make_lw_shared(), false); + co_await raft.server->start(); + + BOOST_CHECK(raft.rpc->known_peers() == address_set({sid})); + + co_await raft.server->abort(); +} + +SEASTAR_TEST_CASE(rpc_propose_conf_change) { + // 3 node cluster {A, B, C}. + // Shrinked later to 2 nodes and then expanded back to 3 nodes. + // Test that both configuration changes update RPC configuration correspondingly + // on all nodes. + return rpc_test(3, [] (std::vector& rafts, lw_shared_ptr connected, + std::vector& tickers, size_t leader, + std::unordered_set& in_configuration) -> future<> { + // Remove node C from the cluster configuration. + in_configuration = co_await rpc_test_change_configuration(rafts, connected, + in_configuration, set_config{0, 1}, leader, tickers); + + // Check that RPC config is updated both on leader and on follower nodes, + // i.e. `rpc::remove_server` is called. + auto reduced_config = address_set({to_raft_id(0), to_raft_id(1)}); + for (const auto& node : in_configuration) { + BOOST_CHECK(rafts[node].rpc->known_peers() == reduced_config); + } + + // Re-add node C to the cluster configuration. + in_configuration = co_await rpc_test_change_configuration(rafts, connected, + in_configuration, set_config{0, 1, 2}, leader, tickers); + + // Check that both A (leader) and B (follower) call `rpc::add_server`, + // also the newly integrated node gets the actual RPC configuration, too. + const auto initial_cluster_conf = full_cluster_address_set(rafts.size()); + co_await seastar::async([&] { + for (const auto& r : rafts) { + CHECK_EVENTUALLY_EQUAL(r.rpc->known_peers(), initial_cluster_conf); + } + }); + }); +} + +SEASTAR_TEST_CASE(rpc_leader_election) { + // 3 node cluster {A, B, C}. + // Test that leader elections don't change RPC configuration. + return rpc_test(3, [] (std::vector& rafts, lw_shared_ptr connected, + std::vector& tickers, size_t initial_leader, + std::unordered_set& in_configuration) -> future<> { + auto all_nodes = full_cluster_address_set(rafts.size()); + for (auto& raft : rafts) { + BOOST_CHECK(raft.rpc->known_peers() == all_nodes); + raft.rpc->reset_counters(); + } + + // Elect 2nd node a leader + constexpr size_t new_leader = 1; + pause_tickers(tickers); + co_await elect_new_leader(rafts, connected, in_configuration, initial_leader, new_leader); + restart_tickers(tickers); + + // Check that no attempts to update RPC were made. + for (const auto& r : rafts) { + BOOST_CHECK(!r.rpc->servers_added()); + BOOST_CHECK(!r.rpc->servers_removed()); + } + }); +} + +SEASTAR_TEST_CASE(rpc_voter_non_voter_transision) { + // 3 node cluster {A, B, C}. + // Test that demoting of node C to learner state and then promoting back + // to voter doesn't involve any RPC configuration changes. + return rpc_test(3, [] (std::vector& rafts, lw_shared_ptr connected, + std::vector& tickers, size_t leader, + std::unordered_set& in_configuration) -> future<> { + const auto all_voter_nodes = full_cluster_address_set(rafts.size()); + for (auto& raft : rafts) { + BOOST_CHECK(raft.rpc->known_peers() == all_voter_nodes); + raft.rpc->reset_counters(); + } + + // Make C a non-voting member. + in_configuration = co_await rpc_test_change_configuration(rafts, connected, + in_configuration, set_config{0, 1, set_config_entry(2, false)}, leader, tickers); + + // Check that RPC configuration didn't change. + for (const auto& raft : rafts) { + BOOST_CHECK(!raft.rpc->servers_added()); + BOOST_CHECK(!raft.rpc->servers_removed()); + } + + // Make C a voting member. + in_configuration = co_await rpc_test_change_configuration(rafts, connected, + in_configuration, set_config{0, 1, 2}, leader, tickers); + + // RPC configuration shouldn't change. + for (const auto& raft : rafts) { + BOOST_CHECK(!raft.rpc->servers_added()); + BOOST_CHECK(!raft.rpc->servers_removed()); + } + }); +} + +SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_snp) { + // 3 node cluster {A, B, C}. + // Issue a configuration change on leader (A): add node D. + // Fail the node before the entry is committed (disconnect from the + // rest of the cluster and restart the node). + // + // In the meanwhile, elect a new leader within the connected part of the + // cluster (B). A becomes an isolated follower in this case. + // A should observe {A, B, C, D} RPC configuration: when in joint + // consensus, we need to account for servers in both configurations. + // + // Heal network partition and observe that A's log is truncated (actually, + // it's empty since B does not have any entries at all, except for dummies). + // The RPC configuration on A is restored from initial snapshot configuration, + // which is {A, B, C}. + return rpc_test(3, [] (std::vector& rafts, lw_shared_ptr connected, + std::vector& tickers, size_t initial_leader, + std::unordered_set& in_configuration) -> future<> { + const auto all_nodes = full_cluster_address_set(rafts.size()); + pause_tickers(tickers); + // Disconnect A from B and C. + connected->disconnect(to_raft_id(0)); + // Emulate a failed configuration change on A (add node D) by + // restarting A with a modified initial log containing one extraneous + // configuration entry. + co_await rafts[initial_leader].server->abort(); + // Restart A with a synthetic initial state representing + // the same initial snapshot config (A, B, C) as before, + // but with the following things in mind: + // * log contains only one entry: joint configuration entry + // that is equivalent to that of A's before the crash. + // * The configuration entry would have term=1 so that it'll + // be truncated when A gets in contact with other nodes + // * This will completely erase all entries on A leaving its + // log empty. + auto extended_conf = address_set({to_raft_id(0), to_raft_id(1), to_raft_id(2), to_raft_id(3)}); + initial_state restart_state{ + .log = { + raft::log_entry{raft::term_t(1), raft::index_t(1), + raft::configuration( + extended_conf, + all_nodes + ) + } + }, + .snapshot = {.config = all_nodes} + }; + rafts[initial_leader] = create_raft_server(to_raft_id(initial_leader), dummy_apply_fn, restart_state, 1, + connected, make_lw_shared(), make_lw_shared(), false); + co_await rafts[initial_leader].server->start(); + tickers[initial_leader].set_callback([&rafts, s=initial_leader] { rafts[s].server->tick(); }); + restart_tickers(tickers); + + // A should see {A, B, C, D} as RPC config since + // the latest configuration entry points to joint + // configuration {.current = {A, B, C, D}, .previous = {A, B, C}}. + // RPC configuration is computed as a union of current + // and previous configurations. + BOOST_CHECK(rafts[0].rpc->known_peers() == extended_conf); + + // Elect B as leader + pause_tickers(tickers); + auto new_leader = co_await elect_new_leader(rafts, connected, in_configuration, + initial_leader, 1); + restart_tickers(tickers); + + // Heal network partition. + connected->connect_all(); + + // wait to synchronize logs between current leader (B) and the rest of the cluster + co_await wait_log(rafts, connected, in_configuration, new_leader); + // A should have truncated an offending configuration entry and revert its RPC configuration. + // + // Since B's log is effectively empty (does not contain any configuration + // entries), A's configuration view ({A, B, C}) is restored from + // initial snapshot. + co_await seastar::async([&] { + CHECK_EVENTUALLY_EQUAL(rafts[0].rpc->known_peers(), all_nodes); + }); + }); +} + +SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) { + // 4 node cluster {A, B, C, D}. + // Change configuration to {A, B, C} from A and wait for it to become + // committed. + // + // Then, issue a configuration change on leader (A): remove node C. + // Fail the node before the entry is committed (disconnect from the + // rest of the cluster and restart the node). We emulate this behavior by + // just terminating the node and restarting it with a pre-defined state + // that is equivalent to having an uncommitted configuration entry in + // the log. + // + // In the meanwhile, elect a new leader within the connected part of the + // cluster (B). A becomes an isolated follower in this case. + // + // Heal network partition and observe that A's log is truncated and + // replaced with that of B. RPC configuration should not change between + // the crash + network partition and synchronization with B, since + // the effective RPC cfg would be {A, B, C} both for + // joint cfg = {.current = {A, B}, .previous = {A, B, C}} + // and the previously commited cfg = {A, B, C}. + // + // After that, test for the second case: switch leader back to A and + // try to expand the cluster back to initial state (re-add + // node D): {A, B, C, D}. + // + // Try to set configuration {A, B, C, D} on leader A, isolate and crash it. + // Restart with synthetic state containing an uncommitted configuration entry. + // + // This time before healing the network we should observe + // RPC configuration = {A, B, C, D}, accounting for an uncommitted part of the + // configuration. + // After healing the network and synchronizing with new leader B, RPC config + // should be reverted back to committed state {A, B, C}. + return rpc_test(4, [] (std::vector& rafts, lw_shared_ptr connected, + std::vector& tickers, size_t initial_leader, + std::unordered_set& in_configuration) -> future<> { + const auto all_nodes = full_cluster_address_set(rafts.size()); + + // Remove node D from the cluster configuration. + auto committed_conf = address_set({to_raft_id(0), to_raft_id(1), to_raft_id(2)}); + in_configuration = co_await rpc_test_change_configuration(rafts, connected, + in_configuration, set_config{0, 1, 2}, initial_leader, tickers); + // {A, B, C} configuration is committed by now. + + // + // First case: shrink cluster (remove node C). + // + + // Disconnect A from the rest of the cluster. + connected->disconnect(to_raft_id(0)); + // Try to change configuration (remove node C) + auto uncommitted_conf = address_set({to_raft_id(0), to_raft_id(1)}); + // `set_configuration` call will fail on A because + // it's cut off the other nodes and it will be waiting for them, + // but A is terminated before the network is allowed to heal the partition. + tickers[0].cancel(); + co_await rafts[initial_leader].server->abort(); + // Restart A with a synthetic initial state that contains two entries + // in the log: + // 1. {A, B, C} configuration committed before crash + partition. + // 2. uncommitted joint configuration entry that is equivalent + // to that of A's before the crash. + initial_state restart_state{ + .log = { + raft::log_entry{raft::term_t(1), raft::index_t(1), + raft::configuration(committed_conf) + }, + raft::log_entry{raft::term_t(2), raft::index_t(2), + raft::configuration( + uncommitted_conf, + committed_conf + ) + } + }, + .snapshot = {.config = all_nodes} + }; + rafts[initial_leader] = create_raft_server(to_raft_id(initial_leader), dummy_apply_fn, restart_state, 1, + connected, make_lw_shared(), make_lw_shared(), false); + co_await rafts[initial_leader].server->start(); + tickers[initial_leader].set_callback([&rafts, s=initial_leader] { rafts[s].server->tick(); }); + tickers[0].rearm_periodic(tick_delta); + + // A's RPC configuration should stay the same because + // for both uncommitted joint cfg = {.current = {A, B}, .previous = {A, B, C}} + // and committed cfg = {A, B, C} the RPC cfg would be equal to {A, B, C} + BOOST_CHECK(rafts[0].rpc->known_peers() == committed_conf); + + // Elect B as leader + pause_tickers(tickers); + auto new_leader = co_await elect_new_leader(rafts, connected, in_configuration, + initial_leader, 1); + restart_tickers(tickers); + + // Heal network partition. + connected->connect_all(); + + // wait to synchronize logs between current leader (B) and the rest of the cluster + co_await wait_log(rafts, connected, in_configuration, new_leader); + + // Again, A's RPC configuration is the same as before despite the + // real cfg being reverted to the committed state as it is the union + // between current and previous configurations in case of + // joint cfg, anyway. + co_await seastar::async([&] { + CHECK_EVENTUALLY_EQUAL(rafts[0].rpc->known_peers(), committed_conf); + }); + BOOST_CHECK(rafts[1].rpc->known_peers() == committed_conf); + BOOST_CHECK(rafts[2].rpc->known_peers() == committed_conf); + + // + // Second case: expand cluster (re-add node D). + // + + // Elect A leader again. + pause_tickers(tickers); + co_await elect_new_leader(rafts, connected, in_configuration, + new_leader, initial_leader); + restart_tickers(tickers); + + co_await wait_log(rafts, connected, in_configuration, initial_leader); + + // Disconnect A from the rest of the cluster. + connected->disconnect(to_raft_id(0)); + + // Try to add D back. + tickers[0].cancel(); + co_await rafts[initial_leader].server->abort(); + initial_state restart_state_2{ + .log = { + raft::log_entry{raft::term_t(1), raft::index_t(1), + raft::configuration(committed_conf) + }, + raft::log_entry{raft::term_t(2), raft::index_t(2), + raft::configuration( + all_nodes, + committed_conf + ) + } + }, + .snapshot = {.config = all_nodes} + }; + rafts[initial_leader] = create_raft_server(to_raft_id(initial_leader), dummy_apply_fn, restart_state_2, 1, + connected, make_lw_shared(), make_lw_shared(), false); + co_await rafts[initial_leader].server->start(); + tickers[initial_leader].set_callback([&rafts, s=initial_leader] { rafts[s].server->tick(); }); + tickers[0].rearm_periodic(tick_delta); + + // A should observe RPC configuration = {A, B, C, D} since it's the union + // of an uncommitted joint config components + // {.current = {A, B, C, D}, .previous = {A, B, C}}. + BOOST_CHECK(rafts[0].rpc->known_peers() == all_nodes); + + // Elect B as leader + pause_tickers(tickers); + new_leader = co_await elect_new_leader(rafts, connected, in_configuration, + initial_leader, 1); + restart_tickers(tickers); + + // Heal network partition. + connected->connect_all(); + + // wait to synchronize logs between current leader (B) and the rest of the cluster + co_await wait_log(rafts, connected, in_configuration, new_leader); + // A's RPC configuration is reverted to committed configuration {A, B, C}. + co_await seastar::async([&] { + CHECK_EVENTUALLY_EQUAL(rafts[0].rpc->known_peers(), committed_conf); + }); + BOOST_CHECK(rafts[1].rpc->known_peers() == committed_conf); + BOOST_CHECK(rafts[2].rpc->known_peers() == committed_conf); + }); +} \ No newline at end of file