mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-02 06:05:53 +00:00
Merge "raft: initial implementation" from Gleb
This is the beginning of raft protocol implementation. It only supports log replication and voter state machine. The main difference between this one and the RFC (besides having voter state machine) is that the approach taken here is to implement raft as a deterministic state machine and move all the IO processing away from the main logic. To do that some changes to RPC interface was required: all verbs are now one way meaning that sending a request does not wait for a reply and the reply arrives as a separate message (or not at all, it is safe to drop packets). * scylla-dev/raft-v4: raft: add a short readme file raft: compile raft tests raft: add raft tests raft: Implement log replication and leader election raft: Introduce raft interface header
This commit is contained in:
265
test/boost/raft_fsm_test.cc
Normal file
265
test/boost/raft_fsm_test.cc
Normal file
@@ -0,0 +1,265 @@
|
||||
/*
|
||||
* Copyright (c) 2020, Arm Limited and affiliates. All rights reserved.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define BOOST_TEST_MODULE raft
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include "test/lib/log.hh"
|
||||
|
||||
#include "raft/fsm.hh"
|
||||
|
||||
using raft::term_t, raft::index_t, raft::server_id;
|
||||
|
||||
void election_threshold(raft::fsm& fsm) {
|
||||
for (int i = 0; i <= raft::ELECTION_TIMEOUT.count(); i++) {
|
||||
fsm.tick();
|
||||
}
|
||||
}
|
||||
|
||||
void election_timeout(raft::fsm& fsm) {
|
||||
for (int i = 0; i <= 2 * raft::ELECTION_TIMEOUT.count(); i++) {
|
||||
fsm.tick();
|
||||
}
|
||||
}
|
||||
|
||||
struct failure_detector: public raft::failure_detector {
|
||||
bool alive = true;
|
||||
bool is_alive(raft::server_id from) override {
|
||||
return alive;
|
||||
}
|
||||
};
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_election_single_node) {
|
||||
|
||||
failure_detector fd;
|
||||
server_id id1{utils::make_random_uuid()};
|
||||
raft::configuration cfg({id1});
|
||||
raft::log log{raft::snapshot{.config = cfg}};
|
||||
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd);
|
||||
|
||||
BOOST_CHECK(fsm.is_follower());
|
||||
|
||||
election_timeout(fsm);
|
||||
|
||||
// Immediately converts from leader to follower if quorum=1
|
||||
BOOST_CHECK(fsm.is_leader());
|
||||
|
||||
auto output = fsm.get_output();
|
||||
|
||||
BOOST_CHECK(output.term);
|
||||
BOOST_CHECK(output.vote);
|
||||
BOOST_CHECK(output.messages.empty());
|
||||
BOOST_CHECK(output.log_entries.empty());
|
||||
BOOST_CHECK(output.committed.empty());
|
||||
// The leader does not become candidate simply because
|
||||
// a timeout has elapsed, i.e. there are no spurious
|
||||
// elections.
|
||||
election_timeout(fsm);
|
||||
BOOST_CHECK(fsm.is_leader());
|
||||
output = fsm.get_output();
|
||||
BOOST_CHECK(!output.term);
|
||||
BOOST_CHECK(!output.vote);
|
||||
BOOST_CHECK(output.messages.empty());
|
||||
BOOST_CHECK(output.log_entries.empty());
|
||||
BOOST_CHECK(output.committed.empty());
|
||||
}
|
||||
|
||||
// Test that adding an entry to a single-node cluster
|
||||
// does not lead to RPC
|
||||
BOOST_AUTO_TEST_CASE(test_single_node_is_quiet) {
|
||||
|
||||
failure_detector fd;
|
||||
server_id id1{utils::make_random_uuid()};
|
||||
raft::configuration cfg({id1});
|
||||
raft::log log{raft::snapshot{.config = cfg}};
|
||||
|
||||
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd);
|
||||
|
||||
election_timeout(fsm);
|
||||
|
||||
// Immediately converts from leader to follower if quorum=1
|
||||
BOOST_CHECK(fsm.is_leader());
|
||||
|
||||
(void) fsm.get_output();
|
||||
|
||||
fsm.add_entry(raft::command{});
|
||||
|
||||
BOOST_CHECK(fsm.get_output().messages.empty());
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_election_two_nodes) {
|
||||
|
||||
failure_detector fd;
|
||||
|
||||
server_id id1{utils::make_random_uuid()}, id2{utils::make_random_uuid()};
|
||||
|
||||
raft::configuration cfg({id1, id2});
|
||||
raft::log log{raft::snapshot{.config = cfg}};
|
||||
|
||||
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd);
|
||||
|
||||
// Initial state is follower
|
||||
BOOST_CHECK(fsm.is_follower());
|
||||
|
||||
// After election timeout, a follower becomes a candidate
|
||||
election_timeout(fsm);
|
||||
BOOST_CHECK(fsm.is_candidate());
|
||||
|
||||
// If nothing happens, the candidate stays this way
|
||||
election_timeout(fsm);
|
||||
BOOST_CHECK(fsm.is_candidate());
|
||||
|
||||
auto output = fsm.get_output();
|
||||
// After a favourable reply, we become a leader (quorum is 2)
|
||||
fsm.step(id2, raft::vote_reply{output.term, true});
|
||||
BOOST_CHECK(fsm.is_leader());
|
||||
// Out of order response to the previous election is ignored
|
||||
fsm.step(id2, raft::vote_reply{output.term - term_t{1}, false});
|
||||
assert(fsm.is_leader());
|
||||
|
||||
// Vote request within the election timeout is ignored
|
||||
// (avoiding disruptive leaders).
|
||||
fsm.step(id2, raft::vote_request{output.term + term_t{1}});
|
||||
BOOST_CHECK(fsm.is_leader());
|
||||
// Any message with a newer term after election timeout
|
||||
// -> immediately convert to follower
|
||||
fd.alive = false;
|
||||
election_threshold(fsm);
|
||||
fsm.step(id2, raft::vote_request{output.term + term_t{1}});
|
||||
BOOST_CHECK(fsm.is_follower());
|
||||
|
||||
// Check that the candidate converts to a follower as well
|
||||
election_timeout(fsm);
|
||||
BOOST_CHECK(fsm.is_candidate());
|
||||
output = fsm.get_output();
|
||||
fsm.step(id2, raft::vote_request{output.term + term_t{1}});
|
||||
BOOST_CHECK(fsm.is_follower());
|
||||
|
||||
// Test that a node doesn't cast a vote if it has voted for
|
||||
// self already
|
||||
(void) fsm.get_output();
|
||||
while (fsm.is_follower()) {
|
||||
fsm.tick();
|
||||
}
|
||||
BOOST_CHECK(fsm.is_candidate());
|
||||
output = fsm.get_output();
|
||||
auto msg = std::get<raft::vote_request>(output.messages.back().second);
|
||||
fsm.step(id2, std::move(msg));
|
||||
// We could figure out this round is going to a nowhere, but
|
||||
// we're not that smart and simply wait for a vote_reply.
|
||||
BOOST_CHECK(fsm.is_candidate());
|
||||
output = fsm.get_output();
|
||||
auto reply = std::get<raft::vote_reply>(output.messages.back().second);
|
||||
BOOST_CHECK(!reply.vote_granted);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_election_four_nodes) {
|
||||
|
||||
failure_detector fd;
|
||||
|
||||
server_id id1{utils::make_random_uuid()},
|
||||
id2{utils::make_random_uuid()},
|
||||
id3{utils::make_random_uuid()},
|
||||
id4{utils::make_random_uuid()};
|
||||
|
||||
raft::configuration cfg({id1, id2, id3, id4});
|
||||
raft::log log{raft::snapshot{.config = cfg}};
|
||||
|
||||
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd);
|
||||
|
||||
// Initial state is follower
|
||||
BOOST_CHECK(fsm.is_follower());
|
||||
|
||||
// Inform FSM about a new leader at a new term
|
||||
fsm.step(id4, raft::append_request_recv{term_t{1}, id4, index_t{1}, term_t{1}});
|
||||
|
||||
(void) fsm.get_output();
|
||||
|
||||
// Request a vote during the same term. Even though
|
||||
// we haven't voted, we should deny a vote because we
|
||||
// know about a leader for this term.
|
||||
fsm.step(id3, raft::vote_request{term_t{1}, index_t{1}, term_t{1}});
|
||||
|
||||
auto output = fsm.get_output();
|
||||
auto reply = std::get<raft::vote_reply>(output.messages.back().second);
|
||||
BOOST_CHECK(!reply.vote_granted);
|
||||
|
||||
// Run out of steam for this term. Start a new one.
|
||||
fd.alive = false;
|
||||
election_timeout(fsm);
|
||||
BOOST_CHECK(fsm.is_candidate());
|
||||
|
||||
output = fsm.get_output();
|
||||
// Add a favourable reply, not enough for quorum
|
||||
fsm.step(id2, raft::vote_reply{output.term, true});
|
||||
BOOST_CHECK(fsm.is_candidate());
|
||||
|
||||
// Add another one, this adds up to quorum
|
||||
fsm.step(id3, raft::vote_reply{output.term, true});
|
||||
BOOST_CHECK(fsm.is_leader());
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_log_matching_rule) {
|
||||
|
||||
failure_detector fd;
|
||||
|
||||
server_id id1{utils::make_random_uuid()},
|
||||
id2{utils::make_random_uuid()},
|
||||
id3{utils::make_random_uuid()};
|
||||
|
||||
raft::configuration cfg({id1, id2, id3});
|
||||
raft::log log(raft::snapshot{.idx = index_t{999}, .config = cfg});
|
||||
|
||||
log.emplace_back(raft::log_entry{term_t{10}, index_t{1000}});
|
||||
log.stable_to(log.last_idx());
|
||||
|
||||
raft::fsm fsm(id1, term_t{10}, server_id{}, std::move(log), fd);
|
||||
|
||||
// Initial state is follower
|
||||
BOOST_CHECK(fsm.is_follower());
|
||||
|
||||
(void) fsm.get_output();
|
||||
|
||||
fsm.step(id2, raft::vote_request{term_t{9}, index_t{1001}, term_t{11}});
|
||||
// Current term is too old - vote is not granted
|
||||
auto output = fsm.get_output();
|
||||
BOOST_CHECK(output.messages.empty());
|
||||
|
||||
auto request_vote = [&](term_t term, index_t last_log_idx, term_t last_log_term) -> raft::vote_reply {
|
||||
fsm.step(id2, raft::vote_request{term, last_log_idx, last_log_term});
|
||||
auto output = fsm.get_output();
|
||||
return std::get<raft::vote_reply>(output.messages.back().second);
|
||||
};
|
||||
|
||||
// Last stable index is too small - vote is not granted
|
||||
BOOST_CHECK(!request_vote(term_t{11}, index_t{999}, term_t{10}).vote_granted);
|
||||
// Last stable term is too small - vote is not granted
|
||||
BOOST_CHECK(!request_vote(term_t{12}, index_t{1002}, term_t{9}).vote_granted);
|
||||
// Last stable term and index are equal to the voter's - vote
|
||||
// is granted
|
||||
BOOST_CHECK(request_vote(term_t{13}, index_t{1000}, term_t{10}).vote_granted);
|
||||
// Last stable term is the same, index is greater to the voter's - vote
|
||||
// is granted
|
||||
BOOST_CHECK(request_vote(term_t{14}, index_t{1001}, term_t{10}).vote_granted);
|
||||
// Both term and index are greater than the voter's - vote
|
||||
// is granted
|
||||
BOOST_CHECK(request_vote(term_t{15}, index_t{1001}, term_t{11}).vote_granted);
|
||||
}
|
||||
378
test/raft/replication_test.cc
Normal file
378
test/raft/replication_test.cc
Normal file
@@ -0,0 +1,378 @@
|
||||
#include <random>
|
||||
#include <fmt/format.h>
|
||||
#include <seastar/core/app-template.hh>
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/loop.hh>
|
||||
#include <seastar/util/log.hh>
|
||||
#include "raft/server.hh"
|
||||
#include "serializer.hh"
|
||||
#include "serializer_impl.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
static seastar::logger tlogger("test");
|
||||
|
||||
std::mt19937 random_generator() {
|
||||
std::random_device rd;
|
||||
// In case of errors, replace the seed with a fixed value to get a deterministic run.
|
||||
auto seed = rd();
|
||||
std::cout << "Random seed: " << seed << "\n";
|
||||
return std::mt19937(seed);
|
||||
}
|
||||
|
||||
int rand() {
|
||||
static thread_local std::uniform_int_distribution<int> dist(0, std::numeric_limits<uint8_t>::max());
|
||||
static thread_local auto gen = random_generator();
|
||||
|
||||
return dist(gen);
|
||||
}
|
||||
|
||||
bool drop_replication = false;
|
||||
|
||||
class state_machine : public raft::state_machine {
|
||||
public:
|
||||
using apply_fn = std::function<future<>(raft::server_id id, promise<>&, const std::vector<raft::command_cref>& commands)>;
|
||||
private:
|
||||
raft::server_id _id;
|
||||
apply_fn _apply;
|
||||
promise<> _done;
|
||||
public:
|
||||
state_machine(raft::server_id id, apply_fn apply) : _id(id), _apply(std::move(apply)) {}
|
||||
virtual future<> apply(const std::vector<raft::command_cref> commands) {
|
||||
return _apply(_id, _done, commands);
|
||||
}
|
||||
virtual future<raft::snapshot_id> take_snaphot() { return make_ready_future<raft::snapshot_id>(raft::snapshot_id()); }
|
||||
virtual void drop_snapshot(raft::snapshot_id id) {}
|
||||
virtual future<> load_snapshot(raft::snapshot_id id) { return make_ready_future<>(); };
|
||||
virtual future<> abort() { return make_ready_future<>(); }
|
||||
|
||||
future<> done() {
|
||||
return _done.get_future();
|
||||
}
|
||||
};
|
||||
|
||||
struct initial_state {
|
||||
raft::term_t term = raft::term_t(1);
|
||||
raft::server_id vote;
|
||||
std::vector<raft::log_entry> log;
|
||||
raft::snapshot snapshot;
|
||||
};
|
||||
|
||||
|
||||
class storage : public raft::storage {
|
||||
initial_state _conf;
|
||||
public:
|
||||
storage(initial_state conf) : _conf(std::move(conf)) {}
|
||||
storage() {}
|
||||
virtual future<> store_term_and_vote(raft::term_t term, raft::server_id vote) { co_return seastar::sleep(1us); }
|
||||
virtual future<std::pair<raft::term_t, raft::server_id>> load_term_and_vote() {
|
||||
auto term_and_vote = std::make_pair(_conf.term, _conf.vote);
|
||||
return make_ready_future<std::pair<raft::term_t, raft::server_id>>(term_and_vote);
|
||||
}
|
||||
virtual future<> store_snapshot(const raft::snapshot& snap, size_t preserve_log_entries) { return make_ready_future<>(); }
|
||||
virtual future<raft::snapshot> load_snapshot() {
|
||||
return make_ready_future<raft::snapshot>(_conf.snapshot);
|
||||
}
|
||||
virtual future<> store_log_entries(const std::vector<raft::log_entry_ptr>& entries) { co_return seastar::sleep(1us); };
|
||||
virtual future<raft::log_entries> load_log() {
|
||||
raft::log_entries log;
|
||||
for (auto&& e : _conf.log) {
|
||||
log.emplace_back(make_lw_shared(std::move(e)));
|
||||
}
|
||||
return make_ready_future<raft::log_entries>(std::move(log));
|
||||
}
|
||||
virtual future<> truncate_log(raft::index_t idx) { return make_ready_future<>(); }
|
||||
virtual future<> abort() { return make_ready_future<>(); }
|
||||
};
|
||||
|
||||
class rpc : public raft::rpc {
|
||||
static std::unordered_map<raft::server_id, rpc*> net;
|
||||
raft::server_id _id;
|
||||
public:
|
||||
rpc(raft::server_id id) : _id(id) {
|
||||
net[_id] = this;
|
||||
}
|
||||
virtual future<> send_snapshot(raft::server_id id, const raft::install_snapshot& snap) { return make_ready_future<>(); }
|
||||
virtual future<> send_append_entries(raft::server_id id, const raft::append_request_send& append_request) {
|
||||
if (drop_replication && !(rand() % 5)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
raft::append_request_recv req;
|
||||
req.current_term = append_request.current_term;
|
||||
req.leader_id = append_request.leader_id;
|
||||
req.prev_log_idx = append_request.prev_log_idx;
|
||||
req.prev_log_term = append_request.prev_log_term;
|
||||
req.leader_commit_idx = append_request.leader_commit_idx;
|
||||
for (auto&& e: append_request.entries) {
|
||||
req.entries.push_back(e);
|
||||
}
|
||||
net[id]->_client->append_entries(_id, std::move(req));
|
||||
//co_return seastar::sleep(1us);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual future<> send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) {
|
||||
if (drop_replication && !(rand() % 5)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
net[id]->_client->append_entries_reply(_id, std::move(reply));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual future<> send_vote_request(raft::server_id id, const raft::vote_request& vote_request) {
|
||||
net[id]->_client->request_vote(_id, std::move(vote_request));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual future<> send_vote_reply(raft::server_id id, const raft::vote_reply& vote_reply) {
|
||||
net[id]->_client->request_vote_reply(_id, std::move(vote_reply));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual void add_server(raft::server_id id, bytes node_info) {}
|
||||
virtual void remove_server(raft::server_id id) {}
|
||||
virtual future<> abort() { return make_ready_future<>(); }
|
||||
};
|
||||
|
||||
class failure_detector : public raft::failure_detector {
|
||||
bool is_alive(raft::server_id server) override {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
std::unordered_map<raft::server_id, rpc*> rpc::net;
|
||||
|
||||
std::pair<std::unique_ptr<raft::server>, state_machine*>
|
||||
create_raft_server(raft::server_id uuid, state_machine::apply_fn apply,
|
||||
initial_state state) {
|
||||
|
||||
auto sm = std::make_unique<state_machine>(uuid, std::move(apply));
|
||||
auto& rsm = *sm;
|
||||
auto mrpc = std::make_unique<rpc>(uuid);
|
||||
auto mstorage = std::make_unique<storage>(state);
|
||||
auto fd = seastar::make_shared<failure_detector>();
|
||||
auto raft = raft::create_server(uuid, std::move(mrpc), std::move(sm), std::move(mstorage),
|
||||
std::move(fd));
|
||||
|
||||
return std::make_pair(std::move(raft), &rsm);
|
||||
}
|
||||
|
||||
future<std::vector<std::pair<std::unique_ptr<raft::server>, state_machine*>>> create_cluster(std::vector<initial_state> states, state_machine::apply_fn apply) {
|
||||
raft::configuration config;
|
||||
std::vector<std::pair<std::unique_ptr<raft::server>, state_machine*>> rafts;
|
||||
|
||||
for (size_t i = 0; i < states.size(); i++) {
|
||||
auto uuid = utils::make_random_uuid();
|
||||
config.servers.push_back(raft::server_address{uuid});
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < states.size(); i++) {
|
||||
auto& s = config.servers[i];
|
||||
states[i].snapshot.config = config;
|
||||
auto& raft = *rafts.emplace_back(create_raft_server(s.id, apply, states[i])).first;
|
||||
co_await raft.start();
|
||||
}
|
||||
|
||||
co_return std::move(rafts);
|
||||
}
|
||||
|
||||
struct log_entry {
|
||||
unsigned term;
|
||||
int value;
|
||||
};
|
||||
|
||||
std::vector<raft::log_entry> create_log(std::initializer_list<log_entry> list, unsigned start_idx = 1) {
|
||||
std::vector<raft::log_entry> log;
|
||||
|
||||
unsigned i = start_idx;
|
||||
for (auto e : list) {
|
||||
raft::command command;
|
||||
ser::serialize(command, e.value);
|
||||
log.push_back(raft::log_entry{raft::term_t(e.term), raft::index_t(i++), std::move(command)});
|
||||
}
|
||||
|
||||
return log;
|
||||
}
|
||||
|
||||
constexpr int itr = 100;
|
||||
std::unordered_map<raft::server_id, int> sums;
|
||||
|
||||
future<> apply(raft::server_id id, promise<>& done, const std::vector<raft::command_cref>& commands) {
|
||||
tlogger.debug("sm::apply got {} entries", commands.size());
|
||||
for (auto&& d : commands) {
|
||||
auto is = ser::as_input_stream(d);
|
||||
int n = ser::deserialize(is, boost::type<int>());
|
||||
tlogger.debug("{}: apply {}", id, n);
|
||||
auto it = sums.find(id);
|
||||
if (it == sums.end()) {
|
||||
sums[id] = 0;
|
||||
}
|
||||
sums[id] += n;
|
||||
}
|
||||
if (sums[id] == ((itr - 1) * itr)/2) {
|
||||
done.set_value();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
};
|
||||
|
||||
|
||||
future<> test_helper(std::vector<initial_state> states, int start_itr = 0) {
|
||||
auto rafts = co_await create_cluster(states, apply);
|
||||
|
||||
auto& leader = *rafts[0].first;
|
||||
leader.make_me_leader();
|
||||
|
||||
co_await seastar::parallel_for_each(std::views::iota(start_itr, itr), [&] (int i) {
|
||||
tlogger.debug("Adding entry {} on a leader", i);
|
||||
raft::command command;
|
||||
ser::serialize(command, i);
|
||||
return leader.add_entry(std::move(command), raft::wait_type::committed);
|
||||
});
|
||||
|
||||
for (auto& r: rafts) {
|
||||
co_await r.second->done();
|
||||
}
|
||||
|
||||
for (auto& r: rafts) {
|
||||
co_await r.first->abort();
|
||||
}
|
||||
|
||||
sums.clear();
|
||||
co_return;
|
||||
}
|
||||
|
||||
future<> test_simple_replication(size_t size) {
|
||||
return test_helper(std::vector<initial_state>(size));
|
||||
}
|
||||
|
||||
// initially a leader has non empty log
|
||||
future<> test_replicate_non_empty_leader_log() {
|
||||
// 2 nodes, leader has entries in his log
|
||||
std::vector<initial_state> states(2);
|
||||
states[0].term = raft::term_t(1);
|
||||
states[0].log = create_log({{1, 0}, {1, 1}, {1, 2}, {1, 3}});
|
||||
|
||||
// start iterations from 4 since o4 entry is already in the log
|
||||
return test_helper(std::move(states), 4);
|
||||
}
|
||||
|
||||
// test special case where prev_index = 0 because the leader's log is empty
|
||||
future<> test_replace_log_leaders_log_empty() {
|
||||
// current leaders term is 2 and empty log
|
||||
// one of the follower have three entries that should be replaced
|
||||
std::vector<initial_state> states(3);
|
||||
states[0].term = raft::term_t(2);
|
||||
states[2].log = create_log({{1, 10}, {1, 20}, {1, 30}});
|
||||
|
||||
return test_helper(std::move(states));
|
||||
}
|
||||
|
||||
// two nodes, leader has one entry, follower has 3, existing entries do not match
|
||||
future<> test_replace_log_leaders_log_not_empty() {
|
||||
// current leaders term is 2 and the log has one entry
|
||||
// one of the follower have three entries that should be replaced
|
||||
std::vector<initial_state> states(2);
|
||||
states[0].term = raft::term_t(3);
|
||||
states[0].log = create_log({{1, 0}});
|
||||
states[1].log = create_log({{2, 10}, {2, 20}, {2, 30}});
|
||||
|
||||
// start iterations from 1 since one entry is already in the log
|
||||
return test_helper(std::move(states), 1);
|
||||
}
|
||||
|
||||
// two nodes, leader has 2 entries, follower has 4, index=1 matches index=2 does not
|
||||
future<> test_replace_log_leaders_log_not_empty_2() {
|
||||
// current leader's term is 2 and the log has one entry
|
||||
// one of the follower have three entries that should be replaced
|
||||
std::vector<initial_state> states(2);
|
||||
states[0].term = raft::term_t(3);
|
||||
states[0].log = create_log({{1, 0}, {1, 1}});
|
||||
states[1].log = create_log({{1, 0}, {2, 20}, {2, 30}, {2, 40}});
|
||||
|
||||
// start iterations from 2 since 2 entries are already in the log
|
||||
return test_helper(std::move(states), 2);
|
||||
}
|
||||
|
||||
// a follower and a leader have matching logs but leader's is shorter
|
||||
future<> test_replace_log_leaders_log_not_empty_3() {
|
||||
// current leaders term is 2 and the log has one entry
|
||||
// one of the follower have three entries that should be replaced
|
||||
std::vector<initial_state> states(2);
|
||||
states[0].term = raft::term_t(2);
|
||||
states[0].log = create_log({{1, 0}, {1, 1}});
|
||||
states[1].log = create_log({{1, 0}, {1, 1}, {1, 2}, {1, 3}});
|
||||
|
||||
// start iterations from 2 since 2 entries are already in the log
|
||||
return test_helper(std::move(states), 2);
|
||||
}
|
||||
|
||||
// a follower and a leader have no common entries
|
||||
future<> test_replace_no_common_entries() {
|
||||
// current leaders term is 2 and the log has one entry
|
||||
// one of the follower have three entries that should be replaced
|
||||
std::vector<initial_state> states(2);
|
||||
states[0].term = raft::term_t(3);
|
||||
states[0].log = create_log({{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}, {1, 6}});
|
||||
states[1].log = create_log({{2, 10}, {2, 11}, {2, 12}, {2, 13}, {2, 14}, {2, 15}, {2, 16}});
|
||||
|
||||
// start iterations from 7 since 7 entries are already in the log
|
||||
return test_helper(std::move(states), 7);
|
||||
}
|
||||
|
||||
// a follower and a leader have one common entry
|
||||
future<> test_replace_one_common_entry() {
|
||||
// current leaders term is 2 and the log has one entry
|
||||
// one of the follower have three entries that should be replaced
|
||||
std::vector<initial_state> states(2);
|
||||
states[0].term = raft::term_t(4);
|
||||
states[0].log = create_log({{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}, {3, 6}});
|
||||
states[1].log = create_log({{1, 0}, {2, 11}, {2, 12}, {2, 13}, {2, 14}, {2, 15}, {2, 16}});
|
||||
|
||||
// start iterations from 7 since 7 entries are already in the log
|
||||
return test_helper(std::move(states), 7);
|
||||
}
|
||||
|
||||
// a follower and a leader have t1i common entry in different terms
|
||||
future<> test_replace_two_common_entry_different_terms() {
|
||||
// current leaders term is 2 and the log has one entry
|
||||
// one of the follower have three entries that should be replaced
|
||||
std::vector<initial_state> states(2);
|
||||
states[0].term = raft::term_t(5);
|
||||
states[0].log = create_log({{1, 0}, {2, 1}, {3, 2}, {3, 3}, {3, 4}, {3, 5}, {4, 6}});
|
||||
states[1].log = create_log({{1, 0}, {2, 1}, {2, 12}, {2, 13}, {2, 14}, {2, 15}, {2, 16}});
|
||||
|
||||
// start iterations from 7 since 7 entries are already in the log
|
||||
return test_helper(std::move(states), 7);
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
seastar::app_template::config cfg;
|
||||
seastar::app_template app(cfg);
|
||||
app.add_options()
|
||||
("drop-replication", bpo::value<bool>()->default_value(false), "drop replication packets randomly");
|
||||
|
||||
using test_fn = std::function<future<>()>;
|
||||
|
||||
test_fn tests[] = {
|
||||
std::bind(test_simple_replication, 1),
|
||||
std::bind(test_simple_replication, 2),
|
||||
test_replicate_non_empty_leader_log,
|
||||
test_replace_log_leaders_log_empty,
|
||||
test_replace_log_leaders_log_not_empty,
|
||||
test_replace_log_leaders_log_not_empty_2,
|
||||
test_replace_log_leaders_log_not_empty_3,
|
||||
test_replace_no_common_entries,
|
||||
test_replace_one_common_entry,
|
||||
test_replace_two_common_entry_different_terms,
|
||||
};
|
||||
|
||||
return app.run(argc, argv, [&tests, &app] () -> future<> {
|
||||
drop_replication = app.configuration()["drop-replication"].as<bool>();
|
||||
|
||||
int i = 0;
|
||||
for (auto& t : tests) {
|
||||
tlogger.debug("test: {}", i++);
|
||||
co_await t();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
5
test/raft/suite.yaml
Normal file
5
test/raft/suite.yaml
Normal file
@@ -0,0 +1,5 @@
|
||||
# Suite test type. Supported types: unit, boost, cql
|
||||
type: unit
|
||||
custom_args:
|
||||
replication_test:
|
||||
- '-c1 -m200M'
|
||||
Reference in New Issue
Block a user