raft: add tests for RPC module

Now RPC module has some basic testing coverage to
make sure RPC configuration is updated appropriately
on configuration changes (i.e. `add_server` and
`remove_server` are called when appropriate).

The test suite currenty consists of the following
test-cases:
 * Loading server instance with configuration from a snapshot.
 * Loading server instance with configuration from a log.
 * Configuration changes (remove + add node).
 * Leader elections don't lead to RPC configuration changes.
 * Voter <-> learner node transitions also don't change RPC
   configuration.
 * Reverting uncommitted configuration changes updates
   RPC configuration accordingly (two cases: revert to
   snapshot config or committed state from the log).

Tests: unit(dev, debug)

Signed-off-by: Pavel Solodovnikov <pa.solodovnikov@scylladb.com>
This commit is contained in:
Pavel Solodovnikov
2021-05-19 20:22:38 +03:00
parent e030e291a8
commit a66de8658b

View File

@@ -28,11 +28,13 @@
#include <seastar/util/later.hh>
#include <seastar/testing/random.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/testing/test_case.hh>
#include "raft/server.hh"
#include "serializer.hh"
#include "serializer_impl.hh"
#include "xx_hasher.hh"
#include "test/raft/helpers.hh"
#include "test/lib/eventually.hh"
// Test Raft library with declarative test definitions
//
@@ -301,6 +303,9 @@ class rpc : public raft::rpc {
lw_shared_ptr<connected> _connected;
lw_shared_ptr<snapshots> _snapshots;
bool _packet_drops;
raft::server_address_set _known_peers;
uint32_t _servers_added = 0;
uint32_t _servers_removed = 0;
public:
rpc(raft::server_id id, lw_shared_ptr<connected> connected, lw_shared_ptr<snapshots> snapshots,
bool packet_drops) : _id(id), _connected(connected), _snapshots(snapshots),
@@ -376,12 +381,32 @@ public:
net[id]->_client->timeout_now_request(_id, std::move(timeout_now));
return make_ready_future<>();
}
virtual void add_server(raft::server_id id, bytes node_info) {}
virtual void remove_server(raft::server_id id) {}
virtual void add_server(raft::server_id id, bytes node_info) {
_known_peers.insert(raft::server_address{id});
++_servers_added;
}
virtual void remove_server(raft::server_id id) {
_known_peers.erase(raft::server_address{id});
++_servers_removed;
}
virtual future<> abort() { return make_ready_future<>(); }
static void reset_network() {
net.clear();
}
const raft::server_address_set& known_peers() const {
return _known_peers;
}
void reset_counters() {
_servers_added = 0;
_servers_removed = 0;
}
uint32_t servers_added() const {
return _servers_added;
}
uint32_t servers_removed() const {
return _servers_removed;
}
};
std::unordered_map<raft::server_id, rpc*> rpc::net;
@@ -389,6 +414,7 @@ std::unordered_map<raft::server_id, rpc*> rpc::net;
struct test_server {
std::unique_ptr<raft::server> server;
state_machine* sm;
rpc* rpc;
};
test_server
@@ -399,6 +425,7 @@ create_raft_server(raft::server_id uuid, state_machine::apply_fn apply, initial_
auto sm = std::make_unique<state_machine>(uuid, std::move(apply), apply_entries, snapshots);
auto& rsm = *sm;
auto mrpc = std::make_unique<rpc>(uuid, connected, snapshots, packet_drops);
auto& rpc_ref = *mrpc;
auto mpersistence = std::make_unique<persistence>(uuid, state, snapshots, persisted_snapshots);
auto fd = seastar::make_shared<failure_detector>(uuid, connected);
@@ -407,7 +434,8 @@ create_raft_server(raft::server_id uuid, state_machine::apply_fn apply, initial_
return {
std::move(raft),
&rsm
&rsm,
&rpc_ref
};
}
@@ -870,6 +898,67 @@ void replication_test(struct test_case test, bool prevote, bool packet_drops) {
SEASTAR_THREAD_TEST_CASE(test_name ## _prevote_drops) { \
replication_test(test_body, true, true); }
raft::server_address_set full_cluster_address_set(size_t nodes) {
raft::server_address_set res;
for (size_t i = 0; i < nodes; ++i) {
res.emplace(to_server_address(i));
}
return res;
}
using test_func = seastar::noncopyable_function<
future<>(std::vector<test_server>&, lw_shared_ptr<connected>, std::vector<raft_ticker_type>&,
size_t, std::unordered_set<size_t>&)>;
size_t dummy_apply_fn(raft::server_id id, const std::vector<raft::command_cref>& commands,
lw_shared_ptr<hasher_int> hasher) {
return 0;
}
future<std::unordered_set<size_t>> rpc_test_change_configuration(std::vector<test_server>& rafts,
lw_shared_ptr<connected> connected, std::unordered_set<size_t>& in_configuration,
set_config sc, size_t& leader,
std::vector<seastar::timer<lowres_clock>>& tickers) {
return change_configuration(rafts, 1, connected, in_configuration,
make_lw_shared<snapshots>(), make_lw_shared<persisted_snapshots>(),
false, sc, leader, tickers, dummy_apply_fn);
}
// Wrapper function for running RPC tests that provides a convenient
// automatic initialization and de-initialization of a raft cluster.
future<> rpc_test(size_t nodes, test_func test_case_body) {
std::vector<initial_state> states(nodes);
auto conn = make_lw_shared<connected>(nodes);
rpc::reset_network();
// Initialize and start the cluster with corresponding tickers
auto rafts = co_await create_cluster(states, dummy_apply_fn, 1, conn,
make_lw_shared<snapshots>(), make_lw_shared<persisted_snapshots>(), false);
auto tickers = init_raft_tickers(rafts);
// Keep track of what servers are in the current configuration
std::unordered_set<size_t> in_configuration;
for (size_t s = 0; s < rafts.size(); ++s) {
in_configuration.insert(s);
}
// Elect first node a leader
constexpr size_t initial_leader = 0;
rafts[initial_leader].server->wait_until_candidate();
co_await rafts[initial_leader].server->wait_election_done();
co_await wait_log(rafts, conn, in_configuration, initial_leader);
try {
// Execute the test
co_await test_case_body(rafts, conn, tickers, initial_leader, in_configuration);
} catch (...) {
BOOST_ERROR(format("RPC test failed unexpectedly with error: {}", std::current_exception()));
}
// Stop tickers
pause_tickers(tickers);
// Stop raft servers
for (auto& raft : rafts) {
co_await raft.server->abort();
}
}
// 1 nodes, simple replication, empty, no updates
RAFT_TEST_CASE(simple_replication, (test_case{
.nodes = 1}))
@@ -1077,3 +1166,396 @@ RAFT_TEST_CASE(etcd_test_leader_cycle, (test_case{
new_leader{1},new_leader{0}
}}));
///
/// RPC-related tests
///
SEASTAR_TEST_CASE(rpc_load_conf_from_snapshot) {
// 1 node cluster with an initial configuration from a snapshot.
// Test that RPC configuration is set up correctly when the raft server
// instance is started.
constexpr size_t nodes = 1;
raft::server_id sid = id();
initial_state state;
state.snapshot.config = raft::configuration{sid};
auto raft = create_raft_server(sid, dummy_apply_fn, state, 1,
make_lw_shared<connected>(nodes), make_lw_shared<snapshots>(),
make_lw_shared<persisted_snapshots>(), false);
co_await raft.server->start();
BOOST_CHECK(raft.rpc->known_peers() == address_set({sid}));
co_await raft.server->abort();
}
SEASTAR_TEST_CASE(rpc_load_conf_from_log) {
// 1 node cluster.
// Initial configuration is taken from the persisted log.
constexpr size_t nodes = 1;
raft::server_id sid = id();
initial_state state;
raft::log_entry conf_entry{.idx = raft::index_t{1}, .data = raft::configuration{sid}};
state.log.emplace_back(std::move(conf_entry));
auto raft = create_raft_server(sid, dummy_apply_fn, state, 1,
make_lw_shared<connected>(nodes), make_lw_shared<snapshots>(),
make_lw_shared<persisted_snapshots>(), false);
co_await raft.server->start();
BOOST_CHECK(raft.rpc->known_peers() == address_set({sid}));
co_await raft.server->abort();
}
SEASTAR_TEST_CASE(rpc_propose_conf_change) {
// 3 node cluster {A, B, C}.
// Shrinked later to 2 nodes and then expanded back to 3 nodes.
// Test that both configuration changes update RPC configuration correspondingly
// on all nodes.
return rpc_test(3, [] (std::vector<test_server>& rafts, lw_shared_ptr<connected> connected,
std::vector<raft_ticker_type>& tickers, size_t leader,
std::unordered_set<size_t>& in_configuration) -> future<> {
// Remove node C from the cluster configuration.
in_configuration = co_await rpc_test_change_configuration(rafts, connected,
in_configuration, set_config{0, 1}, leader, tickers);
// Check that RPC config is updated both on leader and on follower nodes,
// i.e. `rpc::remove_server` is called.
auto reduced_config = address_set({to_raft_id(0), to_raft_id(1)});
for (const auto& node : in_configuration) {
BOOST_CHECK(rafts[node].rpc->known_peers() == reduced_config);
}
// Re-add node C to the cluster configuration.
in_configuration = co_await rpc_test_change_configuration(rafts, connected,
in_configuration, set_config{0, 1, 2}, leader, tickers);
// Check that both A (leader) and B (follower) call `rpc::add_server`,
// also the newly integrated node gets the actual RPC configuration, too.
const auto initial_cluster_conf = full_cluster_address_set(rafts.size());
co_await seastar::async([&] {
for (const auto& r : rafts) {
CHECK_EVENTUALLY_EQUAL(r.rpc->known_peers(), initial_cluster_conf);
}
});
});
}
SEASTAR_TEST_CASE(rpc_leader_election) {
// 3 node cluster {A, B, C}.
// Test that leader elections don't change RPC configuration.
return rpc_test(3, [] (std::vector<test_server>& rafts, lw_shared_ptr<connected> connected,
std::vector<raft_ticker_type>& tickers, size_t initial_leader,
std::unordered_set<size_t>& in_configuration) -> future<> {
auto all_nodes = full_cluster_address_set(rafts.size());
for (auto& raft : rafts) {
BOOST_CHECK(raft.rpc->known_peers() == all_nodes);
raft.rpc->reset_counters();
}
// Elect 2nd node a leader
constexpr size_t new_leader = 1;
pause_tickers(tickers);
co_await elect_new_leader(rafts, connected, in_configuration, initial_leader, new_leader);
restart_tickers(tickers);
// Check that no attempts to update RPC were made.
for (const auto& r : rafts) {
BOOST_CHECK(!r.rpc->servers_added());
BOOST_CHECK(!r.rpc->servers_removed());
}
});
}
SEASTAR_TEST_CASE(rpc_voter_non_voter_transision) {
// 3 node cluster {A, B, C}.
// Test that demoting of node C to learner state and then promoting back
// to voter doesn't involve any RPC configuration changes.
return rpc_test(3, [] (std::vector<test_server>& rafts, lw_shared_ptr<connected> connected,
std::vector<raft_ticker_type>& tickers, size_t leader,
std::unordered_set<size_t>& in_configuration) -> future<> {
const auto all_voter_nodes = full_cluster_address_set(rafts.size());
for (auto& raft : rafts) {
BOOST_CHECK(raft.rpc->known_peers() == all_voter_nodes);
raft.rpc->reset_counters();
}
// Make C a non-voting member.
in_configuration = co_await rpc_test_change_configuration(rafts, connected,
in_configuration, set_config{0, 1, set_config_entry(2, false)}, leader, tickers);
// Check that RPC configuration didn't change.
for (const auto& raft : rafts) {
BOOST_CHECK(!raft.rpc->servers_added());
BOOST_CHECK(!raft.rpc->servers_removed());
}
// Make C a voting member.
in_configuration = co_await rpc_test_change_configuration(rafts, connected,
in_configuration, set_config{0, 1, 2}, leader, tickers);
// RPC configuration shouldn't change.
for (const auto& raft : rafts) {
BOOST_CHECK(!raft.rpc->servers_added());
BOOST_CHECK(!raft.rpc->servers_removed());
}
});
}
SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_snp) {
// 3 node cluster {A, B, C}.
// Issue a configuration change on leader (A): add node D.
// Fail the node before the entry is committed (disconnect from the
// rest of the cluster and restart the node).
//
// In the meanwhile, elect a new leader within the connected part of the
// cluster (B). A becomes an isolated follower in this case.
// A should observe {A, B, C, D} RPC configuration: when in joint
// consensus, we need to account for servers in both configurations.
//
// Heal network partition and observe that A's log is truncated (actually,
// it's empty since B does not have any entries at all, except for dummies).
// The RPC configuration on A is restored from initial snapshot configuration,
// which is {A, B, C}.
return rpc_test(3, [] (std::vector<test_server>& rafts, lw_shared_ptr<connected> connected,
std::vector<raft_ticker_type>& tickers, size_t initial_leader,
std::unordered_set<size_t>& in_configuration) -> future<> {
const auto all_nodes = full_cluster_address_set(rafts.size());
pause_tickers(tickers);
// Disconnect A from B and C.
connected->disconnect(to_raft_id(0));
// Emulate a failed configuration change on A (add node D) by
// restarting A with a modified initial log containing one extraneous
// configuration entry.
co_await rafts[initial_leader].server->abort();
// Restart A with a synthetic initial state representing
// the same initial snapshot config (A, B, C) as before,
// but with the following things in mind:
// * log contains only one entry: joint configuration entry
// that is equivalent to that of A's before the crash.
// * The configuration entry would have term=1 so that it'll
// be truncated when A gets in contact with other nodes
// * This will completely erase all entries on A leaving its
// log empty.
auto extended_conf = address_set({to_raft_id(0), to_raft_id(1), to_raft_id(2), to_raft_id(3)});
initial_state restart_state{
.log = {
raft::log_entry{raft::term_t(1), raft::index_t(1),
raft::configuration(
extended_conf,
all_nodes
)
}
},
.snapshot = {.config = all_nodes}
};
rafts[initial_leader] = create_raft_server(to_raft_id(initial_leader), dummy_apply_fn, restart_state, 1,
connected, make_lw_shared<snapshots>(), make_lw_shared<persisted_snapshots>(), false);
co_await rafts[initial_leader].server->start();
tickers[initial_leader].set_callback([&rafts, s=initial_leader] { rafts[s].server->tick(); });
restart_tickers(tickers);
// A should see {A, B, C, D} as RPC config since
// the latest configuration entry points to joint
// configuration {.current = {A, B, C, D}, .previous = {A, B, C}}.
// RPC configuration is computed as a union of current
// and previous configurations.
BOOST_CHECK(rafts[0].rpc->known_peers() == extended_conf);
// Elect B as leader
pause_tickers(tickers);
auto new_leader = co_await elect_new_leader(rafts, connected, in_configuration,
initial_leader, 1);
restart_tickers(tickers);
// Heal network partition.
connected->connect_all();
// wait to synchronize logs between current leader (B) and the rest of the cluster
co_await wait_log(rafts, connected, in_configuration, new_leader);
// A should have truncated an offending configuration entry and revert its RPC configuration.
//
// Since B's log is effectively empty (does not contain any configuration
// entries), A's configuration view ({A, B, C}) is restored from
// initial snapshot.
co_await seastar::async([&] {
CHECK_EVENTUALLY_EQUAL(rafts[0].rpc->known_peers(), all_nodes);
});
});
}
SEASTAR_TEST_CASE(rpc_configuration_truncate_restore_from_log) {
// 4 node cluster {A, B, C, D}.
// Change configuration to {A, B, C} from A and wait for it to become
// committed.
//
// Then, issue a configuration change on leader (A): remove node C.
// Fail the node before the entry is committed (disconnect from the
// rest of the cluster and restart the node). We emulate this behavior by
// just terminating the node and restarting it with a pre-defined state
// that is equivalent to having an uncommitted configuration entry in
// the log.
//
// In the meanwhile, elect a new leader within the connected part of the
// cluster (B). A becomes an isolated follower in this case.
//
// Heal network partition and observe that A's log is truncated and
// replaced with that of B. RPC configuration should not change between
// the crash + network partition and synchronization with B, since
// the effective RPC cfg would be {A, B, C} both for
// joint cfg = {.current = {A, B}, .previous = {A, B, C}}
// and the previously commited cfg = {A, B, C}.
//
// After that, test for the second case: switch leader back to A and
// try to expand the cluster back to initial state (re-add
// node D): {A, B, C, D}.
//
// Try to set configuration {A, B, C, D} on leader A, isolate and crash it.
// Restart with synthetic state containing an uncommitted configuration entry.
//
// This time before healing the network we should observe
// RPC configuration = {A, B, C, D}, accounting for an uncommitted part of the
// configuration.
// After healing the network and synchronizing with new leader B, RPC config
// should be reverted back to committed state {A, B, C}.
return rpc_test(4, [] (std::vector<test_server>& rafts, lw_shared_ptr<connected> connected,
std::vector<raft_ticker_type>& tickers, size_t initial_leader,
std::unordered_set<size_t>& in_configuration) -> future<> {
const auto all_nodes = full_cluster_address_set(rafts.size());
// Remove node D from the cluster configuration.
auto committed_conf = address_set({to_raft_id(0), to_raft_id(1), to_raft_id(2)});
in_configuration = co_await rpc_test_change_configuration(rafts, connected,
in_configuration, set_config{0, 1, 2}, initial_leader, tickers);
// {A, B, C} configuration is committed by now.
//
// First case: shrink cluster (remove node C).
//
// Disconnect A from the rest of the cluster.
connected->disconnect(to_raft_id(0));
// Try to change configuration (remove node C)
auto uncommitted_conf = address_set({to_raft_id(0), to_raft_id(1)});
// `set_configuration` call will fail on A because
// it's cut off the other nodes and it will be waiting for them,
// but A is terminated before the network is allowed to heal the partition.
tickers[0].cancel();
co_await rafts[initial_leader].server->abort();
// Restart A with a synthetic initial state that contains two entries
// in the log:
// 1. {A, B, C} configuration committed before crash + partition.
// 2. uncommitted joint configuration entry that is equivalent
// to that of A's before the crash.
initial_state restart_state{
.log = {
raft::log_entry{raft::term_t(1), raft::index_t(1),
raft::configuration(committed_conf)
},
raft::log_entry{raft::term_t(2), raft::index_t(2),
raft::configuration(
uncommitted_conf,
committed_conf
)
}
},
.snapshot = {.config = all_nodes}
};
rafts[initial_leader] = create_raft_server(to_raft_id(initial_leader), dummy_apply_fn, restart_state, 1,
connected, make_lw_shared<snapshots>(), make_lw_shared<persisted_snapshots>(), false);
co_await rafts[initial_leader].server->start();
tickers[initial_leader].set_callback([&rafts, s=initial_leader] { rafts[s].server->tick(); });
tickers[0].rearm_periodic(tick_delta);
// A's RPC configuration should stay the same because
// for both uncommitted joint cfg = {.current = {A, B}, .previous = {A, B, C}}
// and committed cfg = {A, B, C} the RPC cfg would be equal to {A, B, C}
BOOST_CHECK(rafts[0].rpc->known_peers() == committed_conf);
// Elect B as leader
pause_tickers(tickers);
auto new_leader = co_await elect_new_leader(rafts, connected, in_configuration,
initial_leader, 1);
restart_tickers(tickers);
// Heal network partition.
connected->connect_all();
// wait to synchronize logs between current leader (B) and the rest of the cluster
co_await wait_log(rafts, connected, in_configuration, new_leader);
// Again, A's RPC configuration is the same as before despite the
// real cfg being reverted to the committed state as it is the union
// between current and previous configurations in case of
// joint cfg, anyway.
co_await seastar::async([&] {
CHECK_EVENTUALLY_EQUAL(rafts[0].rpc->known_peers(), committed_conf);
});
BOOST_CHECK(rafts[1].rpc->known_peers() == committed_conf);
BOOST_CHECK(rafts[2].rpc->known_peers() == committed_conf);
//
// Second case: expand cluster (re-add node D).
//
// Elect A leader again.
pause_tickers(tickers);
co_await elect_new_leader(rafts, connected, in_configuration,
new_leader, initial_leader);
restart_tickers(tickers);
co_await wait_log(rafts, connected, in_configuration, initial_leader);
// Disconnect A from the rest of the cluster.
connected->disconnect(to_raft_id(0));
// Try to add D back.
tickers[0].cancel();
co_await rafts[initial_leader].server->abort();
initial_state restart_state_2{
.log = {
raft::log_entry{raft::term_t(1), raft::index_t(1),
raft::configuration(committed_conf)
},
raft::log_entry{raft::term_t(2), raft::index_t(2),
raft::configuration(
all_nodes,
committed_conf
)
}
},
.snapshot = {.config = all_nodes}
};
rafts[initial_leader] = create_raft_server(to_raft_id(initial_leader), dummy_apply_fn, restart_state_2, 1,
connected, make_lw_shared<snapshots>(), make_lw_shared<persisted_snapshots>(), false);
co_await rafts[initial_leader].server->start();
tickers[initial_leader].set_callback([&rafts, s=initial_leader] { rafts[s].server->tick(); });
tickers[0].rearm_periodic(tick_delta);
// A should observe RPC configuration = {A, B, C, D} since it's the union
// of an uncommitted joint config components
// {.current = {A, B, C, D}, .previous = {A, B, C}}.
BOOST_CHECK(rafts[0].rpc->known_peers() == all_nodes);
// Elect B as leader
pause_tickers(tickers);
new_leader = co_await elect_new_leader(rafts, connected, in_configuration,
initial_leader, 1);
restart_tickers(tickers);
// Heal network partition.
connected->connect_all();
// wait to synchronize logs between current leader (B) and the rest of the cluster
co_await wait_log(rafts, connected, in_configuration, new_leader);
// A's RPC configuration is reverted to committed configuration {A, B, C}.
co_await seastar::async([&] {
CHECK_EVENTUALLY_EQUAL(rafts[0].rpc->known_peers(), committed_conf);
});
BOOST_CHECK(rafts[1].rpc->known_peers() == committed_conf);
BOOST_CHECK(rafts[2].rpc->known_peers() == committed_conf);
});
}