mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 19:21:01 +00:00
Revert "Merge "raft: add unit tests for log, tracker, votes and fix found bugs" from Kostja"
This reverts commit f94f70cda8, reversing changes made to 5206a97915. The version of the series that was merged was not the latest one. Revert it prior to merging the latest one.
This commit is contained in:
@@ -553,7 +553,7 @@ scylla_raft_core = [
|
||||
'raft/raft.cc',
|
||||
'raft/server.cc',
|
||||
'raft/fsm.cc',
|
||||
'raft/tracker.cc',
|
||||
'raft/progress.cc',
|
||||
'raft/log.cc',
|
||||
]
|
||||
|
||||
|
||||
89
raft/fsm.cc
89
raft/fsm.cc
@@ -35,7 +35,7 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
|
||||
assert(!bool(_current_leader));
|
||||
}
|
||||
|
||||
future<> fsm::wait_max_log_length() {
|
||||
future<> fsm::wait() {
|
||||
check_is_leader();
|
||||
|
||||
return _log_limiter_semaphore->sem.wait();
|
||||
@@ -43,7 +43,7 @@ future<> fsm::wait_max_log_length() {
|
||||
|
||||
const configuration& fsm::get_configuration() const {
|
||||
check_is_leader();
|
||||
return _log.get_configuration();
|
||||
return _tracker->get_configuration();
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
@@ -53,7 +53,7 @@ const log_entry& fsm::add_entry(T command) {
|
||||
|
||||
if constexpr (std::is_same_v<T, configuration>) {
|
||||
if (_log.last_conf_idx() > _commit_idx ||
|
||||
_log.get_configuration().is_joint()) {
|
||||
_tracker->get_configuration().is_joint()) {
|
||||
// 4.1. Cluster membership changes/Safety.
|
||||
//
|
||||
// Leaders avoid overlapping configuration changes by
|
||||
@@ -71,7 +71,7 @@ const log_entry& fsm::add_entry(T command) {
|
||||
// configuration for joint consensus (C_old,new) as a log
|
||||
// entry and replicates that entry using the normal Raft
|
||||
// mechanism.
|
||||
configuration tmp(_log.get_configuration());
|
||||
configuration tmp(_tracker->get_configuration());
|
||||
tmp.enter_joint(command.current);
|
||||
command = std::move(tmp);
|
||||
}
|
||||
@@ -87,7 +87,7 @@ const log_entry& fsm::add_entry(T command) {
|
||||
// entry is replicated to the C_new servers, and
|
||||
// a majority of the new configuration is used to
|
||||
// determine the C_new entry’s commitment.
|
||||
_tracker->set_configuration(_log.get_configuration(), _log.last_idx());
|
||||
set_configuration();
|
||||
}
|
||||
|
||||
return *_log[_log.last_idx()];
|
||||
@@ -128,6 +128,20 @@ void fsm::update_current_term(term_t current_term)
|
||||
_randomized_election_timeout = ELECTION_TIMEOUT + logical_clock::duration{dist(re)};
|
||||
}
|
||||
|
||||
void fsm::set_configuration() {
|
||||
|
||||
configuration configuration = _log.last_conf_idx() ?
|
||||
std::get<raft::configuration>(_log[_log.last_conf_idx()]->data) : _log.get_snapshot().config;
|
||||
// We unconditionally access configuration.current[0]
|
||||
// to identify which entries are committed.
|
||||
assert(configuration.current.size() > 0);
|
||||
if (is_leader()) {
|
||||
_tracker->set_configuration(std::move(configuration), _log.last_idx());
|
||||
} else if (is_candidate()) {
|
||||
_votes->set_configuration(std::move(configuration));
|
||||
}
|
||||
}
|
||||
|
||||
void fsm::become_leader() {
|
||||
assert(!std::holds_alternative<leader>(_state));
|
||||
assert(!_tracker);
|
||||
@@ -136,7 +150,7 @@ void fsm::become_leader() {
|
||||
_votes = std::nullopt;
|
||||
_tracker.emplace(_my_id);
|
||||
_log_limiter_semaphore.emplace(this);
|
||||
_log_limiter_semaphore->sem.consume(_log.length());
|
||||
_log_limiter_semaphore->sem.consume(_log.non_snapshoted_length());
|
||||
_last_election_time = _clock.now();
|
||||
// a new leader needs to commit at lease one entry to make sure that
|
||||
// all existing entries in its log are commited as well. Also it should
|
||||
@@ -145,7 +159,7 @@ void fsm::become_leader() {
|
||||
add_entry(log_entry::dummy());
|
||||
// set_configuration() begins replicating from the last entry
|
||||
// in the log.
|
||||
_tracker->set_configuration(_log.get_configuration(), _log.last_idx());
|
||||
set_configuration();
|
||||
replicate();
|
||||
}
|
||||
|
||||
@@ -174,7 +188,8 @@ void fsm::become_candidate() {
|
||||
// and initiating another round of RequestVote RPCs.
|
||||
_last_election_time = _clock.now();
|
||||
|
||||
_votes.emplace(_log.get_configuration());
|
||||
_votes.emplace();
|
||||
set_configuration();
|
||||
|
||||
const auto& voters = _votes->voters();
|
||||
if (voters.find(server_address{_my_id}) == voters.end()) {
|
||||
@@ -291,7 +306,9 @@ void fsm::advance_stable_idx(index_t idx) {
|
||||
// configuration, update it's progress and optionally
|
||||
// commit new entries.
|
||||
if (is_leader() && _tracker->leader_progress()) {
|
||||
_tracker->leader_progress()->stable_to(idx);
|
||||
auto& progress = *_tracker->leader_progress();
|
||||
progress.match_idx = idx;
|
||||
progress.next_idx = index_t{idx + 1};
|
||||
replicate();
|
||||
maybe_commit();
|
||||
}
|
||||
@@ -328,19 +345,15 @@ void fsm::maybe_commit() {
|
||||
_sm_events.signal();
|
||||
|
||||
if (committed_conf_change) {
|
||||
if (_log.get_configuration().is_joint()) {
|
||||
if (_tracker->get_configuration().is_joint()) {
|
||||
// 4.3. Arbitrary configuration changes using joint consensus
|
||||
//
|
||||
// Once the joint consensus has been committed, the
|
||||
// system then transitions to the new configuration.
|
||||
configuration cfg(_log.get_configuration());
|
||||
configuration cfg(_tracker->get_configuration());
|
||||
cfg.leave_joint();
|
||||
_log.emplace_back(seastar::make_lw_shared<log_entry>({_current_term, _log.next_idx(), std::move(cfg)}));
|
||||
_tracker->set_configuration(_log.get_configuration(), _log.last_idx());
|
||||
// Leaving joint configuration may commit more entries
|
||||
// even if we had no new acks, by switching the quorum
|
||||
// from joint to simple majority.
|
||||
maybe_commit();
|
||||
set_configuration();
|
||||
} else if (_tracker->leader_progress() == nullptr) {
|
||||
// 4.2.2 Removing the current leader
|
||||
//
|
||||
@@ -483,7 +496,9 @@ void fsm::append_entries_reply(server_id from, append_reply&& reply) {
|
||||
logger.trace("append_entries_reply[{}->{}]: accepted match={} last index={}",
|
||||
_my_id, from, progress.match_idx, last_idx);
|
||||
|
||||
progress.stable_to(last_idx);
|
||||
progress.match_idx = std::max(progress.match_idx, last_idx);
|
||||
// out next_idx may be large because of optimistic increase in pipeline mode
|
||||
progress.next_idx = std::max(progress.next_idx, index_t(last_idx + 1));
|
||||
|
||||
progress.become_pipeline();
|
||||
|
||||
@@ -628,36 +643,38 @@ void fsm::replicate_to(follower_progress& progress, bool allow_empty) {
|
||||
|
||||
allow_empty = false; // allow only one empty message
|
||||
|
||||
// A log containing a snapshot, a few trailing entries and
|
||||
// a few new entries may look like this:
|
||||
// With snapshot prefix enaled the log may look like this:
|
||||
// E - log entry
|
||||
// S_idx - snapshot index
|
||||
// E_i1 E_i2 E_i3 Ei_4 E_i5 E_i6
|
||||
// ^
|
||||
// S_idx = i2
|
||||
// If the follower's next_idx is i1 we need to
|
||||
// enter snapshot transfer mode even when we have
|
||||
// i1 in the log, since it is not possible to get the term of
|
||||
// the entry previous to i1 and verify that the follower's tail
|
||||
// contains no uncommitted entries.
|
||||
index_t prev_idx = progress.next_idx - index_t{1};
|
||||
std::optional<term_t> prev_term = _log.term_for(prev_idx);
|
||||
if (!prev_term) {
|
||||
const snapshot& snapshot = _log.get_snapshot();
|
||||
// We need to transfer the snapshot before we can
|
||||
// S - snapshot
|
||||
// Ei1 Ei2 Ei3 Si4 Ei5 Ei6
|
||||
// If the next_idx is before i2 we need to enter snaphot transfer mode
|
||||
// even though we still have i1 since it is not possibel to get prev
|
||||
// term for it.
|
||||
auto& s = _log.get_snapshot();
|
||||
if (progress.next_idx <= s.idx && progress.next_idx < (_log.start_idx() + 1)) {
|
||||
// The next index to be sent points to a snapshot so
|
||||
// we need to transfer the snapshot before we can
|
||||
// continue syncing the log.
|
||||
progress.become_snapshot();
|
||||
send_to(progress.id, install_snapshot{_current_term, snapshot});
|
||||
send_to(progress.id, install_snapshot{_current_term, _log.get_snapshot()});
|
||||
logger.trace("replicate_to[{}->{}]: send snapshot next={} snapshot={}",
|
||||
_my_id, progress.id, progress.next_idx, snapshot.idx);
|
||||
_my_id, progress.id, progress.next_idx, _log.get_snapshot().idx);
|
||||
return;
|
||||
}
|
||||
|
||||
index_t prev_idx = index_t(0);
|
||||
term_t prev_term = _current_term;
|
||||
if (progress.next_idx != 1) {
|
||||
prev_idx = index_t(progress.next_idx - 1);
|
||||
assert (prev_idx >= _log.start_idx() || s.idx == prev_idx);
|
||||
prev_term = s.idx == prev_idx ? s.term : _log[prev_idx]->term;
|
||||
}
|
||||
|
||||
append_request req = {
|
||||
.current_term = _current_term,
|
||||
.leader_id = _my_id,
|
||||
.prev_log_idx = prev_idx,
|
||||
.prev_log_term = prev_term.value(),
|
||||
.prev_log_term = prev_term,
|
||||
.leader_commit_idx = _commit_idx,
|
||||
.entries = std::vector<log_entry_ptr>()
|
||||
};
|
||||
|
||||
26
raft/fsm.hh
26
raft/fsm.hh
@@ -22,7 +22,7 @@
|
||||
|
||||
#include <seastar/core/condition-variable.hh>
|
||||
#include "raft.hh"
|
||||
#include "tracker.hh"
|
||||
#include "progress.hh"
|
||||
#include "log.hh"
|
||||
|
||||
namespace raft {
|
||||
@@ -41,12 +41,9 @@ struct fsm_output {
|
||||
struct fsm_config {
|
||||
// max size of appended entries in bytes
|
||||
size_t append_request_threshold;
|
||||
// Max number of entries of in-memory part of the log after
|
||||
// which requests are stopped to be admitted until the log
|
||||
// is shrunk back by a snapshot. Should be greater than
|
||||
// whatever the default number of trailing log entries
|
||||
// is configured by the snapshot, otherwise the state
|
||||
// machine will deadlock.
|
||||
// max number of entries of in-memory part of the log after
|
||||
// which requests are stopped to be addmitted unill the log
|
||||
// is shrunk back by snapshoting
|
||||
size_t max_log_length;
|
||||
};
|
||||
|
||||
@@ -250,6 +247,11 @@ private:
|
||||
// Tick implementation on a leader
|
||||
void tick_leader();
|
||||
|
||||
// Reconfigure this instance to use the provided configuration.
|
||||
// Called on start, state change to candidate or leader, or when
|
||||
// a new configuration entry is added.
|
||||
void set_configuration();
|
||||
|
||||
public:
|
||||
explicit fsm(server_id id, term_t current_term, server_id voted_for, log log,
|
||||
failure_detector& failure_detector, fsm_config conf);
|
||||
@@ -270,9 +272,9 @@ public:
|
||||
return _log.last_term();
|
||||
}
|
||||
|
||||
// Call this function to wait for the number of log entries to
|
||||
// go below max_log_length.
|
||||
future<> wait_max_log_length();
|
||||
// call this function to wait for number of log entries to go below
|
||||
// max_log_length
|
||||
future<> wait();
|
||||
|
||||
// Return current configuration. Throws if not a leader.
|
||||
const configuration& get_configuration() const;
|
||||
@@ -329,8 +331,8 @@ public:
|
||||
// entry. Retruns false if the snapshot is older than existing one.
|
||||
bool apply_snapshot(snapshot snp, size_t traling);
|
||||
|
||||
size_t log_length() const {
|
||||
return _log.length();
|
||||
size_t in_memory_log_size() {
|
||||
return _log.non_snapshoted_length();
|
||||
};
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const fsm& f);
|
||||
|
||||
110
raft/log.cc
110
raft/log.cc
@@ -23,11 +23,11 @@
|
||||
namespace raft {
|
||||
|
||||
log_entry_ptr& log::get_entry(index_t i) {
|
||||
return _log[i - _first_idx];
|
||||
return _log[i - start_idx()];
|
||||
}
|
||||
|
||||
log_entry_ptr& log::operator[](size_t i) {
|
||||
assert(!_log.empty() && index_t(i) >= _first_idx);
|
||||
assert(index_t(i) >= start_idx());
|
||||
return get_entry(index_t(i));
|
||||
}
|
||||
|
||||
@@ -54,19 +54,19 @@ bool log::is_up_to_date(index_t idx, term_t term) const {
|
||||
}
|
||||
|
||||
index_t log::last_idx() const {
|
||||
return index_t(_log.size()) + _first_idx - index_t(1);
|
||||
return index_t(_log.size()) + start_idx() - index_t(1);
|
||||
}
|
||||
|
||||
index_t log::next_idx() const {
|
||||
return last_idx() + index_t(1);
|
||||
}
|
||||
|
||||
void log::truncate(index_t idx) {
|
||||
assert(idx >= _first_idx);
|
||||
auto it = _log.begin() + (idx - _first_idx);
|
||||
void log::truncate_head(index_t idx) {
|
||||
assert(idx >= start_idx());
|
||||
auto it = _log.begin() + (idx - start_idx());
|
||||
_log.erase(it, _log.end());
|
||||
stable_to(std::min(_stable_idx, last_idx()));
|
||||
if (_last_conf_idx > last_idx()) {
|
||||
if (_last_conf_idx > last_idx() ) {
|
||||
// If _prev_conf_idx is 0, this log does not contain any
|
||||
// other configuration changes, since no two uncommitted
|
||||
// configuration changes can be in progress.
|
||||
@@ -76,6 +76,25 @@ void log::truncate(index_t idx) {
|
||||
}
|
||||
}
|
||||
|
||||
void log::truncate_tail(index_t idx) {
|
||||
assert(start_idx() <= idx);
|
||||
|
||||
if (idx >= last_idx()) {
|
||||
_log.clear();
|
||||
} else if (idx > start_idx()) {
|
||||
_log.erase(_log.begin(), _log.begin() + idx - start_idx() + 1);
|
||||
}
|
||||
|
||||
_stable_idx = std::max(idx, _stable_idx);
|
||||
|
||||
if (start_idx() > _prev_conf_idx) {
|
||||
_prev_conf_idx = index_t{0};
|
||||
if (start_idx() > _last_conf_idx) {
|
||||
_last_conf_idx = index_t{0};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void log::init_last_conf_idx() {
|
||||
for (auto it = _log.rbegin(); it != _log.rend(); ++it) {
|
||||
if (std::holds_alternative<configuration>((**it).data)) {
|
||||
@@ -89,9 +108,15 @@ void log::init_last_conf_idx() {
|
||||
}
|
||||
}
|
||||
|
||||
index_t log::start_idx() const {
|
||||
// log my contain entries included in the snapshot, so start idx
|
||||
// may be smaller that snapshot index
|
||||
return (_log.empty() ? _snapshot.idx + index_t(1): _log[0]->idx);
|
||||
}
|
||||
|
||||
term_t log::last_term() const {
|
||||
if (_log.empty()) {
|
||||
return _snapshot.term;
|
||||
return term_t(0);
|
||||
}
|
||||
return _log.back()->term;
|
||||
}
|
||||
@@ -108,18 +133,15 @@ std::pair<bool, term_t> log::match_term(index_t idx, term_t term) const {
|
||||
return std::make_pair(true, term_t(0));
|
||||
}
|
||||
|
||||
// We got some very old AppendEntries we can safely ignore.
|
||||
if (idx < _snapshot.idx) {
|
||||
return std::make_pair(false, last_term());
|
||||
}
|
||||
// idx cannot point into the snapshot
|
||||
assert(idx >= start_idx() || idx == _snapshot.idx);
|
||||
|
||||
term_t my_term;
|
||||
|
||||
if (idx == _snapshot.idx) {
|
||||
my_term = _snapshot.term;
|
||||
} else {
|
||||
assert(idx >= _first_idx);
|
||||
auto i = idx - _first_idx;
|
||||
auto i = idx - start_idx();
|
||||
|
||||
if (i >= _log.size()) {
|
||||
// We have a gap between the follower and the leader.
|
||||
@@ -132,20 +154,6 @@ std::pair<bool, term_t> log::match_term(index_t idx, term_t term) const {
|
||||
return my_term == term ? std::make_pair(true, term_t(0)) : std::make_pair(false, my_term);
|
||||
}
|
||||
|
||||
std::optional<term_t> log::term_for(index_t idx) const {
|
||||
if (!_log.empty() && idx >= _first_idx) {
|
||||
return _log[idx - _first_idx]->term;
|
||||
}
|
||||
if (idx == _snapshot.idx) {
|
||||
return _snapshot.term;
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
const configuration& log::get_configuration() const {
|
||||
return last_conf_idx() ? std::get<configuration>(_log[last_conf_idx() - _first_idx]->data) : _snapshot.config;
|
||||
}
|
||||
|
||||
index_t log::maybe_append(std::vector<log_entry_ptr>&& entries) {
|
||||
assert(!entries.empty());
|
||||
|
||||
@@ -155,9 +163,9 @@ index_t log::maybe_append(std::vector<log_entry_ptr>&& entries) {
|
||||
// contains them to ensure the terms match.
|
||||
for (auto& e : entries) {
|
||||
if (e->idx <= last_idx()) {
|
||||
if (e->idx < _first_idx) {
|
||||
if (e->idx < start_idx()) {
|
||||
logger.trace("append_entries: skipping entry with idx {} less than log start {}",
|
||||
e->idx, _first_idx);
|
||||
e->idx, start_idx());
|
||||
continue;
|
||||
}
|
||||
if (e->term == get_entry(e->idx)->term) {
|
||||
@@ -169,7 +177,7 @@ index_t log::maybe_append(std::vector<log_entry_ptr>&& entries) {
|
||||
// If an existing entry conflicts with a new one (same
|
||||
// index but different terms), delete the existing
|
||||
// entry and all that follow it (§5.3).
|
||||
truncate(e->idx);
|
||||
truncate_head(e->idx);
|
||||
}
|
||||
// Assert log monotonicity
|
||||
assert(e->idx == next_idx());
|
||||
@@ -180,44 +188,24 @@ index_t log::maybe_append(std::vector<log_entry_ptr>&& entries) {
|
||||
}
|
||||
|
||||
size_t log::apply_snapshot(snapshot&& snp, size_t trailing) {
|
||||
assert (snp.idx > _snapshot.idx);
|
||||
|
||||
size_t removed;
|
||||
auto idx = snp.idx;
|
||||
|
||||
if (idx > last_idx()) {
|
||||
// Remove all entries ignoring the 'trailing' argument,
|
||||
// since otherwise there would be a gap between old
|
||||
// entries and the next entry index.
|
||||
removed = _log.size();
|
||||
_log.clear();
|
||||
_first_idx = idx + index_t{1};
|
||||
} else {
|
||||
removed = _log.size() - (last_idx() - idx);
|
||||
removed -= std::min(trailing, removed);
|
||||
_log.erase(_log.begin(), _log.begin() + removed);
|
||||
_first_idx = _first_idx + index_t{removed};
|
||||
}
|
||||
|
||||
_stable_idx = std::max(idx, _stable_idx);
|
||||
|
||||
if (_first_idx > _prev_conf_idx) {
|
||||
_prev_conf_idx = index_t{0};
|
||||
if (_first_idx > _last_conf_idx) {
|
||||
_last_conf_idx = index_t{0};
|
||||
}
|
||||
size_t ret = 0;
|
||||
assert (snp.idx >= start_idx());
|
||||
if (snp.idx - start_idx() > index_t(trailing)) {
|
||||
ret = _log.size();
|
||||
// call truncate first since it uses old snapshot
|
||||
truncate_tail(index_t(snp.idx - trailing));
|
||||
ret -= _log.size();
|
||||
}
|
||||
|
||||
_snapshot = std::move(snp);
|
||||
|
||||
return removed;
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const log& l) {
|
||||
os << "first idx: " << l._first_idx << ", ";
|
||||
os << "last idx: " << l.last_idx() << ", ";
|
||||
os << "next idx: " << l.next_idx() << ", ";
|
||||
os << "last idx: " << l.last_idx() << ", ";
|
||||
os << "stable idx: " << l.stable_idx() << ", ";
|
||||
os << "start idx: " << l.start_idx() << ", ";
|
||||
os << "last term: " << l.last_term();
|
||||
return os;
|
||||
}
|
||||
|
||||
48
raft/log.hh
48
raft/log.hh
@@ -38,12 +38,6 @@ class log {
|
||||
// We need something that can be truncated from both sides.
|
||||
// std::deque move constructor is not nothrow hence cannot be used
|
||||
log_entries _log;
|
||||
// Raft log index of the first entry in the log.
|
||||
// Usually it's simply _snapshot.idx + 1,
|
||||
// but if apply_snapshot() with non-zero trailing was used,
|
||||
// it may point at an entry older than the snapshot.
|
||||
// If the log is empty, same as next_idx()
|
||||
index_t _first_idx;
|
||||
// Index of the last stable (persisted) entry in the log.
|
||||
index_t _stable_idx = index_t(0);
|
||||
// Log index of the last configuration change.
|
||||
@@ -71,27 +65,14 @@ class log {
|
||||
// the log backwards after truncate().
|
||||
index_t _prev_conf_idx = index_t{0};
|
||||
private:
|
||||
// Drop uncommitted log entries not present on the leader.
|
||||
void truncate(index_t i);
|
||||
void truncate_head(index_t i);
|
||||
void truncate_tail(index_t i);
|
||||
// A helper used to find the last configuration entry in the
|
||||
// log after it's been loaded from disk.
|
||||
void init_last_conf_idx();
|
||||
log_entry_ptr& get_entry(index_t);
|
||||
public:
|
||||
explicit log(snapshot snp, log_entries log = {})
|
||||
: _snapshot(std::move(snp)), _log(std::move(log)) {
|
||||
if (_log.empty()) {
|
||||
_first_idx = _snapshot.idx + index_t{1};
|
||||
} else {
|
||||
_first_idx = _log[0]->idx;
|
||||
// All log entries following the snapshot must
|
||||
// be present, otherwise we will not be able to
|
||||
// perform an initial state transfer.
|
||||
assert(_first_idx <= _snapshot.idx + 1);
|
||||
}
|
||||
// The snapshot index is at least 0, so _first_idx
|
||||
// is at least 1
|
||||
assert(_first_idx > 0);
|
||||
explicit log(snapshot snp, log_entries log = {}) : _snapshot(std::move(snp)), _log(std::move(log)) {
|
||||
stable_to(last_idx());
|
||||
init_last_conf_idx();
|
||||
}
|
||||
@@ -109,9 +90,8 @@ public:
|
||||
// The voter denies its vote if its own log is more up-to-date
|
||||
// than that of the candidate.
|
||||
bool is_up_to_date(index_t idx, term_t term) const;
|
||||
index_t start_idx() const;
|
||||
index_t next_idx() const;
|
||||
// Return index of the last entry. If the log is empty,
|
||||
// return the index of the last entry in the snapshot.
|
||||
index_t last_idx() const;
|
||||
index_t last_conf_idx() const {
|
||||
return _last_conf_idx;
|
||||
@@ -119,13 +99,9 @@ public:
|
||||
index_t stable_idx() const {
|
||||
return _stable_idx;
|
||||
}
|
||||
// Return the term of the last entry in the log,
|
||||
// or the snapshot term if the log is empty.
|
||||
// Used in elections to not vote for a candidate with
|
||||
// a less recent term.
|
||||
term_t last_term() const;
|
||||
// Return the number of log entries in memory
|
||||
size_t length() const {
|
||||
// return the actual number of log entries in memory
|
||||
size_t non_snapshoted_length() {
|
||||
return _log.size();
|
||||
}
|
||||
|
||||
@@ -162,18 +138,6 @@ public:
|
||||
// @retval first is false - the follower's log doesn't match the leader's
|
||||
// and non matching term is in second
|
||||
std::pair<bool, term_t> match_term(index_t idx, term_t term) const;
|
||||
// Return term number of the entry matching the index. If the
|
||||
// entry is not in the log and does not match snapshot index,
|
||||
// return an empty optional.
|
||||
// Used to validate the log matching rule.
|
||||
std::optional<term_t> term_for(index_t idx) const;
|
||||
|
||||
// Return the latest configuration present in the log.
|
||||
// This would be either the entry at last_conf_idx()
|
||||
// or, if it's not set, the snapshot configuration.
|
||||
// The returned reference is only valid until the next
|
||||
// operation on the log.
|
||||
const configuration& get_configuration() const;
|
||||
|
||||
// Called on a follower to append entries from a leader.
|
||||
// @retval return an index of last appended entry
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include "tracker.hh"
|
||||
#include "progress.hh"
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
|
||||
@@ -196,26 +196,29 @@ index_t tracker::committed(index_t prev_commit_idx) {
|
||||
}
|
||||
}
|
||||
|
||||
votes::votes(configuration configuration)
|
||||
:_voters(configuration.current)
|
||||
, _current(configuration.current) {
|
||||
|
||||
if (configuration.is_joint()) {
|
||||
_previous.emplace(configuration.previous);
|
||||
_voters.insert(configuration.previous.begin(), configuration.previous.end());
|
||||
void votes::set_configuration(configuration configuration) {
|
||||
_configuration = std::move(configuration);
|
||||
_voters = _configuration.current;
|
||||
if (_configuration.is_joint()) {
|
||||
_voters.insert(_configuration.previous.begin(), _configuration.previous.end());
|
||||
}
|
||||
}
|
||||
|
||||
void votes::register_vote(server_id from, bool granted) {
|
||||
server_address from_address{from};
|
||||
bool registered = false;
|
||||
|
||||
if (_current.register_vote(from, granted)) {
|
||||
if (_configuration.current.find(from_address) != _configuration.current.end()) {
|
||||
_current.register_vote(granted);
|
||||
registered = true;
|
||||
}
|
||||
if (_previous && _previous->register_vote(from, granted)) {
|
||||
if (_configuration.is_joint() &&
|
||||
_configuration.previous.find(from_address) != _configuration.previous.end()) {
|
||||
_previous.register_vote(granted);
|
||||
registered = true;
|
||||
}
|
||||
// Should never receive a vote not requested, unless an RPC bug.
|
||||
// Should never receive a vote not requested, unless an RPC
|
||||
// bug.
|
||||
if (! registered) {
|
||||
seastar::on_internal_error(logger,
|
||||
format("Got a vote from unregistered server {} during election", from));
|
||||
@@ -223,17 +226,17 @@ void votes::register_vote(server_id from, bool granted) {
|
||||
}
|
||||
|
||||
vote_result votes::tally_votes() const {
|
||||
if (_previous) {
|
||||
auto previous_result = _previous->tally_votes();
|
||||
if (_configuration.is_joint()) {
|
||||
auto previous_result = _previous.tally_votes(_configuration.previous.size());
|
||||
if (previous_result != vote_result::WON) {
|
||||
return previous_result;
|
||||
}
|
||||
}
|
||||
return _current.tally_votes();
|
||||
return _current.tally_votes(_configuration.current.size());
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const election_tracker& v) {
|
||||
os << "responded: " << v._responded.size() << ", ";
|
||||
os << "responded: " << v._responded << ", ";
|
||||
os << "granted: " << v._granted;
|
||||
return os;
|
||||
}
|
||||
@@ -241,26 +244,7 @@ std::ostream& operator<<(std::ostream& os, const election_tracker& v) {
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const votes& v) {
|
||||
os << "current: " << v._current << std::endl;
|
||||
if (v._previous) {
|
||||
os << "previous: " << v._previous.value() << std::endl;
|
||||
}
|
||||
return os;
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const vote_result& v) {
|
||||
static const char *n;
|
||||
switch (v) {
|
||||
case vote_result::UNKNOWN:
|
||||
n = "UNKNOWN";
|
||||
break;
|
||||
case vote_result::WON:
|
||||
n = "WON";
|
||||
break;
|
||||
case vote_result::LOST:
|
||||
n = "LOST";
|
||||
break;
|
||||
}
|
||||
os << n;
|
||||
os << "previous: " << v._previous << std::endl;
|
||||
return os;
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ namespace raft {
|
||||
class follower_progress {
|
||||
public:
|
||||
// Id of this server
|
||||
const server_id id;
|
||||
server_id id;
|
||||
// Index of the next log entry to send to this server.
|
||||
index_t next_idx;
|
||||
// Index of the highest log entry known to be replicated to this
|
||||
@@ -62,18 +62,6 @@ public:
|
||||
void become_pipeline();
|
||||
void become_snapshot();
|
||||
|
||||
void stable_to(index_t idx) {
|
||||
// AppendEntries replies can arrive out of order.
|
||||
if (idx > match_idx) {
|
||||
match_idx = idx;
|
||||
}
|
||||
if (idx >= next_idx) {
|
||||
// idx may be smaller if we increased next_idx
|
||||
// optimistically in pipeline mode.
|
||||
next_idx = idx + index_t{1};
|
||||
}
|
||||
}
|
||||
|
||||
// Return true if a new replication record can be sent to the follower.
|
||||
bool can_send_to();
|
||||
|
||||
@@ -117,6 +105,8 @@ public:
|
||||
follower_progress* leader_progress() {
|
||||
return _leader_progress;
|
||||
}
|
||||
const configuration& get_configuration() const { return _configuration; }
|
||||
|
||||
// Calculate the current commit index based on the current
|
||||
// simple or joint quorum.
|
||||
index_t committed(index_t prev_commit_idx);
|
||||
@@ -126,46 +116,31 @@ public:
|
||||
enum class vote_result {
|
||||
// We haven't got enough responses yet, either because
|
||||
// the servers haven't voted or responses failed to arrive.
|
||||
UNKNOWN = 0,
|
||||
UNKNOWN,
|
||||
// This candidate has won the election
|
||||
WON,
|
||||
// The quorum of servers has voted against this candidate
|
||||
LOST,
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const vote_result& v);
|
||||
|
||||
// State of election in a single quorum
|
||||
class election_tracker {
|
||||
// All eligible voters
|
||||
std::unordered_set<server_id> _suffrage;
|
||||
// Votes collected
|
||||
std::unordered_set<server_id> _responded;
|
||||
size_t _responded = 0;
|
||||
size_t _granted = 0;
|
||||
public:
|
||||
election_tracker(const server_address_set& configuration) {
|
||||
for (const auto& a : configuration) {
|
||||
_suffrage.emplace(a.id);
|
||||
void register_vote(bool granted) {
|
||||
_responded++;
|
||||
if (granted) {
|
||||
_granted++;
|
||||
}
|
||||
}
|
||||
|
||||
bool register_vote(server_id from, bool granted) {
|
||||
if (_suffrage.find(from) == _suffrage.end()) {
|
||||
return false;
|
||||
}
|
||||
if (_responded.emplace(from).second) {
|
||||
// Have not counted this vote yet
|
||||
_granted += static_cast<int>(granted);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
vote_result tally_votes() const {
|
||||
auto quorum = _suffrage.size() / 2 + 1;
|
||||
vote_result tally_votes(size_t cluster_size) const {
|
||||
auto quorum = cluster_size / 2 + 1;
|
||||
if (_granted >= quorum) {
|
||||
return vote_result::WON;
|
||||
}
|
||||
assert(_responded.size() <= _suffrage.size());
|
||||
auto unknown = _suffrage.size() - _responded.size();
|
||||
assert(_responded <= cluster_size);
|
||||
auto unknown = cluster_size - _responded;
|
||||
return _granted + unknown >= quorum ? vote_result::UNKNOWN : vote_result::LOST;
|
||||
}
|
||||
friend std::ostream& operator<<(std::ostream& os, const election_tracker& v);
|
||||
@@ -173,19 +148,23 @@ public:
|
||||
|
||||
// Candidate's state specific to election
|
||||
class votes {
|
||||
configuration _configuration;
|
||||
server_address_set _voters;
|
||||
election_tracker _current;
|
||||
std::optional<election_tracker> _previous;
|
||||
election_tracker _previous;
|
||||
public:
|
||||
votes(configuration configuration);
|
||||
|
||||
const server_address_set& voters() const {
|
||||
return _voters;
|
||||
}
|
||||
void set_configuration(configuration configuration);
|
||||
|
||||
void register_vote(server_id from, bool granted);
|
||||
vote_result tally_votes() const;
|
||||
|
||||
const configuration& get_configuration() const {
|
||||
return _configuration;
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const votes& v);
|
||||
};
|
||||
|
||||
@@ -183,7 +183,7 @@ server_impl::server_impl(server_id uuid, std::unique_ptr<rpc> rpc,
|
||||
_id(uuid), _config(config) {
|
||||
set_rpc_server(_rpc.get());
|
||||
if (_config.snapshot_threshold > _config.max_log_length) {
|
||||
throw config_error("snapshot_threshold has to be smaller than max_log_length");
|
||||
throw config_error("snapshot_threshold has to be smaller than max_log_lengths");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -219,8 +219,8 @@ template <typename T>
|
||||
future<> server_impl::add_entry_internal(T command, wait_type type) {
|
||||
logger.trace("An entry is submitted on a leader");
|
||||
|
||||
// Wait for a new slot to become available
|
||||
co_await _fsm->wait_max_log_length();
|
||||
// wait for new slot to be available
|
||||
co_await _fsm->wait();
|
||||
|
||||
logger.trace("An entry proceeds after wait");
|
||||
|
||||
@@ -598,7 +598,7 @@ void server_impl::register_metrics() {
|
||||
sm::make_total_operations("snapshots_taken", _stats.snapshots_taken,
|
||||
sm::description("how many time the user's state machine was snapshotted"), {server_id_label(_id)}),
|
||||
|
||||
sm::make_gauge("in_memory_log_size", [this] { return _fsm->log_length(); },
|
||||
sm::make_gauge("in_memory_log_size", [this] { return _fsm->in_memory_log_size(); },
|
||||
sm::description("size of in-memory part of the log"), {server_id_label(_id)}),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -39,12 +39,11 @@ public:
|
||||
size_t snapshot_trailing = 200;
|
||||
// max size of appended entries in bytes
|
||||
size_t append_request_threshold = 100000;
|
||||
// Max number of entries of in-memory part of the log after
|
||||
// which requests are stopped to be admitted until the log
|
||||
// is shrunk back by a snapshot. Should be greater than
|
||||
// whatever the default number of trailing log entries
|
||||
// is configured by the snapshot, otherwise the state
|
||||
// machine will deadlock on attempt to submit a new entry.
|
||||
// max number of entries of in-memory part of the log after
|
||||
// which requests are stopped to be admitted until the log
|
||||
// is shrunk back by snapshotting. It has to be greater than
|
||||
// snapshot_threshold, otherwise submission of new entries will
|
||||
// deadlock.
|
||||
size_t max_log_length = 5000;
|
||||
};
|
||||
|
||||
|
||||
@@ -78,7 +78,7 @@ raft::fsm_config fsm_cfg{.append_request_threshold = 1};
|
||||
class fsm_debug : public raft::fsm {
|
||||
public:
|
||||
using raft::fsm::fsm;
|
||||
const raft::follower_progress& get_progress(server_id id) {
|
||||
raft::follower_progress& get_progress(server_id id) {
|
||||
raft::follower_progress* progress = _tracker->find(id);
|
||||
return *progress;
|
||||
}
|
||||
@@ -139,18 +139,18 @@ BOOST_AUTO_TEST_CASE(test_progress_resume_by_append_resp) {
|
||||
fsm.step(id2, raft::vote_reply{output.term, true});
|
||||
BOOST_CHECK(fsm.is_leader());
|
||||
|
||||
const raft::follower_progress& fprogress = fsm.get_progress(id2);
|
||||
raft::follower_progress& fprogress = fsm.get_progress(id2);
|
||||
BOOST_CHECK(fprogress.state == raft::follower_progress::state::PROBE);
|
||||
|
||||
const raft::follower_progress& fprogress2 = fsm.get_progress(id2);
|
||||
BOOST_CHECK(!fprogress2.probe_sent);
|
||||
fprogress = fsm.get_progress(id2);
|
||||
BOOST_CHECK(!fprogress.probe_sent);
|
||||
raft::command cmd = create_command(1);
|
||||
raft::log_entry le = fsm.add_entry(std::move(cmd));
|
||||
do {
|
||||
output = fsm.get_output();
|
||||
} while (output.messages.size() == 0);
|
||||
|
||||
BOOST_CHECK(fprogress2.probe_sent);
|
||||
BOOST_CHECK(fprogress.probe_sent);
|
||||
}
|
||||
|
||||
// TestProgressPaused
|
||||
@@ -200,7 +200,7 @@ BOOST_AUTO_TEST_CASE(test_progress_flow_control) {
|
||||
fsm.step(id2, raft::vote_reply{output.term, true});
|
||||
// Throw away all the messages relating to the initial election.
|
||||
output = fsm.get_output();
|
||||
const raft::follower_progress& fprogress = fsm.get_progress(id2);
|
||||
raft::follower_progress& fprogress = fsm.get_progress(id2);
|
||||
BOOST_CHECK(fprogress.state == raft::follower_progress::state::PROBE);
|
||||
|
||||
// While node 2 is in probe state, propose a bunch of entries.
|
||||
@@ -228,8 +228,8 @@ BOOST_AUTO_TEST_CASE(test_progress_flow_control) {
|
||||
// When this append is acked, we change to replicate state and can
|
||||
// send multiple messages at once. (PIPELINE)
|
||||
fsm.step(id2, raft::append_reply{msg.current_term, le->idx, raft::append_reply::accepted{le->idx}});
|
||||
const raft::follower_progress& fprogress2 = fsm.get_progress(id2);
|
||||
BOOST_CHECK(fprogress2.state == raft::follower_progress::state::PIPELINE);
|
||||
fprogress = fsm.get_progress(id2);
|
||||
BOOST_CHECK(fprogress.state == raft::follower_progress::state::PIPELINE);
|
||||
|
||||
do {
|
||||
output = fsm.get_output();
|
||||
@@ -493,7 +493,7 @@ BOOST_AUTO_TEST_CASE(test_log_replication_1) {
|
||||
for (auto& [id, msg] : output.messages) {
|
||||
BOOST_REQUIRE_NO_THROW(areq = std::get<raft::append_request>(msg));
|
||||
BOOST_CHECK(areq.prev_log_idx == 0);
|
||||
BOOST_CHECK(areq.prev_log_term == 0);
|
||||
BOOST_CHECK(areq.prev_log_term == current_term);
|
||||
BOOST_CHECK(areq.entries.size() == 1);
|
||||
lep = areq.entries.back();
|
||||
BOOST_CHECK(lep->idx == dummy_idx);
|
||||
|
||||
@@ -27,8 +27,6 @@
|
||||
#include "raft/fsm.hh"
|
||||
|
||||
using raft::term_t, raft::index_t, raft::server_id;
|
||||
using raft::log_entry;
|
||||
using seastar::make_lw_shared;
|
||||
|
||||
void election_threshold(raft::fsm& fsm) {
|
||||
for (int i = 0; i <= raft::ELECTION_TIMEOUT.count(); i++) {
|
||||
@@ -49,233 +47,8 @@ struct failure_detector: public raft::failure_detector {
|
||||
}
|
||||
};
|
||||
|
||||
template <typename T> void add_entry(raft::log& log, T cmd) {
|
||||
log.emplace_back(make_lw_shared<log_entry>(log_entry{log.last_term(), log.next_idx(), cmd}));
|
||||
}
|
||||
|
||||
raft::snapshot log_snapshot(raft::log& log, index_t idx) {
|
||||
return raft::snapshot{.idx = idx, .term = log.last_term(), .config = log.get_snapshot().config};
|
||||
}
|
||||
|
||||
raft::fsm_config fsm_cfg{.append_request_threshold = 1};
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_votes) {
|
||||
auto id = []() -> raft::server_address { return raft::server_address{utils::make_random_uuid()}; };
|
||||
auto id1 = id();
|
||||
|
||||
raft::votes votes(raft::configuration({id1}));
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::UNKNOWN);
|
||||
BOOST_CHECK_EQUAL(votes.voters().size(), 1);
|
||||
// Try a vote from an unknown server; it should be rejected with an exception.
|
||||
BOOST_CHECK_THROW(votes.register_vote(id().id, true), std::runtime_error);
|
||||
votes.register_vote(id1.id, false);
|
||||
// Quorum votes against the decision
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::LOST);
|
||||
// Another vote from the same server is ignored
|
||||
votes.register_vote(id1.id, true);
|
||||
votes.register_vote(id1.id, true);
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::LOST);
|
||||
auto id2 = id();
|
||||
votes = raft::votes(raft::configuration({id1, id2}));
|
||||
BOOST_CHECK_EQUAL(votes.voters().size(), 2);
|
||||
votes.register_vote(id1.id, true);
|
||||
// We need a quorum of participants to win an election
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::UNKNOWN);
|
||||
votes.register_vote(id2.id, false);
|
||||
// At this point it's clear we don't have enough votes
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::LOST);
|
||||
auto id3 = id();
|
||||
// Joint configuration
|
||||
votes = raft::votes(raft::configuration({id1}, {id2, id3}));
|
||||
BOOST_CHECK_EQUAL(votes.voters().size(), 3);
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::UNKNOWN);
|
||||
votes.register_vote(id2.id, true);
|
||||
votes.register_vote(id3.id, true);
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::UNKNOWN);
|
||||
votes.register_vote(id1.id, false);
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::LOST);
|
||||
votes = raft::votes(raft::configuration({id1}, {id2, id3}));
|
||||
votes.register_vote(id2.id, true);
|
||||
votes.register_vote(id3.id, true);
|
||||
votes.register_vote(id1.id, true);
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::WON);
|
||||
votes = raft::votes(raft::configuration({id1, id2, id3}, {id1}));
|
||||
BOOST_CHECK_EQUAL(votes.voters().size(), 3);
|
||||
votes.register_vote(id1.id, true);
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::UNKNOWN);
|
||||
// This gives us a majority in both new and old
|
||||
// configurations.
|
||||
votes.register_vote(id2.id, true);
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::WON);
|
||||
// Basic voting test for 4 nodes
|
||||
auto id4 = id();
|
||||
votes = raft::votes(raft::configuration({id1, id2, id3, id4}));
|
||||
votes.register_vote(id1.id, true);
|
||||
votes.register_vote(id2.id, true);
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::UNKNOWN);
|
||||
votes.register_vote(id3.id, false);
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::UNKNOWN);
|
||||
votes.register_vote(id4.id, false);
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::LOST);
|
||||
auto id5 = id();
|
||||
// Basic voting test for 5 nodes
|
||||
votes = raft::votes(raft::configuration({id1, id2, id3, id4, id5}, {id1, id2, id3}));
|
||||
votes.register_vote(id1.id, false);
|
||||
votes.register_vote(id2.id, false);
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::LOST);
|
||||
votes.register_vote(id3.id, true);
|
||||
votes.register_vote(id4.id, true);
|
||||
votes.register_vote(id5.id, true);
|
||||
BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::LOST);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_tracker) {
|
||||
auto id = []() -> raft::server_address { return raft::server_address{utils::make_random_uuid()}; };
|
||||
auto id1 = id();
|
||||
raft::tracker tracker(id1.id);
|
||||
raft::configuration cfg({id1});
|
||||
tracker.set_configuration(cfg, index_t{1});
|
||||
BOOST_CHECK_NE(tracker.find(id1.id), nullptr);
|
||||
// The node with id set during construction is assumed to be
|
||||
// the leader, since otherwise we wouldn't create a tracker
|
||||
// in the first place.
|
||||
BOOST_CHECK_EQUAL(tracker.find(id1.id), tracker.leader_progress());
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{0}), index_t{0});
|
||||
// Avoid keeping a reference, follower_progress address may
|
||||
// change with configuration change
|
||||
auto pr = [&tracker](raft::server_address address) -> raft::follower_progress* {
|
||||
return tracker.find(address.id);
|
||||
};
|
||||
BOOST_CHECK_EQUAL(pr(id1)->match_idx, index_t{0});
|
||||
BOOST_CHECK_EQUAL(pr(id1)->next_idx, index_t{1});
|
||||
|
||||
pr(id1)->stable_to(index_t{1});
|
||||
BOOST_CHECK_EQUAL(pr(id1)->match_idx, index_t{1});
|
||||
BOOST_CHECK_EQUAL(pr(id1)->next_idx, index_t{2});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{0}), index_t{1});
|
||||
|
||||
pr(id1)->stable_to(index_t{10});
|
||||
BOOST_CHECK_EQUAL(pr(id1)->match_idx, index_t{10});
|
||||
BOOST_CHECK_EQUAL(pr(id1)->next_idx, index_t{11});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{0}), index_t{10});
|
||||
|
||||
// Out of order confirmation is OK
|
||||
//
|
||||
pr(id1)->stable_to(index_t{5});
|
||||
BOOST_CHECK_EQUAL(pr(id1)->match_idx, index_t{10});
|
||||
BOOST_CHECK_EQUAL(pr(id1)->next_idx, index_t{11});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{5}), index_t{10});
|
||||
|
||||
// Enter joint configuration {A,B,C}
|
||||
auto id2 = id(), id3 = id();
|
||||
cfg.enter_joint({id1, id2, id3});
|
||||
tracker.set_configuration(cfg, index_t{1});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{10}), index_t{10});
|
||||
pr(id2)->stable_to(index_t{11});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{10}), index_t{10});
|
||||
pr(id3)->stable_to(index_t{12});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{10}), index_t{10});
|
||||
pr(id1)->stable_to(index_t{13});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{10}), index_t{12});
|
||||
pr(id1)->stable_to(index_t{14});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{13}), index_t{13});
|
||||
|
||||
// Leave joint configuration, final configuration is {A,B,C}
|
||||
cfg.leave_joint();
|
||||
tracker.set_configuration(cfg, index_t{1});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{13}), index_t{13});
|
||||
|
||||
auto id4 = id(), id5 = id();
|
||||
cfg.enter_joint({id3, id4, id5});
|
||||
tracker.set_configuration(cfg, index_t{1});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{13}), index_t{13});
|
||||
pr(id1)->stable_to(index_t{15});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{13}), index_t{13});
|
||||
pr(id5)->stable_to(index_t{15});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{13}), index_t{13});
|
||||
pr(id3)->stable_to(index_t{15});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{13}), index_t{15});
|
||||
// This does not advance the joint quorum
|
||||
pr(id1)->stable_to(index_t{16});
|
||||
pr(id4)->stable_to(index_t{17});
|
||||
pr(id5)->stable_to(index_t{18});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{15}), index_t{15});
|
||||
|
||||
cfg.leave_joint();
|
||||
tracker.set_configuration(cfg, index_t{1});
|
||||
// Leaving joint configuration commits more entries
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{15}), index_t{17});
|
||||
//
|
||||
cfg.enter_joint({id1});
|
||||
cfg.leave_joint();
|
||||
cfg.enter_joint({id2});
|
||||
tracker.set_configuration(cfg, index_t{1});
|
||||
// Sic: we're in a weird state. The joint commit index
|
||||
// is actually 1, since id2 is at position 1. But
|
||||
// unwinding the commit index back would be weird,
|
||||
// so we report back the hint (prev_commit_idx).
|
||||
// As soon as the cluster enters joint configuration,
|
||||
// and old quorum is insufficient, the leader won't be able to
|
||||
// commit new entries until the new members catch up.
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{17}), index_t{17});
|
||||
pr(id1)->stable_to(index_t{18});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{17}), index_t{17});
|
||||
pr(id2)->stable_to(index_t{19});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{17}), index_t{18});
|
||||
pr(id1)->stable_to(index_t{20});
|
||||
BOOST_CHECK_EQUAL(tracker.committed(index_t{18}), index_t{19});
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_log_last_conf_idx) {
|
||||
// last_conf_idx, prev_conf_idx are initialized correctly,
|
||||
// and maintained during truncate head/truncate tail
|
||||
server_id id1{utils::make_random_uuid()};
|
||||
raft::configuration cfg({id1});
|
||||
raft::log log{raft::snapshot{.config = cfg}};
|
||||
BOOST_CHECK_EQUAL(log.last_conf_idx(), 0);
|
||||
add_entry(log, cfg);
|
||||
BOOST_CHECK_EQUAL(log.last_conf_idx(), 1);
|
||||
add_entry(log, log_entry::dummy{});
|
||||
add_entry(log, cfg);
|
||||
BOOST_CHECK_EQUAL(log.last_conf_idx(), 3);
|
||||
// apply snapshot truncates the log and resets last_conf_idx()
|
||||
log.apply_snapshot(log_snapshot(log, log.last_idx()), 0);
|
||||
BOOST_CHECK_EQUAL(log.last_conf_idx(), 0);
|
||||
// log::last_term() is maintained correctly by truncate_head/truncate_tail() (snapshotting)
|
||||
BOOST_CHECK_EQUAL(log.last_term(), log.get_snapshot().term);
|
||||
BOOST_CHECK(log.term_for(log.get_snapshot().idx).has_value());
|
||||
BOOST_CHECK_EQUAL(log.term_for(log.get_snapshot().idx).value(), log.get_snapshot().term);
|
||||
BOOST_CHECK(! log.term_for(log.last_idx() - index_t{1}).has_value());
|
||||
add_entry(log, log_entry::dummy{});
|
||||
BOOST_CHECK(log.term_for(log.last_idx()).has_value());
|
||||
add_entry(log, log_entry::dummy{});
|
||||
const size_t GAP = 10;
|
||||
// apply_snapshot with a log gap: this should clear all log
|
||||
// entries even though trailing is given, because a gap
|
||||
// between old log entries and a snapshot would violate
|
||||
// log continuity.
|
||||
log.apply_snapshot(log_snapshot(log, log.last_idx() + index_t{GAP}), GAP * 2);
|
||||
BOOST_CHECK(log.empty());
|
||||
BOOST_CHECK_EQUAL(log.next_idx(), log.get_snapshot().idx + index_t{1});
|
||||
add_entry(log, log_entry::dummy{});
|
||||
BOOST_CHECK_EQUAL(log.length(), 1);
|
||||
add_entry(log, log_entry::dummy{});
|
||||
BOOST_CHECK_EQUAL(log.length(), 2);
|
||||
// Set trailing longer than the length of the log.
|
||||
log.apply_snapshot(log_snapshot(log, log.last_idx()), 3);
|
||||
BOOST_CHECK_EQUAL(log.length(), 2);
|
||||
// Set trailing the same length as the current log length
|
||||
add_entry(log, log_entry::dummy{});
|
||||
BOOST_CHECK_EQUAL(log.length(), 3);
|
||||
log.apply_snapshot(log_snapshot(log, log.last_idx()), 3);
|
||||
BOOST_CHECK_EQUAL(log.length(), 3);
|
||||
BOOST_CHECK_EQUAL(log.last_conf_idx(), 0);
|
||||
add_entry(log, log_entry::dummy{});
|
||||
// Set trailing shorter than the length of the log
|
||||
log.apply_snapshot(log_snapshot(log, log.last_idx()), 1);
|
||||
BOOST_CHECK_EQUAL(log.length(), 1);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_election_single_node) {
|
||||
|
||||
failure_detector fd;
|
||||
@@ -334,10 +107,6 @@ BOOST_AUTO_TEST_CASE(test_single_node_is_quiet) {
|
||||
fsm.add_entry(raft::command{});
|
||||
|
||||
BOOST_CHECK(fsm.get_output().messages.empty());
|
||||
|
||||
fsm.tick();
|
||||
|
||||
BOOST_CHECK(fsm.get_output().messages.empty());
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_election_two_nodes) {
|
||||
|
||||
Reference in New Issue
Block a user