raft: joint consensus, update progress tracker with joint configuration

The leader doesn't have to be part of the current
configuration, so add a way to access follower_progress for the leader
only if it is present.

Upon configuration changes, preserve progress information
for intact nodes, remove for removed, and create a new progress
object for added nodes.

When tracking commit progress in joint configuration mode,
calculate two commit indexes for two configurations, and
choose the smallest one.
This commit is contained in:
Konstantin Osipov
2020-10-16 12:43:41 +03:00
parent 20df1955b2
commit 18a684ba11
2 changed files with 113 additions and 28 deletions

View File

@@ -83,44 +83,116 @@ bool follower_progress::can_send_to() {
return false;
}
// If this is called when a tracker is just created, the current
// progress is empty and we should simply crate an instance for
// each follower.
// When switching configurations, we should preserve progress
// for existing followers, crate progress for new, and remove
// progress for non-members (to make sure we don't send noise
// messages to them).
void tracker::set_configuration(configuration configuration, index_t next_idx) {
_configuration = std::move(configuration);
for (const auto& s : _configuration.current) {
if (this->progress::find(s.id) != this->progress::end()) {
continue;
_leader_progress = nullptr;
// Swap out the current progress and then re-add
// only those entries which are still present.
progress old_progress = std::move(*this);
auto emplace_simple_config = [&](const server_address_set& config) {
for (const auto& s : config) {
auto newp = this->progress::find(s.id);
if (newp != this->progress::end()) {
// Processing joint configuration and already added
// an entry for this id.
continue;
}
auto oldp = old_progress.find(s.id);
if (oldp != old_progress.end()) {
newp = this->progress::emplace(s.id, std::move(oldp->second)).first;
} else {
newp = this->progress::emplace(s.id, follower_progress{s.id, next_idx}).first;
}
if (s.id == _my_id) {
// The leader is part of the current
// configuration.
_leader_progress = &newp->second;
}
}
this->progress::emplace(s.id, follower_progress{s.id, next_idx});
};
emplace_simple_config(_configuration.current);
if (_configuration.is_joint()) {
emplace_simple_config(_configuration.previous);
}
}
index_t tracker::committed(index_t prev_commit_idx) {
std::vector<index_t> match;
size_t count = 0;
// A sorted array of node match indexes used to find
// the pivot which serves as commit index of the group.
class match_vector {
std::vector<index_t> _match;
// How many elements in the match array have a match index
// larger than the previous commit index.
size_t _count = 0;
index_t _prev_commit_idx;
public:
explicit match_vector(index_t prev_commit_idx, size_t reserve_size)
: _prev_commit_idx(prev_commit_idx) {
_match.reserve(reserve_size);
}
for (const auto& [id, p] : *this) {
logger.trace("committed {}: {} {}", p.id, p.match_idx, prev_commit_idx);
if (p.match_idx > prev_commit_idx) {
count++;
void push_back(index_t match_idx) {
if (match_idx > _prev_commit_idx) {
_count++;
}
match.push_back(p.match_idx);
_match.push_back(match_idx);
}
logger.trace("check committed count {} cluster size {}", count, match.size());
if (count < match.size()/2 + 1) {
return prev_commit_idx;
bool committed() const {
return _count >= _match.size()/2 + 1;
}
index_t commit_idx() {
logger.trace("check committed count {} cluster size {}", _count, _match.size());
// The index of the pivot node is selected so that all nodes
// with a larger match index plus the pivot form a majority,
// for example:
// cluster size pivot node majority
// 1 0 1
// 2 0 2
// 3 1 2
// 4 1 3
// 5 2 3
//
auto pivot = (_match.size() - 1) / 2;
std::nth_element(_match.begin(), _match.begin() + pivot, _match.end());
return _match[pivot];
}
};
index_t tracker::committed(index_t prev_commit_idx) {
match_vector current(prev_commit_idx, _configuration.current.size());
if (_configuration.is_joint()) {
match_vector previous(prev_commit_idx, _configuration.previous.size());
for (const auto& [id, p] : *this) {
if (_configuration.current.find(server_address{p.id}) != _configuration.current.end()) {
current.push_back(p.match_idx);
}
if (_configuration.previous.find(server_address{p.id}) != _configuration.previous.end()) {
previous.push_back(p.match_idx);
}
}
if (!current.committed() || !previous.committed()) {
return prev_commit_idx;
}
return std::min(current.commit_idx(), previous.commit_idx());
} else {
for (const auto& [id, p] : *this) {
current.push_back(p.match_idx);
}
if (!current.committed()) {
return prev_commit_idx;
}
return current.commit_idx();
}
// The index of the pivot node is selected so that all nodes
// with a larger match index plus the pivot form a majority,
// for example:
// cluster size pivot node majority
// 1 0 1
// 2 0 2
// 3 1 2
// 4 1 3
// 5 2 3
//
auto pivot = (match.size() - 1) / 2;
std::nth_element(match.begin(), match.begin() + pivot, match.end());
return match[pivot];
}
std::ostream& operator<<(std::ostream& os, const votes& v) {

View File

@@ -76,6 +76,14 @@ class tracker: private progress {
// Copy of this server's id
server_id _my_id;
configuration _configuration;
// Not NULL if the leader is part of the current configuration.
//
// 4.2.2 Removing the current leader
// There will be a period of time (while it is committing
// C_new) when a leader can manage a cluster that does not
// include itself; it replicates log entries but does not
// count itself in majorities.
follower_progress *_leader_progress = nullptr;
public:
using progress::begin, progress::end, progress::cbegin, progress::cend, progress::size;
@@ -88,6 +96,11 @@ public:
return this->progress::find(dst)->second;
}
void set_configuration(configuration configuration, index_t next_idx);
// Return progress object for the current leader if it's
// part of the current configuration.
follower_progress* leader_progress() {
return _leader_progress;
}
const configuration& get_configuration() const { return _configuration; }
// Calculate the current commit index based on the current