diff --git a/configure.py b/configure.py index 3b79b6c42a..68cb531d24 100755 --- a/configure.py +++ b/configure.py @@ -553,7 +553,7 @@ scylla_raft_core = [ 'raft/raft.cc', 'raft/server.cc', 'raft/fsm.cc', - 'raft/tracker.cc', + 'raft/progress.cc', 'raft/log.cc', ] diff --git a/raft/fsm.cc b/raft/fsm.cc index fb55875b20..a0b07ac843 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -35,7 +35,7 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log, assert(!bool(_current_leader)); } -future<> fsm::wait_max_log_length() { +future<> fsm::wait() { check_is_leader(); return _log_limiter_semaphore->sem.wait(); @@ -43,7 +43,7 @@ future<> fsm::wait_max_log_length() { const configuration& fsm::get_configuration() const { check_is_leader(); - return _log.get_configuration(); + return _tracker->get_configuration(); } template @@ -53,7 +53,7 @@ const log_entry& fsm::add_entry(T command) { if constexpr (std::is_same_v) { if (_log.last_conf_idx() > _commit_idx || - _log.get_configuration().is_joint()) { + _tracker->get_configuration().is_joint()) { // 4.1. Cluster membership changes/Safety. // // Leaders avoid overlapping configuration changes by @@ -71,7 +71,7 @@ const log_entry& fsm::add_entry(T command) { // configuration for joint consensus (C_old,new) as a log // entry and replicates that entry using the normal Raft // mechanism. - configuration tmp(_log.get_configuration()); + configuration tmp(_tracker->get_configuration()); tmp.enter_joint(command.current); command = std::move(tmp); } @@ -87,7 +87,7 @@ const log_entry& fsm::add_entry(T command) { // entry is replicated to the C_new servers, and // a majority of the new configuration is used to // determine the C_new entry’s commitment. - _tracker->set_configuration(_log.get_configuration(), _log.last_idx()); + set_configuration(); } return *_log[_log.last_idx()]; @@ -128,6 +128,20 @@ void fsm::update_current_term(term_t current_term) _randomized_election_timeout = ELECTION_TIMEOUT + logical_clock::duration{dist(re)}; } +void fsm::set_configuration() { + + configuration configuration = _log.last_conf_idx() ? + std::get(_log[_log.last_conf_idx()]->data) : _log.get_snapshot().config; + // We unconditionally access configuration.current[0] + // to identify which entries are committed. + assert(configuration.current.size() > 0); + if (is_leader()) { + _tracker->set_configuration(std::move(configuration), _log.last_idx()); + } else if (is_candidate()) { + _votes->set_configuration(std::move(configuration)); + } +} + void fsm::become_leader() { assert(!std::holds_alternative(_state)); assert(!_tracker); @@ -136,7 +150,7 @@ void fsm::become_leader() { _votes = std::nullopt; _tracker.emplace(_my_id); _log_limiter_semaphore.emplace(this); - _log_limiter_semaphore->sem.consume(_log.length()); + _log_limiter_semaphore->sem.consume(_log.non_snapshoted_length()); _last_election_time = _clock.now(); // a new leader needs to commit at lease one entry to make sure that // all existing entries in its log are commited as well. Also it should @@ -145,7 +159,7 @@ void fsm::become_leader() { add_entry(log_entry::dummy()); // set_configuration() begins replicating from the last entry // in the log. - _tracker->set_configuration(_log.get_configuration(), _log.last_idx()); + set_configuration(); replicate(); } @@ -174,7 +188,8 @@ void fsm::become_candidate() { // and initiating another round of RequestVote RPCs. _last_election_time = _clock.now(); - _votes.emplace(_log.get_configuration()); + _votes.emplace(); + set_configuration(); const auto& voters = _votes->voters(); if (voters.find(server_address{_my_id}) == voters.end()) { @@ -291,7 +306,9 @@ void fsm::advance_stable_idx(index_t idx) { // configuration, update it's progress and optionally // commit new entries. if (is_leader() && _tracker->leader_progress()) { - _tracker->leader_progress()->stable_to(idx); + auto& progress = *_tracker->leader_progress(); + progress.match_idx = idx; + progress.next_idx = index_t{idx + 1}; replicate(); maybe_commit(); } @@ -328,19 +345,15 @@ void fsm::maybe_commit() { _sm_events.signal(); if (committed_conf_change) { - if (_log.get_configuration().is_joint()) { + if (_tracker->get_configuration().is_joint()) { // 4.3. Arbitrary configuration changes using joint consensus // // Once the joint consensus has been committed, the // system then transitions to the new configuration. - configuration cfg(_log.get_configuration()); + configuration cfg(_tracker->get_configuration()); cfg.leave_joint(); _log.emplace_back(seastar::make_lw_shared({_current_term, _log.next_idx(), std::move(cfg)})); - _tracker->set_configuration(_log.get_configuration(), _log.last_idx()); - // Leaving joint configuration may commit more entries - // even if we had no new acks, by switching the quorum - // from joint to simple majority. - maybe_commit(); + set_configuration(); } else if (_tracker->leader_progress() == nullptr) { // 4.2.2 Removing the current leader // @@ -483,7 +496,9 @@ void fsm::append_entries_reply(server_id from, append_reply&& reply) { logger.trace("append_entries_reply[{}->{}]: accepted match={} last index={}", _my_id, from, progress.match_idx, last_idx); - progress.stable_to(last_idx); + progress.match_idx = std::max(progress.match_idx, last_idx); + // out next_idx may be large because of optimistic increase in pipeline mode + progress.next_idx = std::max(progress.next_idx, index_t(last_idx + 1)); progress.become_pipeline(); @@ -628,36 +643,38 @@ void fsm::replicate_to(follower_progress& progress, bool allow_empty) { allow_empty = false; // allow only one empty message - // A log containing a snapshot, a few trailing entries and - // a few new entries may look like this: + // With snapshot prefix enaled the log may look like this: // E - log entry - // S_idx - snapshot index - // E_i1 E_i2 E_i3 Ei_4 E_i5 E_i6 - // ^ - // S_idx = i2 - // If the follower's next_idx is i1 we need to - // enter snapshot transfer mode even when we have - // i1 in the log, since it is not possible to get the term of - // the entry previous to i1 and verify that the follower's tail - // contains no uncommitted entries. - index_t prev_idx = progress.next_idx - index_t{1}; - std::optional prev_term = _log.term_for(prev_idx); - if (!prev_term) { - const snapshot& snapshot = _log.get_snapshot(); - // We need to transfer the snapshot before we can + // S - snapshot + // Ei1 Ei2 Ei3 Si4 Ei5 Ei6 + // If the next_idx is before i2 we need to enter snaphot transfer mode + // even though we still have i1 since it is not possibel to get prev + // term for it. + auto& s = _log.get_snapshot(); + if (progress.next_idx <= s.idx && progress.next_idx < (_log.start_idx() + 1)) { + // The next index to be sent points to a snapshot so + // we need to transfer the snapshot before we can // continue syncing the log. progress.become_snapshot(); - send_to(progress.id, install_snapshot{_current_term, snapshot}); + send_to(progress.id, install_snapshot{_current_term, _log.get_snapshot()}); logger.trace("replicate_to[{}->{}]: send snapshot next={} snapshot={}", - _my_id, progress.id, progress.next_idx, snapshot.idx); + _my_id, progress.id, progress.next_idx, _log.get_snapshot().idx); return; } + index_t prev_idx = index_t(0); + term_t prev_term = _current_term; + if (progress.next_idx != 1) { + prev_idx = index_t(progress.next_idx - 1); + assert (prev_idx >= _log.start_idx() || s.idx == prev_idx); + prev_term = s.idx == prev_idx ? s.term : _log[prev_idx]->term; + } + append_request req = { .current_term = _current_term, .leader_id = _my_id, .prev_log_idx = prev_idx, - .prev_log_term = prev_term.value(), + .prev_log_term = prev_term, .leader_commit_idx = _commit_idx, .entries = std::vector() }; diff --git a/raft/fsm.hh b/raft/fsm.hh index 17fb348284..8875d24641 100644 --- a/raft/fsm.hh +++ b/raft/fsm.hh @@ -22,7 +22,7 @@ #include #include "raft.hh" -#include "tracker.hh" +#include "progress.hh" #include "log.hh" namespace raft { @@ -41,12 +41,9 @@ struct fsm_output { struct fsm_config { // max size of appended entries in bytes size_t append_request_threshold; - // Max number of entries of in-memory part of the log after - // which requests are stopped to be admitted until the log - // is shrunk back by a snapshot. Should be greater than - // whatever the default number of trailing log entries - // is configured by the snapshot, otherwise the state - // machine will deadlock. + // max number of entries of in-memory part of the log after + // which requests are stopped to be addmitted unill the log + // is shrunk back by snapshoting size_t max_log_length; }; @@ -250,6 +247,11 @@ private: // Tick implementation on a leader void tick_leader(); + // Reconfigure this instance to use the provided configuration. + // Called on start, state change to candidate or leader, or when + // a new configuration entry is added. + void set_configuration(); + public: explicit fsm(server_id id, term_t current_term, server_id voted_for, log log, failure_detector& failure_detector, fsm_config conf); @@ -270,9 +272,9 @@ public: return _log.last_term(); } - // Call this function to wait for the number of log entries to - // go below max_log_length. - future<> wait_max_log_length(); + // call this function to wait for number of log entries to go below + // max_log_length + future<> wait(); // Return current configuration. Throws if not a leader. const configuration& get_configuration() const; @@ -329,8 +331,8 @@ public: // entry. Retruns false if the snapshot is older than existing one. bool apply_snapshot(snapshot snp, size_t traling); - size_t log_length() const { - return _log.length(); + size_t in_memory_log_size() { + return _log.non_snapshoted_length(); }; friend std::ostream& operator<<(std::ostream& os, const fsm& f); diff --git a/raft/log.cc b/raft/log.cc index c939c90ec7..ac6dac1db1 100644 --- a/raft/log.cc +++ b/raft/log.cc @@ -23,11 +23,11 @@ namespace raft { log_entry_ptr& log::get_entry(index_t i) { - return _log[i - _first_idx]; + return _log[i - start_idx()]; } log_entry_ptr& log::operator[](size_t i) { - assert(!_log.empty() && index_t(i) >= _first_idx); + assert(index_t(i) >= start_idx()); return get_entry(index_t(i)); } @@ -54,19 +54,19 @@ bool log::is_up_to_date(index_t idx, term_t term) const { } index_t log::last_idx() const { - return index_t(_log.size()) + _first_idx - index_t(1); + return index_t(_log.size()) + start_idx() - index_t(1); } index_t log::next_idx() const { return last_idx() + index_t(1); } -void log::truncate(index_t idx) { - assert(idx >= _first_idx); - auto it = _log.begin() + (idx - _first_idx); +void log::truncate_head(index_t idx) { + assert(idx >= start_idx()); + auto it = _log.begin() + (idx - start_idx()); _log.erase(it, _log.end()); stable_to(std::min(_stable_idx, last_idx())); - if (_last_conf_idx > last_idx()) { + if (_last_conf_idx > last_idx() ) { // If _prev_conf_idx is 0, this log does not contain any // other configuration changes, since no two uncommitted // configuration changes can be in progress. @@ -76,6 +76,25 @@ void log::truncate(index_t idx) { } } +void log::truncate_tail(index_t idx) { + assert(start_idx() <= idx); + + if (idx >= last_idx()) { + _log.clear(); + } else if (idx > start_idx()) { + _log.erase(_log.begin(), _log.begin() + idx - start_idx() + 1); + } + + _stable_idx = std::max(idx, _stable_idx); + + if (start_idx() > _prev_conf_idx) { + _prev_conf_idx = index_t{0}; + if (start_idx() > _last_conf_idx) { + _last_conf_idx = index_t{0}; + } + } +} + void log::init_last_conf_idx() { for (auto it = _log.rbegin(); it != _log.rend(); ++it) { if (std::holds_alternative((**it).data)) { @@ -89,9 +108,15 @@ void log::init_last_conf_idx() { } } +index_t log::start_idx() const { + // log my contain entries included in the snapshot, so start idx + // may be smaller that snapshot index + return (_log.empty() ? _snapshot.idx + index_t(1): _log[0]->idx); +} + term_t log::last_term() const { if (_log.empty()) { - return _snapshot.term; + return term_t(0); } return _log.back()->term; } @@ -108,18 +133,15 @@ std::pair log::match_term(index_t idx, term_t term) const { return std::make_pair(true, term_t(0)); } - // We got some very old AppendEntries we can safely ignore. - if (idx < _snapshot.idx) { - return std::make_pair(false, last_term()); - } + // idx cannot point into the snapshot + assert(idx >= start_idx() || idx == _snapshot.idx); term_t my_term; if (idx == _snapshot.idx) { my_term = _snapshot.term; } else { - assert(idx >= _first_idx); - auto i = idx - _first_idx; + auto i = idx - start_idx(); if (i >= _log.size()) { // We have a gap between the follower and the leader. @@ -132,20 +154,6 @@ std::pair log::match_term(index_t idx, term_t term) const { return my_term == term ? std::make_pair(true, term_t(0)) : std::make_pair(false, my_term); } -std::optional log::term_for(index_t idx) const { - if (!_log.empty() && idx >= _first_idx) { - return _log[idx - _first_idx]->term; - } - if (idx == _snapshot.idx) { - return _snapshot.term; - } - return {}; -} - -const configuration& log::get_configuration() const { - return last_conf_idx() ? std::get(_log[last_conf_idx() - _first_idx]->data) : _snapshot.config; -} - index_t log::maybe_append(std::vector&& entries) { assert(!entries.empty()); @@ -155,9 +163,9 @@ index_t log::maybe_append(std::vector&& entries) { // contains them to ensure the terms match. for (auto& e : entries) { if (e->idx <= last_idx()) { - if (e->idx < _first_idx) { + if (e->idx < start_idx()) { logger.trace("append_entries: skipping entry with idx {} less than log start {}", - e->idx, _first_idx); + e->idx, start_idx()); continue; } if (e->term == get_entry(e->idx)->term) { @@ -169,7 +177,7 @@ index_t log::maybe_append(std::vector&& entries) { // If an existing entry conflicts with a new one (same // index but different terms), delete the existing // entry and all that follow it (§5.3). - truncate(e->idx); + truncate_head(e->idx); } // Assert log monotonicity assert(e->idx == next_idx()); @@ -180,44 +188,24 @@ index_t log::maybe_append(std::vector&& entries) { } size_t log::apply_snapshot(snapshot&& snp, size_t trailing) { - assert (snp.idx > _snapshot.idx); - - size_t removed; - auto idx = snp.idx; - - if (idx > last_idx()) { - // Remove all entries ignoring the 'trailing' argument, - // since otherwise there would be a gap between old - // entries and the next entry index. - removed = _log.size(); - _log.clear(); - _first_idx = idx + index_t{1}; - } else { - removed = _log.size() - (last_idx() - idx); - removed -= std::min(trailing, removed); - _log.erase(_log.begin(), _log.begin() + removed); - _first_idx = _first_idx + index_t{removed}; - } - - _stable_idx = std::max(idx, _stable_idx); - - if (_first_idx > _prev_conf_idx) { - _prev_conf_idx = index_t{0}; - if (_first_idx > _last_conf_idx) { - _last_conf_idx = index_t{0}; - } + size_t ret = 0; + assert (snp.idx >= start_idx()); + if (snp.idx - start_idx() > index_t(trailing)) { + ret = _log.size(); + // call truncate first since it uses old snapshot + truncate_tail(index_t(snp.idx - trailing)); + ret -= _log.size(); } _snapshot = std::move(snp); - - return removed; + return ret; } std::ostream& operator<<(std::ostream& os, const log& l) { - os << "first idx: " << l._first_idx << ", "; - os << "last idx: " << l.last_idx() << ", "; os << "next idx: " << l.next_idx() << ", "; + os << "last idx: " << l.last_idx() << ", "; os << "stable idx: " << l.stable_idx() << ", "; + os << "start idx: " << l.start_idx() << ", "; os << "last term: " << l.last_term(); return os; } diff --git a/raft/log.hh b/raft/log.hh index e96999cd1a..f701092d88 100644 --- a/raft/log.hh +++ b/raft/log.hh @@ -38,12 +38,6 @@ class log { // We need something that can be truncated from both sides. // std::deque move constructor is not nothrow hence cannot be used log_entries _log; - // Raft log index of the first entry in the log. - // Usually it's simply _snapshot.idx + 1, - // but if apply_snapshot() with non-zero trailing was used, - // it may point at an entry older than the snapshot. - // If the log is empty, same as next_idx() - index_t _first_idx; // Index of the last stable (persisted) entry in the log. index_t _stable_idx = index_t(0); // Log index of the last configuration change. @@ -71,27 +65,14 @@ class log { // the log backwards after truncate(). index_t _prev_conf_idx = index_t{0}; private: - // Drop uncommitted log entries not present on the leader. - void truncate(index_t i); + void truncate_head(index_t i); + void truncate_tail(index_t i); // A helper used to find the last configuration entry in the // log after it's been loaded from disk. void init_last_conf_idx(); log_entry_ptr& get_entry(index_t); public: - explicit log(snapshot snp, log_entries log = {}) - : _snapshot(std::move(snp)), _log(std::move(log)) { - if (_log.empty()) { - _first_idx = _snapshot.idx + index_t{1}; - } else { - _first_idx = _log[0]->idx; - // All log entries following the snapshot must - // be present, otherwise we will not be able to - // perform an initial state transfer. - assert(_first_idx <= _snapshot.idx + 1); - } - // The snapshot index is at least 0, so _first_idx - // is at least 1 - assert(_first_idx > 0); + explicit log(snapshot snp, log_entries log = {}) : _snapshot(std::move(snp)), _log(std::move(log)) { stable_to(last_idx()); init_last_conf_idx(); } @@ -109,9 +90,8 @@ public: // The voter denies its vote if its own log is more up-to-date // than that of the candidate. bool is_up_to_date(index_t idx, term_t term) const; + index_t start_idx() const; index_t next_idx() const; - // Return index of the last entry. If the log is empty, - // return the index of the last entry in the snapshot. index_t last_idx() const; index_t last_conf_idx() const { return _last_conf_idx; @@ -119,13 +99,9 @@ public: index_t stable_idx() const { return _stable_idx; } - // Return the term of the last entry in the log, - // or the snapshot term if the log is empty. - // Used in elections to not vote for a candidate with - // a less recent term. term_t last_term() const; - // Return the number of log entries in memory - size_t length() const { + // return the actual number of log entries in memory + size_t non_snapshoted_length() { return _log.size(); } @@ -162,18 +138,6 @@ public: // @retval first is false - the follower's log doesn't match the leader's // and non matching term is in second std::pair match_term(index_t idx, term_t term) const; - // Return term number of the entry matching the index. If the - // entry is not in the log and does not match snapshot index, - // return an empty optional. - // Used to validate the log matching rule. - std::optional term_for(index_t idx) const; - - // Return the latest configuration present in the log. - // This would be either the entry at last_conf_idx() - // or, if it's not set, the snapshot configuration. - // The returned reference is only valid until the next - // operation on the log. - const configuration& get_configuration() const; // Called on a follower to append entries from a leader. // @retval return an index of last appended entry diff --git a/raft/tracker.cc b/raft/progress.cc similarity index 87% rename from raft/tracker.cc rename to raft/progress.cc index 18f8974c95..27f61b0973 100644 --- a/raft/tracker.cc +++ b/raft/progress.cc @@ -18,7 +18,7 @@ * You should have received a copy of the GNU General Public License * along with Scylla. If not, see . */ -#include "tracker.hh" +#include "progress.hh" #include #include @@ -196,26 +196,29 @@ index_t tracker::committed(index_t prev_commit_idx) { } } -votes::votes(configuration configuration) - :_voters(configuration.current) - , _current(configuration.current) { - - if (configuration.is_joint()) { - _previous.emplace(configuration.previous); - _voters.insert(configuration.previous.begin(), configuration.previous.end()); +void votes::set_configuration(configuration configuration) { + _configuration = std::move(configuration); + _voters = _configuration.current; + if (_configuration.is_joint()) { + _voters.insert(_configuration.previous.begin(), _configuration.previous.end()); } } void votes::register_vote(server_id from, bool granted) { + server_address from_address{from}; bool registered = false; - if (_current.register_vote(from, granted)) { + if (_configuration.current.find(from_address) != _configuration.current.end()) { + _current.register_vote(granted); registered = true; } - if (_previous && _previous->register_vote(from, granted)) { + if (_configuration.is_joint() && + _configuration.previous.find(from_address) != _configuration.previous.end()) { + _previous.register_vote(granted); registered = true; } - // Should never receive a vote not requested, unless an RPC bug. + // Should never receive a vote not requested, unless an RPC + // bug. if (! registered) { seastar::on_internal_error(logger, format("Got a vote from unregistered server {} during election", from)); @@ -223,17 +226,17 @@ void votes::register_vote(server_id from, bool granted) { } vote_result votes::tally_votes() const { - if (_previous) { - auto previous_result = _previous->tally_votes(); + if (_configuration.is_joint()) { + auto previous_result = _previous.tally_votes(_configuration.previous.size()); if (previous_result != vote_result::WON) { return previous_result; } } - return _current.tally_votes(); + return _current.tally_votes(_configuration.current.size()); } std::ostream& operator<<(std::ostream& os, const election_tracker& v) { - os << "responded: " << v._responded.size() << ", "; + os << "responded: " << v._responded << ", "; os << "granted: " << v._granted; return os; } @@ -241,26 +244,7 @@ std::ostream& operator<<(std::ostream& os, const election_tracker& v) { std::ostream& operator<<(std::ostream& os, const votes& v) { os << "current: " << v._current << std::endl; - if (v._previous) { - os << "previous: " << v._previous.value() << std::endl; - } - return os; -} - -std::ostream& operator<<(std::ostream& os, const vote_result& v) { - static const char *n; - switch (v) { - case vote_result::UNKNOWN: - n = "UNKNOWN"; - break; - case vote_result::WON: - n = "WON"; - break; - case vote_result::LOST: - n = "LOST"; - break; - } - os << n; + os << "previous: " << v._previous << std::endl; return os; } diff --git a/raft/tracker.hh b/raft/progress.hh similarity index 78% rename from raft/tracker.hh rename to raft/progress.hh index 8a1e754fa9..105e292219 100644 --- a/raft/tracker.hh +++ b/raft/progress.hh @@ -30,7 +30,7 @@ namespace raft { class follower_progress { public: // Id of this server - const server_id id; + server_id id; // Index of the next log entry to send to this server. index_t next_idx; // Index of the highest log entry known to be replicated to this @@ -62,18 +62,6 @@ public: void become_pipeline(); void become_snapshot(); - void stable_to(index_t idx) { - // AppendEntries replies can arrive out of order. - if (idx > match_idx) { - match_idx = idx; - } - if (idx >= next_idx) { - // idx may be smaller if we increased next_idx - // optimistically in pipeline mode. - next_idx = idx + index_t{1}; - } - } - // Return true if a new replication record can be sent to the follower. bool can_send_to(); @@ -117,6 +105,8 @@ public: follower_progress* leader_progress() { return _leader_progress; } + const configuration& get_configuration() const { return _configuration; } + // Calculate the current commit index based on the current // simple or joint quorum. index_t committed(index_t prev_commit_idx); @@ -126,46 +116,31 @@ public: enum class vote_result { // We haven't got enough responses yet, either because // the servers haven't voted or responses failed to arrive. - UNKNOWN = 0, + UNKNOWN, // This candidate has won the election WON, // The quorum of servers has voted against this candidate LOST, }; -std::ostream& operator<<(std::ostream& os, const vote_result& v); - // State of election in a single quorum class election_tracker { - // All eligible voters - std::unordered_set _suffrage; - // Votes collected - std::unordered_set _responded; + size_t _responded = 0; size_t _granted = 0; public: - election_tracker(const server_address_set& configuration) { - for (const auto& a : configuration) { - _suffrage.emplace(a.id); + void register_vote(bool granted) { + _responded++; + if (granted) { + _granted++; } } - - bool register_vote(server_id from, bool granted) { - if (_suffrage.find(from) == _suffrage.end()) { - return false; - } - if (_responded.emplace(from).second) { - // Have not counted this vote yet - _granted += static_cast(granted); - } - return true; - } - vote_result tally_votes() const { - auto quorum = _suffrage.size() / 2 + 1; + vote_result tally_votes(size_t cluster_size) const { + auto quorum = cluster_size / 2 + 1; if (_granted >= quorum) { return vote_result::WON; } - assert(_responded.size() <= _suffrage.size()); - auto unknown = _suffrage.size() - _responded.size(); + assert(_responded <= cluster_size); + auto unknown = cluster_size - _responded; return _granted + unknown >= quorum ? vote_result::UNKNOWN : vote_result::LOST; } friend std::ostream& operator<<(std::ostream& os, const election_tracker& v); @@ -173,19 +148,23 @@ public: // Candidate's state specific to election class votes { + configuration _configuration; server_address_set _voters; election_tracker _current; - std::optional _previous; + election_tracker _previous; public: - votes(configuration configuration); - const server_address_set& voters() const { return _voters; } + void set_configuration(configuration configuration); void register_vote(server_id from, bool granted); vote_result tally_votes() const; + const configuration& get_configuration() const { + return _configuration; + } + friend std::ostream& operator<<(std::ostream& os, const votes& v); }; diff --git a/raft/server.cc b/raft/server.cc index 38ecb59e75..8e532cb512 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -183,7 +183,7 @@ server_impl::server_impl(server_id uuid, std::unique_ptr rpc, _id(uuid), _config(config) { set_rpc_server(_rpc.get()); if (_config.snapshot_threshold > _config.max_log_length) { - throw config_error("snapshot_threshold has to be smaller than max_log_length"); + throw config_error("snapshot_threshold has to be smaller than max_log_lengths"); } } @@ -219,8 +219,8 @@ template future<> server_impl::add_entry_internal(T command, wait_type type) { logger.trace("An entry is submitted on a leader"); - // Wait for a new slot to become available - co_await _fsm->wait_max_log_length(); + // wait for new slot to be available + co_await _fsm->wait(); logger.trace("An entry proceeds after wait"); @@ -598,7 +598,7 @@ void server_impl::register_metrics() { sm::make_total_operations("snapshots_taken", _stats.snapshots_taken, sm::description("how many time the user's state machine was snapshotted"), {server_id_label(_id)}), - sm::make_gauge("in_memory_log_size", [this] { return _fsm->log_length(); }, + sm::make_gauge("in_memory_log_size", [this] { return _fsm->in_memory_log_size(); }, sm::description("size of in-memory part of the log"), {server_id_label(_id)}), }); } diff --git a/raft/server.hh b/raft/server.hh index 2be5156c8d..2fbd1de234 100644 --- a/raft/server.hh +++ b/raft/server.hh @@ -39,12 +39,11 @@ public: size_t snapshot_trailing = 200; // max size of appended entries in bytes size_t append_request_threshold = 100000; - // Max number of entries of in-memory part of the log after - // which requests are stopped to be admitted until the log - // is shrunk back by a snapshot. Should be greater than - // whatever the default number of trailing log entries - // is configured by the snapshot, otherwise the state - // machine will deadlock on attempt to submit a new entry. + // max number of entries of in-memory part of the log after + // which requests are stopped to be addmitted unill the log + // is shrunk back by snapshoting. It has to be greater than + // snapshot_threshold otherwise submition of new entries will + // deadlock. size_t max_log_length = 5000; }; diff --git a/test/boost/raft_etcd_test.cc b/test/boost/raft_etcd_test.cc index 420742ec45..1095538149 100644 --- a/test/boost/raft_etcd_test.cc +++ b/test/boost/raft_etcd_test.cc @@ -78,7 +78,7 @@ raft::fsm_config fsm_cfg{.append_request_threshold = 1}; class fsm_debug : public raft::fsm { public: using raft::fsm::fsm; - const raft::follower_progress& get_progress(server_id id) { + raft::follower_progress& get_progress(server_id id) { raft::follower_progress* progress = _tracker->find(id); return *progress; } @@ -139,18 +139,18 @@ BOOST_AUTO_TEST_CASE(test_progress_resume_by_append_resp) { fsm.step(id2, raft::vote_reply{output.term, true}); BOOST_CHECK(fsm.is_leader()); - const raft::follower_progress& fprogress = fsm.get_progress(id2); + raft::follower_progress& fprogress = fsm.get_progress(id2); BOOST_CHECK(fprogress.state == raft::follower_progress::state::PROBE); - const raft::follower_progress& fprogress2 = fsm.get_progress(id2); - BOOST_CHECK(!fprogress2.probe_sent); + fprogress = fsm.get_progress(id2); + BOOST_CHECK(!fprogress.probe_sent); raft::command cmd = create_command(1); raft::log_entry le = fsm.add_entry(std::move(cmd)); do { output = fsm.get_output(); } while (output.messages.size() == 0); - BOOST_CHECK(fprogress2.probe_sent); + BOOST_CHECK(fprogress.probe_sent); } // TestProgressPaused @@ -200,7 +200,7 @@ BOOST_AUTO_TEST_CASE(test_progress_flow_control) { fsm.step(id2, raft::vote_reply{output.term, true}); // Throw away all the messages relating to the initial election. output = fsm.get_output(); - const raft::follower_progress& fprogress = fsm.get_progress(id2); + raft::follower_progress& fprogress = fsm.get_progress(id2); BOOST_CHECK(fprogress.state == raft::follower_progress::state::PROBE); // While node 2 is in probe state, propose a bunch of entries. @@ -228,8 +228,8 @@ BOOST_AUTO_TEST_CASE(test_progress_flow_control) { // When this append is acked, we change to replicate state and can // send multiple messages at once. (PIPELINE) fsm.step(id2, raft::append_reply{msg.current_term, le->idx, raft::append_reply::accepted{le->idx}}); - const raft::follower_progress& fprogress2 = fsm.get_progress(id2); - BOOST_CHECK(fprogress2.state == raft::follower_progress::state::PIPELINE); + fprogress = fsm.get_progress(id2); + BOOST_CHECK(fprogress.state == raft::follower_progress::state::PIPELINE); do { output = fsm.get_output(); @@ -493,7 +493,7 @@ BOOST_AUTO_TEST_CASE(test_log_replication_1) { for (auto& [id, msg] : output.messages) { BOOST_REQUIRE_NO_THROW(areq = std::get(msg)); BOOST_CHECK(areq.prev_log_idx == 0); - BOOST_CHECK(areq.prev_log_term == 0); + BOOST_CHECK(areq.prev_log_term == current_term); BOOST_CHECK(areq.entries.size() == 1); lep = areq.entries.back(); BOOST_CHECK(lep->idx == dummy_idx); diff --git a/test/boost/raft_fsm_test.cc b/test/boost/raft_fsm_test.cc index a7d4042286..bdae55ab3e 100644 --- a/test/boost/raft_fsm_test.cc +++ b/test/boost/raft_fsm_test.cc @@ -27,8 +27,6 @@ #include "raft/fsm.hh" using raft::term_t, raft::index_t, raft::server_id; -using raft::log_entry; -using seastar::make_lw_shared; void election_threshold(raft::fsm& fsm) { for (int i = 0; i <= raft::ELECTION_TIMEOUT.count(); i++) { @@ -49,233 +47,8 @@ struct failure_detector: public raft::failure_detector { } }; -template void add_entry(raft::log& log, T cmd) { - log.emplace_back(make_lw_shared(log_entry{log.last_term(), log.next_idx(), cmd})); -} - -raft::snapshot log_snapshot(raft::log& log, index_t idx) { - return raft::snapshot{.idx = idx, .term = log.last_term(), .config = log.get_snapshot().config}; -} - raft::fsm_config fsm_cfg{.append_request_threshold = 1}; -BOOST_AUTO_TEST_CASE(test_votes) { - auto id = []() -> raft::server_address { return raft::server_address{utils::make_random_uuid()}; }; - auto id1 = id(); - - raft::votes votes(raft::configuration({id1})); - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::UNKNOWN); - BOOST_CHECK_EQUAL(votes.voters().size(), 1); - // Try a vote from an unknown server, it should be ignored. - BOOST_CHECK_THROW(votes.register_vote(id().id, true), std::runtime_error); - votes.register_vote(id1.id, false); - // Quorum votes against the decision - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::LOST); - // Another vote from the same server is ignored - votes.register_vote(id1.id, true); - votes.register_vote(id1.id, true); - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::LOST); - auto id2 = id(); - votes = raft::votes(raft::configuration({id1, id2})); - BOOST_CHECK_EQUAL(votes.voters().size(), 2); - votes.register_vote(id1.id, true); - // We need a quorum of participants to win an election - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::UNKNOWN); - votes.register_vote(id2.id, false); - // At this point it's clear we don't have enough votes - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::LOST); - auto id3 = id(); - // Joint configuration - votes = raft::votes(raft::configuration({id1}, {id2, id3})); - BOOST_CHECK_EQUAL(votes.voters().size(), 3); - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::UNKNOWN); - votes.register_vote(id2.id, true); - votes.register_vote(id3.id, true); - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::UNKNOWN); - votes.register_vote(id1.id, false); - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::LOST); - votes = raft::votes(raft::configuration({id1}, {id2, id3})); - votes.register_vote(id2.id, true); - votes.register_vote(id3.id, true); - votes.register_vote(id1.id, true); - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::WON); - votes = raft::votes(raft::configuration({id1, id2, id3}, {id1})); - BOOST_CHECK_EQUAL(votes.voters().size(), 3); - votes.register_vote(id1.id, true); - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::UNKNOWN); - // This gives us a majority in both new and old - // configurations. - votes.register_vote(id2.id, true); - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::WON); - // Basic voting test for 4 nodes - auto id4 = id(); - votes = raft::votes(raft::configuration({id1, id2, id3, id4})); - votes.register_vote(id1.id, true); - votes.register_vote(id2.id, true); - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::UNKNOWN); - votes.register_vote(id3.id, false); - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::UNKNOWN); - votes.register_vote(id4.id, false); - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::LOST); - auto id5 = id(); - // Basic voting test for 5 nodes - votes = raft::votes(raft::configuration({id1, id2, id3, id4, id5}, {id1, id2, id3})); - votes.register_vote(id1.id, false); - votes.register_vote(id2.id, false); - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::LOST); - votes.register_vote(id3.id, true); - votes.register_vote(id4.id, true); - votes.register_vote(id5.id, true); - BOOST_CHECK_EQUAL(votes.tally_votes(), raft::vote_result::LOST); -} - -BOOST_AUTO_TEST_CASE(test_tracker) { - auto id = []() -> raft::server_address { return raft::server_address{utils::make_random_uuid()}; }; - auto id1 = id(); - raft::tracker tracker(id1.id); - raft::configuration cfg({id1}); - tracker.set_configuration(cfg, index_t{1}); - BOOST_CHECK_NE(tracker.find(id1.id), nullptr); - // The node with id set during construction is assumed to be - // the leader, since otherwise we wouldn't create a tracker - // in the first place. - BOOST_CHECK_EQUAL(tracker.find(id1.id), tracker.leader_progress()); - BOOST_CHECK_EQUAL(tracker.committed(index_t{0}), index_t{0}); - // Avoid keeping a reference, follower_progress address may - // change with configuration change - auto pr = [&tracker](raft::server_address address) -> raft::follower_progress* { - return tracker.find(address.id); - }; - BOOST_CHECK_EQUAL(pr(id1)->match_idx, index_t{0}); - BOOST_CHECK_EQUAL(pr(id1)->next_idx, index_t{1}); - - pr(id1)->stable_to(index_t{1}); - BOOST_CHECK_EQUAL(pr(id1)->match_idx, index_t{1}); - BOOST_CHECK_EQUAL(pr(id1)->next_idx, index_t{2}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{0}), index_t{1}); - - pr(id1)->stable_to(index_t{10}); - BOOST_CHECK_EQUAL(pr(id1)->match_idx, index_t{10}); - BOOST_CHECK_EQUAL(pr(id1)->next_idx, index_t{11}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{0}), index_t{10}); - - // Out of order confirmation is OK - // - pr(id1)->stable_to(index_t{5}); - BOOST_CHECK_EQUAL(pr(id1)->match_idx, index_t{10}); - BOOST_CHECK_EQUAL(pr(id1)->next_idx, index_t{11}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{5}), index_t{10}); - - // Enter joint configuration {A,B,C} - auto id2 = id(), id3 = id(); - cfg.enter_joint({id1, id2, id3}); - tracker.set_configuration(cfg, index_t{1}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{10}), index_t{10}); - pr(id2)->stable_to(index_t{11}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{10}), index_t{10}); - pr(id3)->stable_to(index_t{12}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{10}), index_t{10}); - pr(id1)->stable_to(index_t{13}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{10}), index_t{12}); - pr(id1)->stable_to(index_t{14}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{13}), index_t{13}); - - // Leave joint configuration, final configuration is {A,B,C} - cfg.leave_joint(); - tracker.set_configuration(cfg, index_t{1}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{13}), index_t{13}); - - auto id4 = id(), id5 = id(); - cfg.enter_joint({id3, id4, id5}); - tracker.set_configuration(cfg, index_t{1}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{13}), index_t{13}); - pr(id1)->stable_to(index_t{15}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{13}), index_t{13}); - pr(id5)->stable_to(index_t{15}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{13}), index_t{13}); - pr(id3)->stable_to(index_t{15}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{13}), index_t{15}); - // This does not advance the joint quorum - pr(id1)->stable_to(index_t{16}); - pr(id4)->stable_to(index_t{17}); - pr(id5)->stable_to(index_t{18}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{15}), index_t{15}); - - cfg.leave_joint(); - tracker.set_configuration(cfg, index_t{1}); - // Leaving joint configuration commits more entries - BOOST_CHECK_EQUAL(tracker.committed(index_t{15}), index_t{17}); - // - cfg.enter_joint({id1}); - cfg.leave_joint(); - cfg.enter_joint({id2}); - tracker.set_configuration(cfg, index_t{1}); - // Sic: we're in a weird state. The joint commit index - // is actually 1, since id2 is at position 1. But in - // unwinding back the commit index would be weird, - // so we report back the hint (prev_commit_idx). - // As soon as the cluster enters joint configuration, - // and old quorum is insufficient, the leader won't be able to - // commit new entries until the new members catch up. - BOOST_CHECK_EQUAL(tracker.committed(index_t{17}), index_t{17}); - pr(id1)->stable_to(index_t{18}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{17}), index_t{17}); - pr(id2)->stable_to(index_t{19}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{17}), index_t{18}); - pr(id1)->stable_to(index_t{20}); - BOOST_CHECK_EQUAL(tracker.committed(index_t{18}), index_t{19}); -} - -BOOST_AUTO_TEST_CASE(test_log_last_conf_idx) { - // last_conf_idx, prev_conf_idx are initialized correctly, - // and maintained during truncate head/truncate tail - server_id id1{utils::make_random_uuid()}; - raft::configuration cfg({id1}); - raft::log log{raft::snapshot{.config = cfg}}; - BOOST_CHECK_EQUAL(log.last_conf_idx(), 0); - add_entry(log, cfg); - BOOST_CHECK_EQUAL(log.last_conf_idx(), 1); - add_entry(log, log_entry::dummy{}); - add_entry(log, cfg); - BOOST_CHECK_EQUAL(log.last_conf_idx(), 3); - // apply snapshot truncates the log and resets last_conf_idx() - log.apply_snapshot(log_snapshot(log, log.last_idx()), 0); - BOOST_CHECK_EQUAL(log.last_conf_idx(), 0); - // log::last_term() is maintained correctly by truncate_head/truncate_tail() (snapshotting) - BOOST_CHECK_EQUAL(log.last_term(), log.get_snapshot().term); - BOOST_CHECK(log.term_for(log.get_snapshot().idx).has_value()); - BOOST_CHECK_EQUAL(log.term_for(log.get_snapshot().idx).value(), log.get_snapshot().term); - BOOST_CHECK(! log.term_for(log.last_idx() - index_t{1}).has_value()); - add_entry(log, log_entry::dummy{}); - BOOST_CHECK(log.term_for(log.last_idx()).has_value()); - add_entry(log, log_entry::dummy{}); - const size_t GAP = 10; - // apply_snapshot with a log gap, this should clear all log - // entries, despite that trailing is given, a gap - // between old log entries and a snapshot would violate - // log continuity. - log.apply_snapshot(log_snapshot(log, log.last_idx() + index_t{GAP}), GAP * 2); - BOOST_CHECK(log.empty()); - BOOST_CHECK_EQUAL(log.next_idx(), log.get_snapshot().idx + index_t{1}); - add_entry(log, log_entry::dummy{}); - BOOST_CHECK_EQUAL(log.length(), 1); - add_entry(log, log_entry::dummy{}); - BOOST_CHECK_EQUAL(log.length(), 2); - // Set trailing longer than the length of the log. - log.apply_snapshot(log_snapshot(log, log.last_idx()), 3); - BOOST_CHECK_EQUAL(log.length(), 2); - // Set trailing the same length as the current log length - add_entry(log, log_entry::dummy{}); - BOOST_CHECK_EQUAL(log.length(), 3); - log.apply_snapshot(log_snapshot(log, log.last_idx()), 3); - BOOST_CHECK_EQUAL(log.length(), 3); - BOOST_CHECK_EQUAL(log.last_conf_idx(), 0); - add_entry(log, log_entry::dummy{}); - // Set trailing shorter than the length of the log - log.apply_snapshot(log_snapshot(log, log.last_idx()), 1); - BOOST_CHECK_EQUAL(log.length(), 1); -} - BOOST_AUTO_TEST_CASE(test_election_single_node) { failure_detector fd; @@ -334,10 +107,6 @@ BOOST_AUTO_TEST_CASE(test_single_node_is_quiet) { fsm.add_entry(raft::command{}); BOOST_CHECK(fsm.get_output().messages.empty()); - - fsm.tick(); - - BOOST_CHECK(fsm.get_output().messages.empty()); } BOOST_AUTO_TEST_CASE(test_election_two_nodes) {