From df944f953ce63255dcbcff7e80656bf01758b9bd Mon Sep 17 00:00:00 2001 From: Konstantin Osipov Date: Wed, 14 Oct 2020 14:58:58 +0300 Subject: [PATCH] raft: joint consensus, switch configuration to joint In order to work correctly in transitional configuration, participants must enter it after crashes, restarts and state changes. This means it must be stored in Raft log and snapshot on the leader and followers. This is most easily done if transitional configuration is just a flavour of standard configuration. In FSM, rename _current_config to _configuration, it now contains both current and future configuration at all times. --- raft/fsm.cc | 10 +++++----- raft/fsm.hh | 19 ++++++++++--------- raft/raft.hh | 11 ++++++++--- test/raft/replication_test.cc | 4 ++-- 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/raft/fsm.cc b/raft/fsm.cc index a98072acc5..342fe55a7a 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -96,7 +96,7 @@ void fsm::become_leader() { _tracker.emplace(_my_id); _log_limiter_semaphore.emplace(this); _log_limiter_semaphore->sem.consume(_log.non_snapshoted_length()); - _tracker->set_configuration(_current_config.servers, _log.next_idx()); + _tracker->set_configuration(_configuration.current, _log.next_idx()); _last_election_time = _clock.now(); // a new leader needs to commit at lease one entry to make sure that // all existing entries in its log are commited as well. Also it should @@ -131,7 +131,7 @@ void fsm::become_candidate() { // and initiating another round of RequestVote RPCs. 
_last_election_time = _clock.now(); _votes.emplace(); - _votes->set_configuration(_current_config.servers); + _votes->set_configuration(_configuration.current); _voted_for = _my_id; if (_votes->tally_votes() == vote_result::WON) { @@ -140,7 +140,7 @@ void fsm::become_candidate() { return; } - for (const auto& server : _current_config.servers) { + for (const auto& server : _configuration.current) { if (server.id == _my_id) { continue; } @@ -498,7 +498,7 @@ static size_t entry_size(const log_entry& e) { } size_t operator()(const configuration& c) { size_t size = 0; - for (auto& s : c.servers) { + for (auto& s : c.current) { size += sizeof(s.id); size += s.info.size(); } @@ -684,7 +684,7 @@ std::ostream& operator<<(std::ostream& os, const fsm& f) { } os << "messages: " << f._messages.size() << ", "; os << "current_config ("; - for (auto& server: f._current_config.servers) { + for (auto& server: f._configuration.current) { os << server.id << ", "; } os << "), "; diff --git a/raft/fsm.hh b/raft/fsm.hh index 74b524d85d..3ccc072fc2 100644 --- a/raft/fsm.hh +++ b/raft/fsm.hh @@ -176,9 +176,8 @@ private: // TLA+ line 328 std::vector> _messages; - // Currently used configuration, may be different from - // the committed during a configuration change. - configuration _current_config; + // Transitional (joint) or committed configuration. + configuration _configuration; // Signaled when there is a IO event to process. seastar::condition_variable _sm_events; @@ -250,16 +249,18 @@ private: // Tick implementation on a leader void tick_leader(); - // Set cluster configuration + // Reconfigure this instance to use the provided configuration. + // Called on start, configuration change, or when restoring + // to the previous configuration. void set_configuration(const configuration& config) { - _current_config = config; - // We unconditionally access _current_config + _configuration = config; + // We unconditionally access _configuration // to identify which entries are committed. 
- assert(_current_config.servers.size() > 0); + assert(_configuration.current.size() > 0); if (is_leader()) { - _tracker->set_configuration(_current_config.servers, _log.next_idx()); + _tracker->set_configuration(_configuration.current, _log.next_idx()); } else if (is_candidate()) { - _votes->set_configuration(_current_config.servers); + _votes->set_configuration(_configuration.current); } } public: diff --git a/raft/raft.hh b/raft/raft.hh index 4b0fed1573..3e51255d4e 100644 --- a/raft/raft.hh +++ b/raft/raft.hh @@ -67,12 +67,17 @@ struct server_address { }; struct configuration { - std::vector<server_address> servers; + // Used during the transitioning period of configuration + // changes. + std::vector<server_address> previous; + // Contains the current configuration. When configuration + // change is in progress, contains the new configuration. + std::vector<server_address> current; configuration(std::initializer_list<server_id> ids) { - servers.reserve(ids.size()); + current.reserve(ids.size()); for (auto&& id : ids) { - servers.emplace_back(server_address{std::move(id)}); + current.emplace_back(server_address{std::move(id)}); } } configuration() = default; diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc index bf2d5c0a95..86d049eeed 100644 --- a/test/raft/replication_test.cc +++ b/test/raft/replication_test.cc @@ -324,11 +324,11 @@ future, state_machine*>>> cr for (size_t i = 0; i < states.size(); i++) { auto uuid = utils::UUID(0, i + 1); // Custom sequential debug id; 0 is invalid - config.servers.push_back(raft::server_address{uuid}); + config.current.push_back(raft::server_address{uuid}); } for (size_t i = 0; i < states.size(); i++) { - auto& s = config.servers[i]; + auto& s = config.current[i]; states[i].snapshot.config = config; snapshots[s.id] = states[i].snp_value; auto& raft = *rafts.emplace_back(create_raft_server(s.id, apply, states[i], apply_entries, type)).first;