diff --git a/raft/progress.cc b/raft/progress.cc index b5c831f2ca..ddf13a97f0 100644 --- a/raft/progress.cc +++ b/raft/progress.cc @@ -83,44 +83,116 @@ bool follower_progress::can_send_to() { return false; } +// If this is called when a tracker is just created, the current +// progress is empty and we should simply create an instance for +// each follower. +// When switching configurations, we should preserve progress +// for existing followers, create progress for new, and remove +// progress for non-members (to make sure we don't send noise +// messages to them). void tracker::set_configuration(configuration configuration, index_t next_idx) { _configuration = std::move(configuration); - for (const auto& s : _configuration.current) { - if (this->progress::find(s.id) != this->progress::end()) { - continue; + _leader_progress = nullptr; + // Swap out the current progress and then re-add + // only those entries which are still present. + progress old_progress = std::move(*this); + + auto emplace_simple_config = [&](const server_address_set& config) { + for (const auto& s : config) { + auto newp = this->progress::find(s.id); + if (newp != this->progress::end()) { + // Processing joint configuration and already added + // an entry for this id. + continue; + } + auto oldp = old_progress.find(s.id); + if (oldp != old_progress.end()) { + newp = this->progress::emplace(s.id, std::move(oldp->second)).first; + } else { + newp = this->progress::emplace(s.id, follower_progress{s.id, next_idx}).first; + } + if (s.id == _my_id) { + // The leader is part of the current + // configuration. 
+ _leader_progress = &newp->second; + } } - this->progress::emplace(s.id, follower_progress{s.id, next_idx}); + }; + emplace_simple_config(_configuration.current); + if (_configuration.is_joint()) { + emplace_simple_config(_configuration.previous); } } -index_t tracker::committed(index_t prev_commit_idx) { - std::vector match; - size_t count = 0; +// An array of node match indexes, partially ordered on demand to find +// the pivot which serves as commit index of the group. +class match_vector { + std::vector _match; + // How many elements in the match array have a match index + // larger than the previous commit index. + size_t _count = 0; + index_t _prev_commit_idx; +public: + explicit match_vector(index_t prev_commit_idx, size_t reserve_size) + : _prev_commit_idx(prev_commit_idx) { + _match.reserve(reserve_size); + } - for (const auto& [id, p] : *this) { - logger.trace("committed {}: {} {}", p.id, p.match_idx, prev_commit_idx); - if (p.match_idx > prev_commit_idx) { - count++; + void push_back(index_t match_idx) { + if (match_idx > _prev_commit_idx) { + _count++; } - match.push_back(p.match_idx); + _match.push_back(match_idx); } - logger.trace("check committed count {} cluster size {}", count, match.size()); - if (count < match.size()/2 + 1) { - return prev_commit_idx; + bool committed() const { + return _count >= _match.size()/2 + 1; + } + index_t commit_idx() { + logger.trace("check committed count {} cluster size {}", _count, _match.size()); + // The index of the pivot node is selected so that all nodes + // with a larger match index plus the pivot form a majority, + // for example: + // cluster size pivot node majority + // 1 0 1 + // 2 0 2 + // 3 1 2 + // 4 1 3 + // 5 2 3 + // + auto pivot = (_match.size() - 1) / 2; + std::nth_element(_match.begin(), _match.begin() + pivot, _match.end()); + return _match[pivot]; + } +}; + +index_t tracker::committed(index_t prev_commit_idx) { + + match_vector current(prev_commit_idx, _configuration.current.size()); + + if 
(_configuration.is_joint()) { + match_vector previous(prev_commit_idx, _configuration.previous.size()); + + for (const auto& [id, p] : *this) { + if (_configuration.current.find(server_address{p.id}) != _configuration.current.end()) { + current.push_back(p.match_idx); + } + if (_configuration.previous.find(server_address{p.id}) != _configuration.previous.end()) { + previous.push_back(p.match_idx); + } + } + if (!current.committed() || !previous.committed()) { + return prev_commit_idx; + } + return std::min(current.commit_idx(), previous.commit_idx()); + } else { + for (const auto& [id, p] : *this) { + current.push_back(p.match_idx); + } + if (!current.committed()) { + return prev_commit_idx; + } + return current.commit_idx(); } - // The index of the pivot node is selected so that all nodes - // with a larger match index plus the pivot form a majority, - // for example: - // cluster size pivot node majority - // 1 0 1 - // 2 0 2 - // 3 1 2 - // 4 1 3 - // 5 2 3 - // - auto pivot = (match.size() - 1) / 2; - std::nth_element(match.begin(), match.begin() + pivot, match.end()); - return match[pivot]; } std::ostream& operator<<(std::ostream& os, const votes& v) { diff --git a/raft/progress.hh b/raft/progress.hh index 7304bf7537..e5790c31b7 100644 --- a/raft/progress.hh +++ b/raft/progress.hh @@ -76,6 +76,14 @@ class tracker: private progress { // Copy of this server's id server_id _my_id; configuration _configuration; + // Not NULL if the leader is part of the current configuration. + // + // 4.2.2 Removing the current leader + // There will be a period of time (while it is committing + // C_new) when a leader can manage a cluster that does not + // include itself; it replicates log entries but does not + // count itself in majorities. 
+ follower_progress *_leader_progress = nullptr; public: using progress::begin, progress::end, progress::cbegin, progress::cend, progress::size; @@ -88,6 +96,11 @@ public: return this->progress::find(dst)->second; } void set_configuration(configuration configuration, index_t next_idx); + // Return progress object for the current leader if it's + // part of the current configuration. + follower_progress* leader_progress() { + return _leader_progress; + } const configuration& get_configuration() const { return _configuration; } // Calculate the current commit index based on the current