raft: joint consensus, switch configuration to joint

In order to work correctly in transitional configuration,
participants must enter it after crashes, restarts and
state changes.

This means it must be stored in Raft log and snapshot
on the leader and followers.

This is most easily done if transitional configuration
is just a flavour of standard configuration.

In FSM, rename _current_config to _configuration,
it now contains both current and future configuration
at all times.
This commit is contained in:
Konstantin Osipov
2020-10-14 14:58:58 +03:00
parent 076e46af9e
commit df944f953c
4 changed files with 25 additions and 19 deletions

View File

@@ -96,7 +96,7 @@ void fsm::become_leader() {
_tracker.emplace(_my_id);
_log_limiter_semaphore.emplace(this);
_log_limiter_semaphore->sem.consume(_log.non_snapshoted_length());
_tracker->set_configuration(_current_config.servers, _log.next_idx());
_tracker->set_configuration(_configuration.current, _log.next_idx());
_last_election_time = _clock.now();
// a new leader needs to commit at lease one entry to make sure that
// all existing entries in its log are commited as well. Also it should
@@ -131,7 +131,7 @@ void fsm::become_candidate() {
// and initiating another round of RequestVote RPCs.
_last_election_time = _clock.now();
_votes.emplace();
_votes->set_configuration(_current_config.servers);
_votes->set_configuration(_configuration.current);
_voted_for = _my_id;
if (_votes->tally_votes() == vote_result::WON) {
@@ -140,7 +140,7 @@ void fsm::become_candidate() {
return;
}
for (const auto& server : _current_config.servers) {
for (const auto& server : _configuration.current) {
if (server.id == _my_id) {
continue;
}
@@ -498,7 +498,7 @@ static size_t entry_size(const log_entry& e) {
}
size_t operator()(const configuration& c) {
size_t size = 0;
for (auto& s : c.servers) {
for (auto& s : c.current) {
size += sizeof(s.id);
size += s.info.size();
}
@@ -684,7 +684,7 @@ std::ostream& operator<<(std::ostream& os, const fsm& f) {
}
os << "messages: " << f._messages.size() << ", ";
os << "current_config (";
for (auto& server: f._current_config.servers) {
for (auto& server: f._configuration.current) {
os << server.id << ", ";
}
os << "), ";

View File

@@ -176,9 +176,8 @@ private:
// TLA+ line 328
std::vector<std::pair<server_id, rpc_message>> _messages;
// Currently used configuration, may be different from
// the committed during a configuration change.
configuration _current_config;
// Transitional (joint) or committed configuration.
configuration _configuration;
// Signaled when there is a IO event to process.
seastar::condition_variable _sm_events;
@@ -250,16 +249,18 @@ private:
// Tick implementation on a leader
void tick_leader();
// Set cluster configuration
// Reconfigure this instance to use the provided configuration.
// Called on start, configuration change, or when restoring
// to the previous configuration.
void set_configuration(const configuration& config) {
_current_config = config;
// We unconditionally access _current_config
_configuration = config;
// We unconditionally access _configuration
// to identify which entries are committed.
assert(_current_config.servers.size() > 0);
assert(_configuration.current.size() > 0);
if (is_leader()) {
_tracker->set_configuration(_current_config.servers, _log.next_idx());
_tracker->set_configuration(_configuration.current, _log.next_idx());
} else if (is_candidate()) {
_votes->set_configuration(_current_config.servers);
_votes->set_configuration(_configuration.current);
}
}
public:

View File

@@ -67,12 +67,17 @@ struct server_address {
};
struct configuration {
std::vector<server_address> servers;
// Used during the transitioning period of configuration
// changes.
std::vector<server_address> previous;
// Contains the current configuration. When configuration
// change is in progress, contains the new configuration.
std::vector<server_address> current;
configuration(std::initializer_list<server_id> ids) {
servers.reserve(ids.size());
current.reserve(ids.size());
for (auto&& id : ids) {
servers.emplace_back(server_address{std::move(id)});
current.emplace_back(server_address{std::move(id)});
}
}
configuration() = default;

View File

@@ -324,11 +324,11 @@ future<std::vector<std::pair<std::unique_ptr<raft::server>, state_machine*>>> cr
for (size_t i = 0; i < states.size(); i++) {
auto uuid = utils::UUID(0, i + 1); // Custom sequential debug id; 0 is invalid
config.servers.push_back(raft::server_address{uuid});
config.current.push_back(raft::server_address{uuid});
}
for (size_t i = 0; i < states.size(); i++) {
auto& s = config.servers[i];
auto& s = config.current[i];
states[i].snapshot.config = config;
snapshots[s.id] = states[i].snp_value;
auto& raft = *rafts.emplace_back(create_raft_server(s.id, apply, states[i], apply_entries, type)).first;