diff --git a/configure.py b/configure.py index 44e3b5b23d..10839a1d05 100755 --- a/configure.py +++ b/configure.py @@ -433,13 +433,18 @@ perf_tests = set([ 'test/perf/perf_big_decimal', ]) +raft_tests = set([ + 'test/raft/replication_test', + 'test/boost/raft_fsm_test', +]) + apps = set([ 'scylla', 'test/tools/cql_repl', 'tools/scylla-types', ]) -tests = scylla_tests | perf_tests +tests = scylla_tests | perf_tests | raft_tests other = set([ 'iotune', @@ -498,6 +503,8 @@ arg_parser.add_argument('--with-antlr3', dest='antlr3_exec', action='store', def help='path to antlr3 executable') arg_parser.add_argument('--with-ragel', dest='ragel_exec', action='store', default='ragel', help='path to ragel executable') +arg_parser.add_argument('--build-raft', dest='build_raft', action='store_true', default=False, + help='build raft code') add_tristate(arg_parser, name='stack-guards', dest='stack_guards', help='Use stack guards') arg_parser.add_argument('--verbose', dest='verbose', action='store_true', help='Make configure.py output more verbose (useful for debugging the build process itself)') @@ -506,6 +513,21 @@ arg_parser.add_argument('--test-repeat', dest='test_repeat', action='store', typ arg_parser.add_argument('--test-timeout', dest='test_timeout', action='store', type=str, default='7200') args = arg_parser.parse_args() +coroutines_test_src = ''' +#define GCC_VERSION (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + __GNUC_PATCHLEVEL__) +#if GCC_VERSION < 100201 + #error "Coroutines support requires at least gcc 10.2.1" +#endif +''' +compiler_supports_coroutines = try_compile(compiler=args.cxx, source=coroutines_test_src) + +if args.build_raft and not compiler_supports_coroutines: + raise Exception("--build-raft is requested, while the used compiler does not support coroutines") + +if not args.build_raft: + all_artifacts.difference_update(raft_tests) + tests.difference_update(raft_tests) + defines = ['XXH_PRIVATE_API', 'SEASTAR_TESTING_MAIN', ] @@ -943,6 +965,15 @@ 
scylla_tests_dependencies = scylla_core + idls + scylla_tests_generic_dependenci 'test/lib/random_schema.cc', ] +scylla_raft_dependencies = [ + 'raft/raft.cc', + 'raft/server.cc', + 'raft/fsm.cc', + 'raft/progress.cc', + 'raft/log.cc', + 'utils/uuid.cc' +] + deps = { 'scylla': idls + ['main.cc', 'release.cc', 'utils/build_id.cc'] + scylla_core + api + alternator + redis, 'test/tools/cql_repl': idls + ['test/tools/cql_repl.cc'] + scylla_core + scylla_tests_generic_dependencies, @@ -1063,8 +1094,12 @@ deps['test/boost/linearizing_input_stream_test'] = [ deps['test/boost/duration_test'] += ['test/lib/exception_utils.cc'] deps['test/boost/alternator_base64_test'] += ['alternator/base64.cc'] +deps['test/raft/replication_test'] = ['test/raft/replication_test.cc'] + scylla_raft_dependencies +deps['test/boost/raft_fsm_test'] = ['test/boost/raft_fsm_test.cc', 'test/lib/log.cc'] + scylla_raft_dependencies + deps['utils/gz/gen_crc_combine_table'] = ['utils/gz/gen_crc_combine_table.cc'] + warnings = [ '-Wall', '-Werror', @@ -1240,6 +1275,10 @@ args.user_cflags += ' -Wno-error=stack-usage=' args.user_cflags += f"-ffile-prefix-map={curdir}=." 
seastar_cflags = args.user_cflags + +if args.build_raft: + seastar_cflags += ' -fcoroutines' + if args.target != '': seastar_cflags += ' -march=' + args.target seastar_ldflags = args.user_ldflags @@ -1368,6 +1407,9 @@ libs = ' '.join([maybe_static(args.staticyamlcpp, '-lyaml-cpp'), '-latomic', '-l if not args.staticboost: args.user_cflags += ' -DBOOST_TEST_DYN_LINK' +if args.build_raft: + args.user_cflags += ' -DENABLE_SCYLLA_RAFT -fcoroutines' + # thrift version detection, see #4538 proc_res = subprocess.run(["thrift", "-version"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) proc_res_output = proc_res.stdout.decode("utf-8") diff --git a/raft/README.md b/raft/README.md new file mode 100644 index 0000000000..b3184e8be7 --- /dev/null +++ b/raft/README.md @@ -0,0 +1,71 @@ +# Raft consensus algorithm implementation for Seastar + +Seastar is a high performance server-side application framework +written in C++. Please read more about Seastar at http://seastar.io/ + +This library provides an efficient, extensible implementation of +Raft consensus algorithm for Seastar. +For more details about Raft see https://raft.github.io/ + +## Implementation status +--------------------- +- log replication, including throttling for unresponsive + servers +- leader election + +## Usage +----- + +In order to use the library the application has to provide implementations +for RPC, storage and state machine APIs, defined in raft/raft.hh. The +purpose of these interfaces is: +- provide a way to communicate between Raft protocol instances +- persist the required protocol state on disk, +a pre-requisite of the protocol correctness, +- apply committed entries to the state machine. 
+ +While comments for these classes provide an insight into +expected guarantees they should provide, in order to provide a complying +implementation it's necessary to understand the expectations +of the Raft consistency protocol on its environment: +- RPC should implement a model of asynchronous, unreliable network, + in which messages can be lost, reordered, retransmitted more than + once, but not corrupted. Specifically, it's an error to + deliver a message to a Raft server which was not sent to it. +- storage should provide a durable persistent storage, which + survives between state machine restarts and does not corrupt + its state. +- Raft library calls `state_machine::apply_entry()` for entries + reliably committed to the replication log on the majority of + servers. While `apply_entry()` is called in the order + entries are serialized in the distributed log, there is + no guarantee that `apply_entry()` is called exactly once. + E.g. when a protocol instance restarts from persistent state, + it may re-apply some already applied log entries. + +Seastar's execution model is that every object is safe to use +within a given shard (physical OS thread). Raft library follows +the same pattern. Calls to Raft API are safe when they are local +to a single shard. Moving instances of the library between shards +is not supported. + +### First usage. + +For an example of first usage see `replication_test.cc` in test/raft/. + +In a nutshell: +- create instances of RPC, storage, and state machine +- pass them to an instance of Raft server - the facade to the Raft cluster + on this node +- repeat the above for every node in the cluster +- use `server::add_entry()` to submit new entries + on a leader, `state_machine::apply_entries()` is called after the added + entry is committed by the cluster. 
+ +### Subsequent usages + +Similar to the first usage, but `storage::load_term_and_vote()` +`storage::load_log()`, `storage::load_snapshot()` are expected to +return valid protocol state as persisted by the previous incarnation +of an instance of class server. + diff --git a/raft/fsm.cc b/raft/fsm.cc new file mode 100644 index 0000000000..84b4e4b742 --- /dev/null +++ b/raft/fsm.cc @@ -0,0 +1,554 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#include "fsm.hh" +#include +#include + +namespace raft { + +fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log, + failure_detector& failure_detector) : + _my_id(id), _current_term(current_term), _voted_for(voted_for), + _log(std::move(log)), _failure_detector(failure_detector) { + + _observed.advance(*this); + set_configuration(_log.get_snapshot().config); + logger.trace("{}: starting log length {}", _my_id, _log.last_idx()); + + assert(!bool(_current_leader)); +} + +template +const log_entry& fsm::add_entry(T command) { + // It's only possible to add entries on a leader. 
+ check_is_leader(); + + _log.emplace_back(log_entry{_current_term, _log.next_idx(), std::move(command)}); + _sm_events.signal(); + + return *_log[_log.last_idx()]; +} + +template const log_entry& fsm::add_entry(command command); +template const log_entry& fsm::add_entry(log_entry::dummy dummy); + +void fsm::advance_commit_idx(index_t leader_commit_idx) { + + auto new_commit_idx = std::min(leader_commit_idx, _log.stable_idx()); + + logger.trace("advance_commit_idx[{}]: leader_commit_idx={}, new_commit_idx={}", + _my_id, leader_commit_idx, new_commit_idx); + + if (new_commit_idx > _commit_idx) { + _commit_idx = new_commit_idx; + _sm_events.signal(); + logger.trace("advance_commit_idx[{}]: signal apply_entries: committed: {}", + _my_id, _commit_idx); + } +} + + +void fsm::update_current_term(term_t current_term) +{ + assert(_current_term < current_term); + _current_term = current_term; + _voted_for = server_id{}; + + static thread_local std::default_random_engine re{std::random_device{}()}; + static thread_local std::uniform_int_distribution<> dist(1, ELECTION_TIMEOUT.count()); + // Reset the randomized election timeout on each term + // change, even if we do not plan to campaign during this + // term: the main purpose of the timeout is to avoid + // starting our campaign simultaneously with other followers. 
+ _randomized_election_timeout = ELECTION_TIMEOUT + logical_clock::duration{dist(re)}; +} + +void fsm::become_leader() { + assert(!std::holds_alternative(_state)); + assert(!_tracker); + _state = leader{}; + _current_leader = _my_id; + _votes = std::nullopt; + _tracker.emplace(_my_id); + _tracker->set_configuration(_current_config.servers, _log.next_idx()); + _last_election_time = _clock.now(); + replicate(); +} + +void fsm::become_follower(server_id leader) { + _current_leader = leader; + _state = follower{}; + _tracker = std::nullopt; + _votes = std::nullopt; + if (_current_leader) { + _last_election_time = _clock.now(); + } +} + +void fsm::become_candidate() { + _state = candidate{}; + _tracker = std::nullopt; + update_current_term(term_t{_current_term + 1}); + // 3.4 Leader election + // A possible outcome is that a candidate neither wins nor + // loses the election: if many followers become candidates at + // the same time, votes could be split so that no candidate + // obtains a majority. When this happens, each candidate will + // time out and start a new election by incrementing its term + // and initiating another round of RequestVote RPCs. + _last_election_time = _clock.now(); + _votes.emplace(); + _votes->set_configuration(_current_config.servers); + _voted_for = _my_id; + + if (_votes->tally_votes() == vote_result::WON) { + // A single node cluster. 
+ become_leader(); + return; + } + + for (const auto& server : _current_config.servers) { + if (server.id == _my_id) { + continue; + } + logger.trace("{} [term: {}, index: {}, last log term: {}] sent vote request to {}", + _my_id, _current_term, _log.last_idx(), _log.last_term(), server.id); + + send_to(server.id, vote_request{_current_term, _log.last_idx(), _log.last_term()}); + } +} + +future fsm::poll_output() { + logger.trace("fsm::poll_output() {} stable index: {} last index: {}", + _my_id, _log.stable_idx(), _log.last_idx()); + + while (true) { + auto diff = _log.last_idx() - _log.stable_idx(); + + if (diff > 0 || !_messages.empty() || !_observed.is_equal(*this)) { + break; + } + co_await _sm_events.wait(); + } + co_return get_output(); +} + +fsm_output fsm::get_output() { + fsm_output output; + + auto diff = _log.last_idx() - _log.stable_idx(); + + if (diff > 0) { + output.log_entries.reserve(diff); + + for (auto i = _log.stable_idx() + 1; i <= _log.last_idx(); i++) { + // Copy before saving to storage to prevent races with log updates, + // e.g. truncation of the log. + // TODO: avoid copies by making sure log truncate is + // copy-on-write. + output.log_entries.emplace_back(_log[i]); + } + } + + if (_observed._current_term != _current_term || _observed._voted_for != _voted_for) { + output.term = _current_term; + output.vote = _voted_for; + } + + // Return committed entries. + // Observer commit index may be smaller than snapshot index + // in which case we should not attemp commiting entries belonging + // to a snapshot. + auto observed_ci = std::max(_observed._commit_idx, _log.get_snapshot().idx); + if (observed_ci < _commit_idx) { + output.committed.reserve(_commit_idx - observed_ci); + + for (auto idx = observed_ci + 1; idx <= _commit_idx; ++idx) { + const auto& entry = _log[idx]; + if (!std::holds_alternative(entry->data)) { + output.committed.push_back(entry); + } + } + } + + // Get a snapshot of all unsent messages. 
+ // Do it after populating log_entries and committed arrays + // to not lose messages in case arrays population throws + std::swap(output.messages, _messages); + + // Advance the observed state. + _observed.advance(*this); + + // Be careful to do that only after any use of stable_idx() in this + // function and after any code that may throw + if (output.log_entries.size()) { + // We advance stable index before the entries are + // actually persisted, because if writing to stable storage + // will fail the FSM will be stopped and get_output() will + // never be called again, so any new state that assumes that + // the entries are stable will not be observed. + advance_stable_idx(output.log_entries.back()->idx); + } + + return output; +} + +void fsm::advance_stable_idx(index_t idx) { + _log.stable_to(idx); + if (is_leader()) { + auto& progress = _tracker->find(_my_id); + progress.match_idx = idx; + progress.next_idx = index_t{idx + 1}; + replicate(); + check_committed(); + } +} + +void fsm::check_committed() { + + index_t new_commit_idx = _tracker->committed(_commit_idx); + + if (new_commit_idx <= _commit_idx) { + return; + } + + if (_log[new_commit_idx]->term != _current_term) { + + // 3.6.2 Committing entries from previous terms + // Raft never commits log entries from previous terms by + // counting replicas. Only log entries from the leader’s + // current term are committed by counting replicas; once + // an entry from the current term has been committed in + // this way, then all prior entries are committed + // indirectly because of the Log Matching Property. + logger.trace("check_committed[{}]: cannot commit because of term {} != {}", + _my_id, _log[new_commit_idx]->term, _current_term); + return; + } + logger.trace("check_committed[{}]: commit {}", _my_id, new_commit_idx); + _commit_idx = new_commit_idx; + // We have a quorum of servers with match_idx greater than the + // current commit index. Commit && apply more entries. 
+ _sm_events.signal(); +} + +void fsm::tick_leader() { + if (_clock.now() - _last_election_time >= ELECTION_TIMEOUT) { + // 6.2 Routing requests to the leader + // A leader in Raft steps down if an election timeout + // elapses without a successful round of heartbeats to a majority + // of its cluster; this allows clients to retry their requests + // with another server. + return become_follower(server_id{}); + } + + size_t active = 1; // +1 for self + for (auto& [id, progress] : *_tracker) { + if (progress.id != _my_id) { + if (_failure_detector.is_alive(progress.id)) { + active++; + } + if (progress.state == follower_progress::state::PIPELINE && + progress.in_flight == follower_progress::max_in_flight) { + + progress.in_flight--; // allow one more packet to be sent + } + if (progress.match_idx < _log.stable_idx() || progress.commit_idx < _commit_idx) { + logger.trace("tick[{}]: replicate to {} because match={} < stable={} || " + "follower commit_idx={} < commit_idx={}", + _my_id, progress.id, progress.match_idx, _log.stable_idx(), + progress.commit_idx, _commit_idx); + + replicate_to(progress, true); + } + } + } + if (active >= _tracker->size()/2 + 1) { + // Advance last election time if we heard from + // the quorum during this tick. + _last_election_time = _clock.now(); + } +} + +void fsm::tick() { + _clock.advance(); + + if (is_leader()) { + tick_leader(); + } else if (_current_leader && _failure_detector.is_alive(_current_leader)) { + // Ensure the follower doesn't disrupt a valid leader + // simply because there were no AppendEntries RPCs recently. 
+ _last_election_time = _clock.now(); + } else if (is_past_election_timeout()) { + logger.trace("tick[{}]: becoming a candidate, last election: {}, now: {}", _my_id, + _last_election_time, _clock.now()); + become_candidate(); + } +} + +void fsm::append_entries(server_id from, append_request_recv&& request) { + logger.trace("append_entries[{}] received ct={}, prev idx={} prev term={} commit idx={}, idx={}", + _my_id, request.current_term, request.prev_log_idx, request.prev_log_term, + request.leader_commit_idx, request.entries.size() ? request.entries[0].idx : index_t(0)); + + assert(is_follower()); + // 3.4. Leader election + // A server remains in follower state as long as it receives + // valid RPCs from a leader. + _last_election_time = _clock.now(); + + // Ensure log matching property, even if we append no entries. + // 3.5 + // Until the leader has discovered where it and the + // follower’s logs match, the leader can send + // AppendEntries with no entries (like heartbeats) to save + // bandwidth. + auto [match, term] = _log.match_term(request.prev_log_idx, request.prev_log_term); + if (!match) { + logger.trace("append_entries[{}]: no matching term at position {}: expected {}, found {}", + _my_id, request.prev_log_idx, request.prev_log_term, term); + // Reply false if log doesn't contain an entry at + // prevLogIndex whose term matches prevLogTerm (§5.3). + send_to(from, append_reply{_current_term, _commit_idx, append_reply::rejected{request.prev_log_idx, _log.last_idx()}}); + return; + } + + // If there are no entries it means that the leader wants + // to ensure forward progress. Reply with the last index + // that matches. 
+ index_t last_new_idx = request.prev_log_idx; + + if (!request.entries.empty()) { + last_new_idx = _log.maybe_append(std::move(request.entries)); + } + + advance_commit_idx(request.leader_commit_idx); + + send_to(from, append_reply{_current_term, _commit_idx, append_reply::accepted{last_new_idx}}); +} + +void fsm::append_entries_reply(server_id from, append_reply&& reply) { + assert(is_leader()); + + follower_progress& progress = _tracker->find(from); + + if (progress.state == follower_progress::state::PIPELINE) { + if (progress.in_flight) { + // in_flight is not precise, so do not let it underflow + progress.in_flight--; + } + } + + progress.commit_idx = reply.commit_idx; + + if (std::holds_alternative(reply.result)) { + // accepted + index_t last_idx = std::get(reply.result).last_new_idx; + + logger.trace("append_entries_reply[{}->{}]: accepted match={} last index={}", + _my_id, from, progress.match_idx, last_idx); + + progress.match_idx = std::max(progress.match_idx, last_idx); + // our next_idx may be large because of optimistic increase in pipeline mode + progress.next_idx = std::max(progress.next_idx, index_t(last_idx + 1)); + + progress.become_pipeline(); + + // check if any new entry can be committed + check_committed(); + } else { + // rejected + append_reply::rejected rejected = std::get(reply.result); + + logger.trace("append_entries_reply[{}->{}]: rejected match={} index={}", + _my_id, from, progress.match_idx, rejected.non_matching_idx); + + // check reply validity + if (progress.is_stray_reject(rejected)) { + logger.trace("append_entries_reply[{}->{}]: drop stray append reject", _my_id, from); + return; + } + + // Start re-sending from the non matching index, or from + // the last index in the follower's log. + // FIXME: make it more efficient + progress.next_idx = std::min(rejected.non_matching_idx, index_t(rejected.last_idx + 1)); + + progress.become_probe(); + + // We should not fail to apply an entry following the matched one. 
+ assert(progress.next_idx != progress.match_idx); + } + + logger.trace("append_entries_reply[{}->{}]: next_idx={}, match_idx={}", + _my_id, from, progress.next_idx, progress.match_idx); + + replicate_to(progress, false); +} + +void fsm::request_vote(server_id from, vote_request&& request) { + + // We can cast a vote in any state. If the candidate's term is + // lower than ours, we ignore the request. Otherwise we first + // update our current term and convert to a follower. + assert(_current_term == request.current_term); + + bool can_vote = + // We can vote if this is a repeat of a vote we've already cast... + _voted_for == from || + // ...we haven't voted and we don't think there's a leader yet in this term... + (_voted_for == server_id{} && _current_leader == server_id{}); + + // ...and we believe the candidate is up to date. + if (can_vote && _log.is_up_to_date(request.last_log_idx, request.last_log_term)) { + + logger.trace("{} [term: {}, index: {}, log_term: {}, voted_for: {}] " + "voted for {} [log_term: {}, log_index: {}]", + _my_id, _current_term, _log.last_idx(), _log.last_term(), _voted_for, + from, request.last_log_term, request.last_log_idx); + + _voted_for = from; + + send_to(from, vote_reply{_current_term, true}); + } else { + logger.trace("{} [term: {}, index: {}, log_term: {}, voted_for: {}] " + "rejected vote for {} [log_term: {}, log_index: {}]", + _my_id, _current_term, _log.last_idx(), _log.last_term(), _voted_for, + from, request.last_log_term, request.last_log_idx); + + send_to(from, vote_reply{_current_term, false}); + } +} + +void fsm::request_vote_reply(server_id from, vote_reply&& reply) { + assert(is_candidate()); + + logger.trace("{} received a {} vote from {}", _my_id, reply.vote_granted ? 
"yes" : "no", from); + + _votes->register_vote(from, reply.vote_granted); + + switch (_votes->tally_votes()) { + case vote_result::UNKNOWN: + break; + case vote_result::WON: + become_leader(); + break; + case vote_result::LOST: + become_follower(server_id{}); + break; + } +} + +void fsm::replicate_to(follower_progress& progress, bool allow_empty) { + + logger.trace("replicate_to[{}->{}]: called next={} match={}", + _my_id, progress.id, progress.next_idx, progress.match_idx); + + while (progress.can_send_to(_clock.now())) { + index_t next_idx = progress.next_idx; + if (progress.next_idx > _log.stable_idx()) { + next_idx = index_t(0); + logger.trace("replicate_to[{}->{}]: next past stable next={} stable={}, empty={}", + _my_id, progress.id, progress.next_idx, _log.stable_idx(), allow_empty); + if (!allow_empty) { + // Send out only persisted entries. + return; + } + } + + allow_empty = false; // allow only one empty message + + index_t prev_idx = index_t(0); + term_t prev_term = _current_term; + if (progress.next_idx != 1) { + auto& s = _log.get_snapshot(); + prev_idx = index_t(progress.next_idx - 1); + assert (prev_idx >= s.idx); + prev_term = s.idx == prev_idx ? s.term : _log[prev_idx]->term; + } + + append_request_send req = {{ + .current_term = _current_term, + .leader_id = _my_id, + .prev_log_idx = prev_idx, + .prev_log_term = prev_term, + .leader_commit_idx = _commit_idx + }, + std::vector() + }; + + if (next_idx) { + const log_entry& entry = *_log[next_idx]; + // TODO: send only one entry for now, but we should batch in the future + req.entries.push_back(std::cref(entry)); + logger.trace("replicate_to[{}->{}]: send entry idx={}, term={}", + _my_id, progress.id, entry.idx, entry.term); + + if (progress.state == follower_progress::state::PIPELINE) { + progress.in_flight++; + // Optimistically update next send index. In case + // a message is lost there will be negative reply that + // will re-send idx. 
+ progress.next_idx++; + } + } else { + logger.trace("replicate_to[{}->{}]: send empty", _my_id, progress.id); + } + + send_to(progress.id, std::move(req)); + + progress.last_append_time = _clock.now(); + } +} + +void fsm::replicate() { + assert(is_leader()); + for (auto& [id, progress] : *_tracker) { + if (progress.id != _my_id) { + replicate_to(progress, false); + } + } +} + +bool fsm::can_read() { + check_is_leader(); + + if (_log[_log.last_idx()]->term != _current_term) { + return false; + } + + // TODO: for now always return false to let the caller know that + // applying dummy entry is needed before reading (to confirm the leadership), + // but in the future we may return true here if we can guaranty leadership + // by means of a "stable leader" optimization. "Stable leader" ensures that + // a follower does not vote for other leader if it recently (during a couple + // of last ticks) heard from existing one, so if the leader is already committed + // entries during this tick it guaranties that it communicated with + // majority of nodes and no other leader could have been elected. + + return false; +} + +void fsm::stop() { + _sm_events.broken(); +} + +} // end of namespace raft diff --git a/raft/fsm.hh b/raft/fsm.hh new file mode 100644 index 0000000000..fa1f1f0b66 --- /dev/null +++ b/raft/fsm.hh @@ -0,0 +1,385 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#pragma once + +#include +#include "raft.hh" +#include "progress.hh" +#include "log.hh" + +namespace raft { + +// State of the FSM that needs logging & sending. +struct fsm_output { + term_t term; + server_id vote; + std::vector log_entries; + std::vector> messages; + // Entries to apply. + std::vector committed; +}; + +// 3.4 Leader election +// If a follower receives no communication over a period of +// time called the election timeout, then it assumes there is +// no viable leader and begins an election to choose a new +// leader. +static constexpr logical_clock::duration ELECTION_TIMEOUT = logical_clock::duration{10}; + +// 3.3 Raft Basics +// At any given time each server is in one of three states: +// leader, follower, or candidate. +// In normal operation there is exactly one leader and all of the +// other servers are followers. Followers are passive: they issue +// no requests on their own but simply respond to requests from +// leaders and candidates. The leader handles all client requests +// (if a client contacts a follower, the follower redirects it to +// the leader). The third state, candidate, is used to elect a new +// leader. +class follower {}; +class candidate {}; +class leader {}; + +// Raft protocol finite state machine +// +// Most libraries separate themselves from implementations by +// providing an API to the environment of the Raft protocol, such +// as the database, the write ahead log and the RPC to peers. + +// This callback based design has some drawbacks: + +// - some callbacks may be defined in blocking model; e.g. 
+// writing log entries to disk, or persisting the current +// term in the database; Seastar has no blocking IO and +// would have to emulate it with fibers; +// - the API calls are spread over the state machine +// implementation, which makes reasoning about the correctness +// more difficult (what happens if the library is is accessed +// concurrently by multiple users, which of these accesses have +// to be synchronized; what if the callback fails, is the state +// machine handling the error correctly?) +// - while using callbacks allow testing without a real network or disk, +// it still complicates it, since one has to implement meaningful +// mocks for most of the APIs. +// +// Seastar Raft instead implements an instance of Raft as +// in-memory state machine with a catch-all API step(message) +// method. The method handles any kind of input and performs the +// needed state machine state transitions. To get state machine output +// poll_output() function has to be called. This call produces an output +// object, which encapsulates a list of actions that must be +// performed until the next poll_output() call can be made. The time is +// represented with a logical timer. The client is responsible for +// periodically invoking tick() method, which advances the state +// machine time and allows it to track such events as election or +// heartbeat timeouts. +class fsm { + // id of this node + server_id _my_id; + // id of the current leader + server_id _current_leader; + // What state the server is in. The default is follower. + std::variant _state; + // _current_term, _voted_for && _log are persisted in storage + // The latest term the server has seen. + term_t _current_term; + // Candidate id that received a vote in the current term (or + // nil if none). + server_id _voted_for; + // Index of the highest log entry known to be committed. + // Currently not persisted. 
+ index_t _commit_idx = index_t(0); + // Log entries; each entry contains a command for state machine, + // and the term when the entry was received by the leader. + log _log; + // A possibly shared server failure detector. + failure_detector& _failure_detector; + + // Stores the last state observed by get_output(). + // Is updated with the actual state of the FSM after + // fsm_output is created. + struct last_observed_state { + term_t _current_term; + server_id _voted_for; + index_t _commit_idx; + + bool is_equal(const fsm& fsm) const { + return _current_term == fsm._current_term && _voted_for == fsm._voted_for && + _commit_idx == fsm._commit_idx; + } + + void advance(const fsm& fsm) { + _current_term = fsm._current_term; + _voted_for = fsm._voted_for; + _commit_idx = fsm._commit_idx; + } + } _observed; + + logical_clock _clock; + // Start of the current election epoch - a time point relative + // to which we expire election timeout. + logical_clock::time_point _last_election_time = logical_clock::min(); + // A random value in range [election_timeout, 2 * election_timeout), + // reset on each term change. + logical_clock::duration _randomized_election_timeout = ELECTION_TIMEOUT; + // Votes received during an election round. Available only in + // candidate state. + std::optional _votes; + + // A state for each follower, maintained only on the leader. + std::optional _tracker; + // Holds all replies to AppendEntries RPC which are not + // yet sent out. If AppendEntries request is accepted, we must + // withhold a reply until the respective entry is persisted in + // the log. Otherwise, e.g. when we receive AppendEntries with + // an older term, we may reject it immediately. + // Either way all replies are appended to this queue first. + // + // 3.3 Raft Basics + // If a server receives a request with a stale term number, it + // rejects the request. 
+ // TLA+ line 328 + std::vector> _messages; + + // Currently used configuration, may be different from + // the committed during a configuration change. + configuration _current_config; + + // Signaled when there is a IO event to process. + seastar::condition_variable _sm_events; + // Called when one of the replicas advances its match index + // so it may be the case that some entries are committed now. + // Signals _sm_events. + void check_committed(); + // Check if the randomized election timeout has expired. + bool is_past_election_timeout() const { + return _clock.now() - _last_election_time >= _randomized_election_timeout; + } + // How much time has passed since last election or last + // time we heard from a valid leader. + logical_clock::duration election_elapsed() const { + return _clock.now() - _last_election_time; + } + + // A helper to send any kind of RPC message. + template + void send_to(server_id to, Message&& m) { + static_assert(std::is_rvalue_reference::value, "must be rvalue"); + _messages.push_back(std::make_pair(to, std::move(m))); + _sm_events.signal(); + } + + // A helper to update the FSM's current term. + void update_current_term(term_t current_term); + + void check_is_leader() const { + if (!is_leader()) { + throw not_a_leader(_current_leader); + } + } + + void become_candidate(); + + void become_follower(server_id leader); + + // Controls whether the follower has been responsive recently, + // so it makes sense to send more data to it. + bool can_send_to(const follower_progress& progress); + // Replicate entries to a follower. If there are no entries to send + // and allow_empty is true, send a heartbeat. 
+ void replicate_to(follower_progress& progress, bool allow_empty); + void replicate(); + void append_entries(server_id from, append_request_recv&& append_request); + void append_entries_reply(server_id from, append_reply&& reply); + + void request_vote(server_id from, vote_request&& vote_request); + void request_vote_reply(server_id from, vote_reply&& vote_reply); + + // Called on a follower with a new known leader commit index. + // Advances the follower's commit index up to all log-stable + // entries, known to be committed. + void advance_commit_idx(index_t leader_commit_idx); + // Called after log entries in FSM output are considered persisted. + // Produces new FSM output. + void advance_stable_idx(index_t idx); + // Tick implementation on a leader + void tick_leader(); + + // Set cluster configuration + void set_configuration(const configuration& config) { + _current_config = config; + // We unconditionally access _current_config + // to identify which entries are committed. + assert(_current_config.servers.size() > 0); + if (is_leader()) { + _tracker->set_configuration(_current_config.servers, _log.next_idx()); + } else if (is_candidate()) { + _votes->set_configuration(_current_config.servers); + } + } +public: + explicit fsm(server_id id, term_t current_term, server_id voted_for, log log, + failure_detector& failure_detector); + + bool is_leader() const { + return std::holds_alternative(_state); + } + bool is_follower() const { + return std::holds_alternative(_state); + } + bool is_candidate() const { + return std::holds_alternative(_state); + } + + void become_leader(); + + // Add an entry to in-memory log. The entry has to be + // committed to the persistent Raft log afterwards. + template const log_entry& add_entry(T command); + + // Wait until there is, and return state machine output that + // needs to be handled. + // This includes a list of the entries that need + // to be logged. 
The logged entries are eventually + // discarded from the state machine after snapshotting. + future poll_output(); + + // Get state machine output, if there is any. Doesn't + // wait. It is public for use in testing. + // May throw on allocation failure, but leaves state machine + // in the same state in that case + fsm_output get_output(); + + // Called to advance virtual clock of the protocol state machine. + void tick(); + + // Feed one Raft RPC message into the state machine. + // Advances the state machine state and generates output, + // accessible via poll_output(). + template + void step(server_id from, Message&& msg); + + void stop(); + + // @sa can_read() + term_t get_current_term() const { + return _current_term; + } + + // Should be called on leader only, throws otherwise. + // Returns true if the current leader has at least one entry + // committed and a quorum of followers was alive in the last + // tick period. + bool can_read(); + + friend std::ostream& operator<<(std::ostream& os, const fsm& f); +}; + +template +void fsm::step(server_id from, Message&& msg) { + static_assert(std::is_rvalue_reference::value, "must be rvalue"); + // 4.1. Safety + // Servers process incoming RPC requests without consulting + // their current configurations. + + // 3.3. Raft basics. + // + // Current terms are exchanged whenever servers + // communicate; if one server’s current term is smaller + // than the other’s, then it updates its current term to + // the larger value. If a candidate or leader discovers + // that its term is out of date, it immediately reverts to + // follower state. If a server receives a request with + // a stale term number, it rejects the request. 
+ if (msg.current_term > _current_term) { + logger.trace("{} [term: {}] received a message with higher term from {} [term: {}]", + _my_id, _current_term, from, msg.current_term); + + if constexpr (std::is_same_v) { + become_follower(from); + } else { + if constexpr (std::is_same_v) { + if (_current_leader != server_id{} && election_elapsed() < ELECTION_TIMEOUT) { + // 4.2.3 Disruptive servers + // If a server receives a RequestVote request + // within the minimum election timeout of + // hearing from a current leader, it does not + // update its term or grant its vote. + logger.trace("{} [term: {}] not granting a vote within a minimum election timeout, elapsed {}", + _my_id, _current_term, election_elapsed()); + return; + } + } + become_follower(server_id{}); + } + update_current_term(msg.current_term); + + } else if (msg.current_term < _current_term) { + if constexpr (std::is_same_v) { + // Instructs the leader to step down. + append_reply reply{_current_term, _commit_idx, append_reply::rejected{msg.prev_log_idx, _log.last_idx()}}; + send_to(from, std::move(reply)); + } else { + // Ignore other cases + logger.trace("{} [term: {}] ignored a message with lower term from {} [term: {}]", + _my_id, _current_term, from, msg.current_term); + } + return; + } + + auto visitor = [this, from, msg = std::move(msg)](auto state) mutable { + using State = decltype(state); + + if constexpr (std::is_same_v) { + // Got AppendEntries RPC from self + assert((!std::is_same_v)); + // 3.4 Leader Election + // While waiting for votes, a candidate may receive an AppendEntries + // RPC from another server claiming to be leader. If the + // leader’s term (included in its RPC) is at least as large as the + // candidate’s current term, then the candidate recognizes the + // leader as legitimate and returns to follower state. 
+ if constexpr (std::is_same_v) { + become_follower(from); + } + append_entries(from, std::move(msg)); + } else if constexpr (std::is_same_v) { + if constexpr (!std::is_same_v) { + // Ignore stray reply if we're not a leader. + return; + } + append_entries_reply(from, std::move(msg)); + } else if constexpr (std::is_same_v) { + request_vote(from, std::move(msg)); + } else if constexpr (std::is_same_v) { + if constexpr (!std::is_same_v) { + // Ignore stray reply if we're not a candidate. + return; + } + request_vote_reply(from, std::move(msg)); + } + }; + + std::visit(visitor, _state); +} + +} // namespace raft + diff --git a/raft/internal.hh b/raft/internal.hh new file mode 100644 index 0000000000..8e908be5c4 --- /dev/null +++ b/raft/internal.hh @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . 
+ */ +#pragma once + +#include +#include +#include "utils/UUID.hh" + +namespace raft { +namespace internal { + +template +class tagged_uint64 { + uint64_t _val; +public: + tagged_uint64() : _val(0) {} + explicit tagged_uint64(uint64_t v) : _val(v) {} + tagged_uint64(const tagged_uint64&) = default; + tagged_uint64(tagged_uint64&&) = default; + tagged_uint64& operator=(const tagged_uint64&) = default; + auto operator<=>(const tagged_uint64&) const = default; + explicit operator bool() const { return _val != 0; } + + operator uint64_t() const { + return _val; + } + tagged_uint64& operator++() { // pre increment + ++_val; + return *this; + } + tagged_uint64 operator++(int) { // post increment + uint64_t v = _val++; + return tagged_uint64(v); + } + tagged_uint64& operator--() { // pre decrement + --_val; + return *this; + } + tagged_uint64 operator--(int) { // post decrement + uint64_t v = _val--; + return tagged_uint64(v); + } + tagged_uint64 operator+(const tagged_uint64& o) const { + return tagged_uint64(_val + o._val); + } + tagged_uint64 operator-(const tagged_uint64& o) const { + return tagged_uint64(_val - o._val); + } + friend std::ostream& operator<<(std::ostream& os, const tagged_uint64& u) { + os << u._val; + return os; + } +}; + +template +struct tagged_id { + utils::UUID id; + bool operator==(const tagged_id& o) const { + return id == o.id; + } + explicit operator bool() const { + // The default constructor sets the id to nil, which is + // guaranteed to not match any valid id. 
+ return id != utils::UUID(); + } +}; + +template +std::ostream& operator<<(std::ostream& os, const tagged_id& id) { + os << id.id; + return os; +} + +} // end of namespace internal +} // end of namespace raft + +namespace std { + +template +struct hash> { + size_t operator()(const raft::internal::tagged_id& id) const { + return hash()(id.id); + } +}; + +} // end of namespace std + diff --git a/raft/log.cc b/raft/log.cc new file mode 100644 index 0000000000..cdd5767beb --- /dev/null +++ b/raft/log.cc @@ -0,0 +1,171 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#include "log.hh" + +namespace raft { + +log_entry_ptr& log::get_entry(index_t i) { + return _log[i - start_idx()]; +} + +log_entry_ptr& log::operator[](size_t i) { + assert(index_t(i) >= start_idx()); + return get_entry(index_t(i)); +} + +void log::emplace_back(log_entry&& e) { + _log.emplace_back(seastar::make_lw_shared(std::move(e))); +} + +bool log::empty() const { + return _log.empty(); +} + +bool log::is_up_to_date(index_t idx, term_t term) const { + // 3.6.1 Election restriction + // Raft determines which of two logs is more up-to-date by comparing the + // index and term of the last entries in the logs. If the logs have last + // entries with different terms, then the log with the later term is more + // up-to-date. 
If the logs end with the same term, then whichever log is + // longer is more up-to-date. + return term > last_term() || (term == last_term() && idx >= last_idx()); +} + +index_t log::last_idx() const { + return index_t(_log.size()) + start_idx() - index_t(1); +} + +index_t log::next_idx() const { + return last_idx() + index_t(1); +} + +void log::truncate_head(index_t idx) { + assert(idx >= start_idx()); + auto it = _log.begin() + (idx - start_idx()); + _log.erase(it, _log.end()); + stable_to(std::min(_stable_idx, last_idx())); +} + +void log::truncate_tail(index_t idx) { + assert(start_idx() <= idx); + + if (idx >= last_idx()) { + _log.clear(); + } else if (idx > start_idx()) { + _log.erase(_log.begin(), _log.begin() + idx - start_idx() + 1); + } + + _stable_idx = std::max(idx, _stable_idx); +} + +index_t log::start_idx() const { + return _snapshot.idx + index_t(1); +} + +term_t log::last_term() const { + if (_log.empty()) { + return term_t(0); + } + return _log.back()->term; +} + +void log::stable_to(index_t idx) { + assert(idx <= last_idx()); + _stable_idx = idx; +} + +std::pair log::match_term(index_t idx, term_t term) const { + if (idx == 0) { + // Special case of empty log on leader, + // TLA+ line 324. + return std::make_pair(true, term_t(0)); + } + + // idx cannot point into the snapshot + assert(idx >= _snapshot.idx); + + term_t my_term; + + if (idx == _snapshot.idx) { + my_term = _snapshot.term; + } else { + auto i = idx - start_idx(); + + if (i >= _log.size()) { + // We have a gap between the follower and the leader. + return std::make_pair(false, term_t(0)); + } + + my_term = _log[i]->term; + } + + return my_term == term ? std::make_pair(true, term_t(0)) : std::make_pair(false, my_term); +} + +index_t log::maybe_append(std::vector&& entries) { + assert(!entries.empty()); + + index_t last_new_idx = entries.back().idx; + + // We must scan through all entries if the log already + // contains them to ensure the terms match. 
+ for (auto& e : entries) { + if (e.idx <= last_idx()) { + if (e.idx < start_idx()) { + logger.trace("append_entries: skipping entry with idx {} less than log start {}", + e.idx, start_idx()); + continue; + } + if (e.term == get_entry(e.idx)->term) { + logger.trace("append_entries: entries with index {} has matching terms {}", e.idx, e.term); + continue; + } + logger.trace("append_entries: entries with index {} has non matching terms e.term={}, _log[i].term = {}", + e.idx, e.term, get_entry(e.idx)->term); + // If an existing entry conflicts with a new one (same + // index but different terms), delete the existing + // entry and all that follow it (§5.3). + truncate_head(e.idx); + } + // Assert log monotonicity + assert(e.idx == next_idx()); + _log.emplace_back(seastar::make_lw_shared(std::move(e))); + } + + return last_new_idx; +} + +void log::apply_snapshot(snapshot&& snp) { + // call truncate first since it uses old snapshot + truncate_tail(snp.idx); + _snapshot = std::move(snp); +} + +std::ostream& operator<<(std::ostream& os, const log& l) { + os << "next idx: " << l.next_idx() << ", "; + os << "last idx: " << l.last_idx() << ", "; + os << "stable idx: " << l.stable_idx() << ", "; + os << "start idx: " << l.start_idx() << ", "; + os << "last term: " << l.last_term(); + return os; +} + +} // end of namespace raft diff --git a/raft/log.hh b/raft/log.hh new file mode 100644 index 0000000000..1aad879ce5 --- /dev/null +++ b/raft/log.hh @@ -0,0 +1,113 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#pragma once + +#include "raft.hh" + +namespace raft { +// This class represents the Raft log in memory. +// +// The value of the first index is 1. +// New entries are added at the back. +// +// Entries are persisted locally after they are added. Entries may be +// dropped from the beginning by snapshotting and from the end by +// a new leader replacing stale entries. Any exception thrown by +// any function leaves the log in a consistent state. +class log { + // Snapshot of the prefix of the log. + snapshot _snapshot; + // We need something that can be truncated from both sides. + // std::deque move constructor is not nothrow hence cannot be used + log_entries _log; + // Index of the last stable (persisted) entry in the log. + index_t _stable_idx = index_t(0); + +private: + void truncate_head(index_t i); + void truncate_tail(index_t i); + log_entry_ptr& get_entry(index_t); + index_t start_idx() const; +public: + log() = default ; + explicit log(log_entries log) : _log(std::move(log)) { stable_to(index_t(_log.size())); }; + log(snapshot snp, log_entries log) : _snapshot(std::move(snp)), _log(std::move(log)) { stable_to(last_idx()); } + explicit log(snapshot snp) : _snapshot(std::move(snp)) {} + // The index here is the global raft log index, not related to a snapshot. + // It is a programming error to call the function with an index that points into the snapshot, + // the function will abort() + log_entry_ptr& operator[](size_t i); + // Add an entry to the log. + void emplace_back(log_entry&& e); + // Mark all entries up to this index as stable. 
+ void stable_to(index_t idx); + // Return true if in memory log is empty. + bool empty() const; + // 3.6.1 Election restriction. + // The voter denies its vote if its own log is more up-to-date + // than that of the candidate. + bool is_up_to_date(index_t idx, term_t term) const; + index_t next_idx() const; + index_t last_idx() const; + index_t stable_idx() const { + return _stable_idx; + } + term_t last_term() const; + + // The function returns current snapshot state of the log + const snapshot& get_snapshot() const { + return _snapshot; + } + + void apply_snapshot(snapshot&& snp); + + // 3.5 + // Raft maintains the following properties, which + // together constitute the Log Matching Property: + // * If two entries in different logs have the same index and + // term, then they store the same command. + // * If two entries in different logs have the same index and + // term, then the logs are identical in all preceding entries. + // + // The first property follows from the fact that a leader + // creates at most one entry with a given log index in a given + // term, and log entries never change their position in the + // log. The second property is guaranteed by a consistency + // check performed by AppendEntries. When sending an + // AppendEntries RPC, the leader includes the index and term + // of the entry in its log that immediately precedes the new + // entries. If the follower does not find an entry in its log + // with the same index and term, then it refuses the new + // entries. + // + // @retval first is true - there is a match, term value is irrelevant + // @retval first is false - the follower's log doesn't match the leader's + // and non matching term is in second + std::pair match_term(index_t idx, term_t term) const; + + // Called on a follower to append entries from a leader. 
+ // @retval return an index of last appended entry + index_t maybe_append(std::vector&& entries); + + friend std::ostream& operator<<(std::ostream& os, const log& l); +}; + +} diff --git a/raft/logical_clock.hh b/raft/logical_clock.hh new file mode 100644 index 0000000000..2a6fc79a46 --- /dev/null +++ b/raft/logical_clock.hh @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#pragma once + +#include + +namespace raft { + +// Raft protocol state machine clock ticks at different speeds +// depending on the environment. A typical clock tick for +// a production system is 100ms, while a test system can +// tick it at the speed of the hardware clock. +// +// Every state machine has an own instance of logical clock, +// this enables tests when different state machines run at +// different clock speeds. +class logical_clock final { +public: + using rep = int64_t; + // There is no realistic period for a logical clock, + // just use the smallest period possible. 
+ using period = std::chrono::nanoseconds::period; + using duration = std::chrono::duration; + using time_point = std::chrono::time_point; + + static constexpr bool is_steady = true; + + void advance(duration diff = duration{1}) { + _now += diff; + } + time_point now() const noexcept { + return _now; + } + + static constexpr time_point min() { + return time_point(duration{0}); + } +private: + time_point _now = min(); +}; + +inline std::ostream& operator<<(std::ostream& os, const logical_clock::time_point& p) { + return os << (p - logical_clock::min()).count(); +} + +} // end of namespace raft + +namespace std { + +inline std::ostream& operator<<(std::ostream& os, const raft::logical_clock::duration& d) { + return os << d.count(); +} + +} // end of namespace std diff --git a/raft/progress.cc b/raft/progress.cc new file mode 100644 index 0000000000..3ee4fd7801 --- /dev/null +++ b/raft/progress.cc @@ -0,0 +1,130 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . 
+ */ +#include "progress.hh" +#include + +namespace raft { + +bool follower_progress::is_stray_reject(const append_reply::rejected& rejected) { + switch (state) { + case follower_progress::state::PIPELINE: + if (rejected.non_matching_idx <= match_idx) { + // If the rejected index is smaller than the match index it means this is a stray reply + return true; + } + break; + case follower_progress::state::PROBE: + // In the probe state the reply is only valid if it matches next_idx - 1, since only + // one append request is outstanding. + if (rejected.non_matching_idx != index_t(next_idx - 1)) { + return true; + } + break; + case follower_progress::state::SNAPSHOT: + // any reject during snapshot transfer is a stray one + return true; + default: + assert(false); + } + return false; +} + +void follower_progress::become_probe() { + state = state::PROBE; +} + +void follower_progress::become_pipeline() { + if (state != state::PIPELINE) { + // If a previous request was accepted, move to "pipeline" state + // since we now know the follower's log state. + state = state::PIPELINE; + in_flight = 0; + } +} + +void follower_progress::become_snapshot() { + state = state::SNAPSHOT; +} + +bool follower_progress::can_send_to(logical_clock::time_point now) { + switch (state) { + case state::PROBE: + return now - last_append_time >= logical_clock::duration{1}; + case state::PIPELINE: + // allow `max_in_flight` outstanding indexes + // FIXME: make it smarter + return in_flight < follower_progress::max_in_flight; + case state::SNAPSHOT: + // In this state we are waiting + // for a snapshot to be transferred + // before starting to sync the log. 
+ return false; + } + assert(false); + return false; +} + +void tracker::set_configuration(const std::vector& servers, index_t next_idx) { + for (auto& s : servers) { + if (this->progress::find(s.id) != this->progress::end()) { + continue; + } + this->progress::emplace(s.id, follower_progress{s.id, next_idx}); + } +} + +index_t tracker::committed(index_t prev_commit_idx) { + std::vector match; + size_t count = 0; + + for (const auto& [id, p] : *this) { + logger.trace("committed {}: {} {}", p.id, p.match_idx, prev_commit_idx); + if (p.match_idx > prev_commit_idx) { + count++; + } + match.push_back(p.match_idx); + } + logger.trace("check committed count {} cluster size {}", count, match.size()); + if (count < match.size()/2 + 1) { + return prev_commit_idx; + } + // The index of the pivot node is selected so that all nodes + // with a larger match index plus the pivot form a majority, + // for example: + // cluster size pivot node majority + // 1 0 1 + // 2 0 2 + // 3 1 2 + // 4 1 3 + // 5 2 3 + // + auto pivot = (match.size() - 1) / 2; + std::nth_element(match.begin(), match.begin() + pivot, match.end()); + return match[pivot]; +} + +std::ostream& operator<<(std::ostream& os, const votes& v) { + os << "responded: " << v._responded << ", "; + os << "granted: " << v._granted; + return os; +} + +} // end of namespace raft diff --git a/raft/progress.hh b/raft/progress.hh new file mode 100644 index 0000000000..33bb09141f --- /dev/null +++ b/raft/progress.hh @@ -0,0 +1,147 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#pragma once + +#include +#include "raft.hh" +#include "logical_clock.hh" + +namespace raft { + +// Leader's view of each follower, including self. +class follower_progress { +public: + // Id of this server + server_id id; + // Index of the next log entry to send to this server. + index_t next_idx; + // Index of the highest log entry known to be replicated to this + // server. + index_t match_idx = index_t(0); + // Index that we know to be committed by the follower + index_t commit_idx = index_t(0); + + enum class state { + // In this state only one append entry is sent until a matching index is found + PROBE, + // In this state multiple append entries are sent optimistically + PIPELINE, + // In this state a snapshot is being transferred + SNAPSHOT + }; + state state = state::PROBE; + // number of in flight still un-acked append entries requests + size_t in_flight = 0; + static constexpr size_t max_in_flight = 10; + + // Set when a message is sent to the follower. + // Used to decide if a separate keep alive message is needed + // within this tick. + // In probe mode, used to limit the amount of entries sent to + // the follower. + logical_clock::time_point last_append_time = logical_clock::min(); + + // check if a reject packet should be ignored because it was delayed + // or reordered + bool is_stray_reject(const append_reply::rejected&); + + void become_probe(); + void become_pipeline(); + void become_snapshot(); + + // Return true if a new replication record can be sent to the follower. 
+ bool can_send_to(logical_clock::time_point now); + + follower_progress(server_id id_arg, index_t next_idx_arg) + : id(id_arg), next_idx(next_idx_arg) + {} +}; + +using progress = std::unordered_map; + +class tracker: private progress { + // Copy of this server's id + server_id _my_id; +public: + using progress::begin, progress::end, progress::cbegin, progress::cend, progress::size; + + explicit tracker(server_id my_id) + : _my_id(my_id) + {} + + // Return progress for a follower + follower_progress& find(server_id dst) { + return this->progress::find(dst)->second; + } + void set_configuration(const std::vector& servers, index_t next_idx); + + // Calculate the current commit index based on the current + // simple or joint quorum. + index_t committed(index_t prev_commit_idx); +}; + +// Possible leader election outcomes. +enum class vote_result { + // We haven't got enough responses yet, either because + // the servers haven't voted or responses failed to arrive. + UNKNOWN, + // This candidate has won the election + WON, + // The quorum of servers has voted against this candidate + LOST, +}; + +// Candidate's state specific to election +class votes { + size_t _cluster_size = 1; + // Number of responses to RequestVote RPC. + // The candidate always votes for self. + size_t _responded = 1; + // Number of granted votes. + // The candidate always votes for self. + size_t _granted = 1; +public: + void set_configuration(const std::vector& servers) { + _cluster_size = servers.size(); + } + + void register_vote(server_id from, bool granted) { + _responded++; + if (granted) { + _granted++; + } + } + + vote_result tally_votes() const { + auto quorum = _cluster_size / 2 + 1; + if (_granted >= quorum) { + return vote_result::WON; + } + assert(_responded <= _cluster_size); + auto unknown = _cluster_size - _responded; + return _granted + unknown >= quorum ? 
vote_result::UNKNOWN : vote_result::LOST; + } + + friend std::ostream& operator<<(std::ostream& os, const votes& v); +}; + +} // namespace raft + diff --git a/raft/raft.cc b/raft/raft.cc new file mode 100644 index 0000000000..150d7f217d --- /dev/null +++ b/raft/raft.cc @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#include "raft.hh" + +namespace raft { + +seastar::logger logger("raft"); + +} // end of namespace raft diff --git a/raft/raft.hh b/raft/raft.hh new file mode 100644 index 0000000000..587d0394b2 --- /dev/null +++ b/raft/raft.hh @@ -0,0 +1,414 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . 
+ */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include "bytes_ostream.hh" +#include "utils/UUID.hh" +#include "internal.hh" + +namespace raft { +// Keeps user defined command. A user is responsible to serialize +// a state machine operation into it before passing to raft and +// deserialize in apply() before applying. +using command = bytes_ostream; +using command_cref = std::reference_wrapper; + +extern seastar::logger logger; + +// This is user provided id for a snapshot +using snapshot_id = internal::tagged_id; +// Unique identifier of a server in a Raft group +using server_id = internal::tagged_id; + +// This type represents the raft term +using term_t = internal::tagged_uint64; +// This type represensts the index into the raft log +using index_t = internal::tagged_uint64; + +using clock_type = lowres_clock; + +// Opaque connection properties. May contain ip:port pair for instance. +// This value is disseminated between cluster member +// through regular log replication as part of a configuration +// log entry. Upon receiving it a server passes it down to +// RPC module through add_server() call where it is deserialized +// and used to obtain connection info for the node `id`. After a server +// is added to the RPC module RPC's send functions can be used to communicate +// with it using its `id`. +using server_info = bytes; + +struct server_address { + server_id id; + server_info info; +}; + +struct configuration { + std::vector servers; + + configuration(std::initializer_list ids) { + servers.reserve(ids.size()); + for (auto&& id : ids) { + servers.emplace_back(server_address{std::move(id)}); + } + } + configuration() = default; +}; + +struct log_entry { + // Dummy entry is used when a leader needs to commit an entry + // (after leadership change for instance) but there is nothing + // else to commit. 
+ struct dummy {}; + term_t term; + index_t idx; + std::variant data; +}; + +using log_entry_ptr = seastar::lw_shared_ptr; + +struct error : public std::runtime_error { + using std::runtime_error::runtime_error; +}; + +struct not_a_leader : public error { + server_id leader; + explicit not_a_leader(server_id l) : error("Not a leader"), leader(l) {} +}; + +struct dropped_entry : public error { + dropped_entry() : error("Entry was dropped because of a leader change") {} +}; + +struct stopped_error : public error { + stopped_error() : error("Raft instance is stopped") {} +}; + +struct conf_change_in_progress : public error { + conf_change_in_progress() : error("A configuration change is already in progress") {} +}; + +struct snapshot { + // Index and term of last entry in the snapshot + index_t idx = index_t(0); + term_t term = term_t(0); + // The committed configuration in the snapshot + configuration config; + // Id of the snapshot. + snapshot_id id; +}; + +using log_entry_cref = std::reference_wrapper; + +struct append_request_base { + // The leader's term. + term_t current_term; + // So that follower can redirect clients + // In practice we do not need it since we should know sender's id anyway. + server_id leader_id; + // Index of the log entry immediately preceding new ones + index_t prev_log_idx; + // Term of prev_log_idx entry. + term_t prev_log_term; + // The leader's commit_idx. + index_t leader_commit_idx; +}; + +struct append_request_send : public append_request_base { + // Log entries to store (empty vector for heartbeat; may send more + // than one entry for efficiency). + std::vector entries; +}; +struct append_request_recv : public append_request_base { + // Same as for append_request_send but unlike it here the + // message owns the entries. + std::vector entries; +}; + +struct append_reply { + struct rejected { + // Index of non matching entry that caused the request + // to be rejected. 
+ index_t non_matching_idx; + // Last index in the follower's log, can be used to find next + // matching index more efficiently. + index_t last_idx; + }; + struct accepted { + // Last entry that was appended (may be smaller than max log index + // in case follower's log is longer and appended entries match). + index_t last_new_idx; + }; + // Current term, for leader to update itself. + term_t current_term; + // Contains an index of the last commited entry on the follower + // It is used by a leader to know if a follower is behind and issuing + // empty append entry with updates commit_idx if it is + // Regular RAFT handles this by always sending enoty append requests + // as a hearbeat. + index_t commit_idx; + std::variant result; +}; + +struct vote_request { + // The candidate’s term. + term_t current_term; + // The index of the candidate's last log entry. + index_t last_log_idx; + // The term of the candidate's last log entry. + term_t last_log_term; +}; + +struct vote_reply { + // Current term, for the candidate to update itself. + term_t current_term; + // True means the candidate received a vote. + bool vote_granted; +}; + +struct install_snapshot { + // Current term on a leader + term_t current_term; + // A snapshot to install + snapshot snp; +}; + +struct snapshot_reply { + bool success; +}; + +using rpc_message = std::variant; + +// we need something that can be truncated form both sides. +// std::deque move constructor is not nothrow hence cannot be used +using log_entries = boost::container::deque; + +// rpc, storage and satte_machine classes will have to be implemented by the +// raft user to provide network, persistency and busyness logic support +// repectively. +class rpc; +class storage; + +// Any of the functions may return an error, but it will kill the +// raft instance that uses it. 
Depending on what state the failure +// leaves the state is the raft instance will either have to be recreated +// with the same state machine and rejoined the cluster with the same server_id +// or it new raft instance will have to be created with empty state machine and +// it will have to rejoin to the cluster with different server_id through +// configuration change. +class state_machine { +public: + virtual ~state_machine() {} + + // This is called after entries are committed (replicated to + // at least quorum of servers). If a provided vector contains + // more than one entry all of them will be committed simultaneously. + // Will be eventually called on all replicas, for all commited commands. + // Raft owns the data since it may be still replicating. + // Raft will not call another apply until the retuned future + // will not become ready. + virtual future<> apply(std::vector command) = 0; + + // The function suppose to take a snapshot of a state machine + // To be called during log compaction or when a leader brings + // a lagging follower up-to-date + virtual future take_snaphot() = 0; + + // The function drops a snapshot with a provided id + virtual void drop_snapshot(snapshot_id id) = 0; + + // reload state machine from a snapshot id + // To be used by a restarting server or by a follower that + // catches up to a leader + virtual future<> load_snapshot(snapshot_id id) = 0; + + // stops the state machine instance by aborting the work + // that can be aborted and waiting for all the rest to complete + // any unfinished apply/snapshot operation may return an error after + // this function is called + virtual future<> abort() = 0; +}; + +class rpc_server; + +// It is safe for for rpc implementation to drop any message. +// Error returned by send function will be ignored. All send_() +// functions can be called concurrently, returned future should be +// waited only for back pressure purposes (unless specified otherwise in +// the function's comment). 
Values passed by reference may be freed as soon +// as function returns. +class rpc { +protected: + // Pointer to Raft server. Needed for passing RPC messages. + rpc_server* _client = nullptr; +public: + virtual ~rpc() {} + + // Send a snapshot snap to a server server_id. + // A returned future is resolved when snapshot is sent and + // successfully applied by a receiver. Will be waited to + // know if a snaphot transfer succeeded. + virtual future<> send_snapshot(server_id server_id, const install_snapshot& snap) = 0; + + // Send provided append_request to the supplied server, does + // not wait for reply. The returned future resolves when + // message is sent. It does not mean it was received. + virtual future<> send_append_entries(server_id id, const append_request_send& append_request) = 0; + + // Send a reply to an append_request. The returned future + // resolves when message is sent. It does not mean it was + // received. + virtual future<> send_append_entries_reply(server_id id, const append_reply& reply) = 0; + + // Send a vote request. The returned future + // resolves when message is sent. It does not mean it was + // received. + virtual future<> send_vote_request(server_id id, const vote_request& vote_request) = 0; + + // Sends a reply to a vote request. The returned future + // resolves when message is sent. It does not mean it was + // received. + virtual future<> send_vote_reply(server_id id, const vote_reply& vote_reply) = 0; + + // When a new server is learn this function is called with the + // info about the server. + virtual void add_server(server_id id, server_info info) = 0; + + // When a server is removed from local config this call is + // executed. + virtual void remove_server(server_id id) = 0; + + // Stop the RPC instance by aborting the work that can be + // aborted and waiting for all the rest to complete any + // unfinished send operation may return an error after this + // function is called. 
+ virtual future<> abort() = 0; +private: + friend rpc_server; +}; + +// Each Raft server is a receiver of RPC messages. +// Defines the API specific to receiving RPC input. +class rpc_server { +public: + virtual ~rpc_server() {}; + + // This function is called by append_entries RPC + virtual void append_entries(server_id from, append_request_recv append_request) = 0; + + // This function is called by append_entries_reply RPC + virtual void append_entries_reply(server_id from, append_reply reply) = 0; + + // This function is called to handle RequestVote RPC. + virtual void request_vote(server_id from, vote_request vote_request) = 0; + // Handle response to RequestVote RPC + virtual void request_vote_reply(server_id from, vote_reply vote_reply) = 0; + + // Apply incoming snapshot, future resolves when application is complete + virtual future<> apply_snapshot(server_id from, install_snapshot snp) = 0; + + // Update RPC implementation with this client as + // the receiver of RPC input. + void set_rpc_server(class rpc *rpc) { rpc->_client = this; } +}; + +// This class represents persistent storage state. If any of the +// function returns an error the Raft instance will be aborted. +class storage { +public: + virtual ~storage() {} + + // Persist given term and vote. + // Can be called concurrently with other save-* functions in + // the storage and with itself but an implementation has to + // make sure that the result is returned back in the calling order. + virtual future<> store_term_and_vote(term_t term, server_id vote) = 0; + + // Load persisted term and vote. + // Called during Raft server initialization only, is not run + // in parallel with store. + virtual future> load_term_and_vote() = 0; + + // Persist given snapshot and drop all but 'preserve_log_entries' + // entries from the Raft log starting from the beginning. + // This can overwrite a previously persisted snapshot. + // Is called only after the previous invocation completes. 
+ // In other words, it's the caller's responsibility to serialize + // calls to this function. Can be called in parallel with + // store_log_entries() but snap.index should belong to an already + // persisted entry. + virtual future<> store_snapshot(const snapshot& snap, size_t preserve_log_entries) = 0; + + // Load a saved snapshot. + // This only loads it into memory, but does not apply yet. To + // apply call 'state_machine::load_snapshot(snapshot::id)' + // Called during Raft server initialization only, should not + // run in parallel with store. + virtual future load_snapshot() = 0; + + // Persist given log entries. + // Can be called without waiting for previous call to resolve, + // but internally all writes should be serialized into forming + // one contiguous log that holds entries in order of the + // function invocation. + virtual future<> store_log_entries(const std::vector& entries) = 0; + + // Load saved Raft log. Called during Raft server + // initialization only, should not run in parallel with store. + virtual future load_log() = 0; + + // Truncate all entries with an index greater or equal than + // the given index in the log and persist the truncation. Can be + // called in parallel with store_log_entries() but internally + // should be linearized vs store_log_entries(): + // store_log_entries() called after truncate_log() should wait + // for truncation to complete internally before persisting its + // entries. + virtual future<> truncate_log(index_t idx) = 0; + + // Stop the storage instance by aborting the work that can be + // aborted and waiting for all the rest to complete any + // unfinished store/load operation may return an error after + // this function is called. + virtual future<> abort() = 0; +}; + +// To support many Raft groups per server, Seastar Raft +// extends original Raft with a shared failure detector. +// It is used instead of empty AppendEntries PRCs in idle +// cluster. 
+// This allows multiple Raft groups to share heartbeat traffic. +class failure_detector { +public: + virtual ~failure_detector() {} + // Called by each server on each tick, which defaults to 10 + // per second. Should return true if the server is + // alive. False results may impact liveness. + virtual bool is_alive(server_id server) = 0; +}; + +} // namespace raft + diff --git a/raft/server.cc b/raft/server.cc new file mode 100644 index 0000000000..34858a65b4 --- /dev/null +++ b/raft/server.cc @@ -0,0 +1,381 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . 
+ */ +#include "server.hh" + +#include +#include +#include +#include +#include +#include + +#include "fsm.hh" +#include "log.hh" + +using namespace std::chrono_literals; + +namespace raft { + +class server_impl : public rpc_server, public server { +public: + explicit server_impl(server_id uuid, std::unique_ptr rpc, + std::unique_ptr state_machine, std::unique_ptr storage, + seastar::shared_ptr failure_detector); + + server_impl(server_impl&&) = delete; + + ~server_impl() {} + + // rpc_server interface + void append_entries(server_id from, append_request_recv append_request) override; + void append_entries_reply(server_id from, append_reply reply) override; + void request_vote(server_id from, vote_request vote_request) override; + void request_vote_reply(server_id from, vote_reply vote_reply) override; + + // server interface + future<> add_entry(command command, wait_type type); + future<> apply_snapshot(server_id from, install_snapshot snp) override; + future<> add_server(server_id id, bytes node_info, clock_type::duration timeout) override; + future<> remove_server(server_id id, clock_type::duration timeout) override; + future<> start() override; + future<> abort() override; + term_t get_current_term() const override; + future<> read_barrier() override; + void make_me_leader() override; +private: + std::unique_ptr _rpc; + std::unique_ptr _state_machine; + std::unique_ptr _storage; + seastar::shared_ptr _failure_detector; + // Protocol deterministic finite-state machine + std::unique_ptr _fsm; + // id of this server + server_id _id; + seastar::timer _ticker; + + seastar::pipe> _apply_entries = seastar::pipe>(10); + + struct op_status { + term_t term; // term the entry was added with + promise<> done; // notify when done here + }; + + // Entries that have a waiter that needs to be notified when the + // respective entry is known to be committed. 
+ std::map _awaited_commits; + + // Entries that have a waiter that needs to be notified after + // the respective entry is applied. + std::map _awaited_applies; + + // Called to commit entries (on a leader or otherwise). + void notify_waiters(std::map& waiters, const std::vector& entries); + + // This fiber process fsm output, by doing the following steps in that order: + // - persists current term and voter + // - persists unstable log entries on disk. + // - sends out messages + future<> io_fiber(index_t stable_idx); + + // This fiber runs in the background and applies committed entries. + future<> applier_fiber(); + + template future<> add_entry_internal(T command, wait_type type); + template future<> send_message(server_id id, Message m); + + // Apply a dummy entry. Dummy entry is not propagated to the + // state machine, but waiting for it to be "applied" ensures + // all previous entries are applied as well. + // Resolves when the entry is committed. + // The function has to be called on the leader, throws otherwise + // May fail because of an internal error or because the leader + // has changed and the entry was replaced by another one, + // submitted to the new leader. 
+ future<> apply_dummy_entry(); + + future<> _applier_status = make_ready_future<>(); + future<> _io_status = make_ready_future<>(); + + friend std::ostream& operator<<(std::ostream& os, const server_impl& s); +}; + +server_impl::server_impl(server_id uuid, std::unique_ptr rpc, + std::unique_ptr state_machine, std::unique_ptr storage, + seastar::shared_ptr failure_detector) : + _rpc(std::move(rpc)), _state_machine(std::move(state_machine)), + _storage(std::move(storage)), _failure_detector(failure_detector), + _id(uuid) { + set_rpc_server(_rpc.get()); +} + +future<> server_impl::start() { + auto [term, vote] = co_await _storage->load_term_and_vote(); + auto snapshot = co_await _storage->load_snapshot(); + auto log_entries = co_await _storage->load_log(); + auto log = raft::log(std::move(snapshot), std::move(log_entries)); + index_t stable_idx = log.stable_idx(); + _fsm = std::make_unique(_id, term, vote, std::move(log), *_failure_detector); + assert(_fsm->get_current_term() != term_t(0)); + + // start fiber to persist entries added to in-memory log + _io_status = io_fiber(stable_idx); + // start fiber to apply committed entries + _applier_status = applier_fiber(); + + _ticker.arm_periodic(100ms); + _ticker.set_callback([this] { + _fsm->tick(); + }); + + co_return; +} + +template +future<> server_impl::add_entry_internal(T command, wait_type type) { + logger.trace("An entry is submitted on a leader"); + + // lock access to the raft log while it is been updated + + // @todo: ensure the reference to the entry is stable between + // yields, before removing _log_lock. + const log_entry& e = _fsm->add_entry(std::move(command)); + + auto& container = type == wait_type::committed ? 
_awaited_commits : _awaited_applies; + + // This will track the commit/apply status of the entry + auto [it, inserted] = container.emplace(e.idx, op_status{e.term, promise<>()}); + assert(inserted); + return it->second.done.get_future(); +} + +future<> server_impl::add_entry(command command, wait_type type) { + return add_entry_internal(std::move(command), type); +} + +future<> server_impl::apply_dummy_entry() { + return add_entry_internal(log_entry::dummy(), wait_type::applied); +} +void server_impl::append_entries(server_id from, append_request_recv append_request) { + _fsm->step(from, std::move(append_request)); +} + +void server_impl::append_entries_reply(server_id from, append_reply reply) { + _fsm->step(from, std::move(reply)); +} + +void server_impl::request_vote(server_id from, vote_request vote_request) { + _fsm->step(from, std::move(vote_request)); +} + +void server_impl::request_vote_reply(server_id from, vote_reply vote_reply) { + _fsm->step(from, std::move(vote_reply)); +} + +void server_impl::notify_waiters(std::map& waiters, + const std::vector& entries) { + index_t commit_idx = entries.back()->idx; + index_t first_idx = entries.front()->idx; + + while (waiters.size() != 0) { + auto it = waiters.begin(); + if (it->first > commit_idx) { + break; + } + auto [entry_idx, status] = std::move(*it); + + // if there is a waiter entry with an index smaller than first entry + // it means that notification is out of order which is prohinited + assert(entry_idx >= first_idx); + + waiters.erase(it); + if (status.term == entries[entry_idx - first_idx]->term) { + status.done.set_value(); + } else { + // term does not match which means that between the entry was submitted + // and committed there was a leadership change and the entry was replaced. 
+ status.done.set_exception(dropped_entry()); + } + } +} + +template +future<> server_impl::send_message(server_id id, Message m) { + return std::visit([this, id] (auto&& m) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return _rpc->send_append_entries_reply(id, m); + } else if constexpr (std::is_same_v) { + return _rpc->send_append_entries(id, m); + } else if constexpr (std::is_same_v) { + return _rpc->send_vote_request(id, m); + } else if constexpr (std::is_same_v) { + return _rpc->send_vote_reply(id, m); + } else { + static_assert(!sizeof(Message*), "not all message types are handled"); + return make_ready_future<>(); + } + }, std::move(m)); +} + +future<> server_impl::io_fiber(index_t last_stable) { + logger.trace("[{}] io_fiber start", _id); + try { + while (true) { + auto batch = co_await _fsm->poll_output(); + + if (batch.term != term_t{}) { + // Current term and vote are always persisted + // together. A vote may change independently of + // term, but it's safe to update both in this + // case. + co_await _storage->store_term_and_vote(batch.term, batch.vote); + } + + if (batch.log_entries.size()) { + auto& entries = batch.log_entries; + + if (last_stable >= entries[0]->idx) { + co_await _storage->truncate_log(entries[0]->idx); + } + + // Combine saving and truncating into one call? 
+ // will require storage to keep track of last idx + co_await _storage->store_log_entries(entries); + + last_stable = (*entries.crbegin())->idx; + } + + if (batch.messages.size()) { + // after entries are persisted we can send messages + co_await seastar::parallel_for_each(std::move(batch.messages), [this] (std::pair& message) { + return send_message(message.first, std::move(message.second)); + }); + } + + // process committed entries + if (batch.committed.size()) { + notify_waiters(_awaited_commits, batch.committed); + co_await _apply_entries.writer.write(std::move(batch.committed)); + } + } + } catch (seastar::broken_condition_variable&) { + // log fiber is stopped explicitly. + } catch (...) { + logger.error("[{}] io fiber stopped because of the error: {}", _id, std::current_exception()); + } + co_return; +} + +future<> server_impl::apply_snapshot(server_id from, install_snapshot snp) { + return make_ready_future<>(); +} + +future<> server_impl::applier_fiber() { + logger.trace("applier_fiber start"); + try { + while (true) { + auto opt_batch = co_await _apply_entries.reader.read(); + if (!opt_batch) { + // EOF + break; + } + std::vector commands; + commands.reserve(opt_batch->size()); + + std::ranges::copy( + *opt_batch | + std::views::filter([] (log_entry_ptr& entry) { return std::holds_alternative(entry->data); }) | + std::views::transform([] (log_entry_ptr& entry) { return std::cref(std::get(entry->data)); }), + std::back_inserter(commands)); + + co_await _state_machine->apply(std::move(commands)); + notify_waiters(_awaited_applies, *opt_batch); + } + } catch (...) 
{ + logger.error("applier fiber {} stopped because of the error: {}", _id, std::current_exception()); + } + co_return; +} + +term_t server_impl::get_current_term() const { + return _fsm->get_current_term(); +} + +future<> server_impl::read_barrier() { + if (_fsm->can_read()) { + co_return; + } + + co_await apply_dummy_entry(); + co_return; +} + +future<> server_impl::abort() { + logger.trace("abort() called"); + _fsm->stop(); + { + // there is not explicit close for the pipe! + auto tmp = std::move(_apply_entries.writer); + } + for (auto& ac: _awaited_commits) { + ac.second.done.set_exception(stopped_error()); + } + for (auto& aa: _awaited_applies) { + aa.second.done.set_exception(stopped_error()); + } + _awaited_commits.clear(); + _awaited_applies.clear(); + _ticker.cancel(); + + return seastar::when_all_succeed(std::move(_io_status), std::move(_applier_status), + _rpc->abort(), _state_machine->abort(), _storage->abort()).discard_result(); +} + +future<> server_impl::add_server(server_id id, bytes node_info, clock_type::duration timeout) { + return make_ready_future<>(); +} + +// Removes a server from the cluster. If the server is not a member +// of the cluster does nothing. Can be called on a leader only +// otherwise throws not_a_leader. +// Cannot be called until previous add/remove server completes +// otherwise conf_change_in_progress exception is returned. 
+future<> server_impl::remove_server(server_id id, clock_type::duration timeout) { + return make_ready_future<>(); +} + +void server_impl::make_me_leader() { + _fsm->become_leader(); +} + +std::unique_ptr create_server(server_id uuid, std::unique_ptr rpc, + std::unique_ptr state_machine, std::unique_ptr storage, + seastar::shared_ptr failure_detector) { + return std::make_unique(uuid, std::move(rpc), std::move(state_machine), + std::move(storage), failure_detector); +} + +std::ostream& operator<<(std::ostream& os, const server_impl& s) { + os << "[id: " << s._id << ", fsm (" << s._fsm << ")]\n"; + return os; +} + +} // end of namespace raft diff --git a/raft/server.hh b/raft/server.hh new file mode 100644 index 0000000000..f72f3834d3 --- /dev/null +++ b/raft/server.hh @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#pragma once +#include "raft.hh" + +namespace raft { + +enum class wait_type { + committed, + applied +}; + +// A single uniquely identified participant of a Raft group. 
+class server { +public: + virtual ~server() {} + // Add command to replicated log + // Returned future is resolved depending on wait_type parameter: + // 'committed' - when the entry is committed + // 'applied' - when the entry is applied (happens after it is committed) + // The function has to be called on a leader, throws not_a_leader exception otherwise. + // May fail because of internal error or because leader changed and an entry was replaced + // by another leader. In the later case dropped_entry exception will be returned. + virtual future<> add_entry(command command, wait_type type) = 0; + + // Add new server to a cluster. If a node is already a member + // of the cluster does nothing. Provided node_info is passed to + // rpc::new_node() on each node in a cluster as it learns + // about joining node. Connection info can be passed there. + // Can be called on a leader only, otherwise throws not_a_leader. + // Cannot be called until previous add/remove server completes + // otherwise conf_change_in_progress exception is thrown. + virtual future<> add_server(server_id id, bytes node_info, clock_type::duration timeout) = 0; + + // Remove a server from the cluster. If the server is not a member + // of the cluster does nothing. Can be called on a leader only + // otherwise throws not_a_leader. + // Cannot be called until previous add/remove server completes + // otherwise conf_change_in_progress exception is thrown. + virtual future<> remove_server(server_id id, clock_type::duration timeout) = 0; + + // Load persisted state and start background work that needs + // to run for this Raft server to function; The object cannot + // be used until the returned future is resolved. + virtual future<> start() = 0; + + // Stop this Raft server, all submitted but not completed + // operations will get an error and callers will not be able + // to know if they succeeded or not. If this server was + // a leader it will relinquish its leadership and cease + // replication. 
+ virtual future<> abort() = 0; + + // Return Raft protocol current term. + virtual term_t get_current_term() const = 0; + + // May be called before attempting a read from the local state + // machine. The read should proceed only after the returned + // future has resolved successfully. + // If called not on a leader throws not_a_leader error. + // After calling this function and resolving the returned + // future: + // + // 1) The result of all completed + // add_entries(wait_type::applied) can be observed by + // direct access to the local state machine. + // 2) A subsequent add_entry() is likely to find this + // server still in the leader role. + // 3) If the caller ensures that writes to the state machine + // are linearised and the current term didn't change + // between read_barrier() and add_entry(), (@sa + // get_current_term()), a pair of read from the state + // machine and add_entry() will be linearised as well. + // + // To sum up, @read_barrier() can be used as a poor man + // distributed Compare-And-Swap: + // + // lock() + // term_t term = get_current_term() + // co_await read_barrier() + // ... Read previous value from the state machine ... + // ... Create a new value ... + // if (term == get_current_term())) { + // co_await add_entry(); + // } + // unlock() + virtual future<> read_barrier() = 0; + + // Ad hoc functions for testing + + virtual void make_me_leader() = 0; +}; + +std::unique_ptr create_server(server_id uuid, std::unique_ptr rpc, + std::unique_ptr state_machine, std::unique_ptr storage, + seastar::shared_ptr failure_detector); + +} // namespace raft + diff --git a/test/boost/raft_fsm_test.cc b/test/boost/raft_fsm_test.cc new file mode 100644 index 0000000000..799ec7eae6 --- /dev/null +++ b/test/boost/raft_fsm_test.cc @@ -0,0 +1,265 @@ +/* + * Copyright (c) 2020, Arm Limited and affiliates. All rights reserved. + */ + +/* + * This file is part of Scylla. 
+ * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#define BOOST_TEST_MODULE raft + +#include +#include "test/lib/log.hh" + +#include "raft/fsm.hh" + +using raft::term_t, raft::index_t, raft::server_id; + +void election_threshold(raft::fsm& fsm) { + for (int i = 0; i <= raft::ELECTION_TIMEOUT.count(); i++) { + fsm.tick(); + } +} + +void election_timeout(raft::fsm& fsm) { + for (int i = 0; i <= 2 * raft::ELECTION_TIMEOUT.count(); i++) { + fsm.tick(); + } +} + +struct failure_detector: public raft::failure_detector { + bool alive = true; + bool is_alive(raft::server_id from) override { + return alive; + } +}; + +BOOST_AUTO_TEST_CASE(test_election_single_node) { + + failure_detector fd; + server_id id1{utils::make_random_uuid()}; + raft::configuration cfg({id1}); + raft::log log{raft::snapshot{.config = cfg}}; + raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd); + + BOOST_CHECK(fsm.is_follower()); + + election_timeout(fsm); + + // Immediately converts from leader to follower if quorum=1 + BOOST_CHECK(fsm.is_leader()); + + auto output = fsm.get_output(); + + BOOST_CHECK(output.term); + BOOST_CHECK(output.vote); + BOOST_CHECK(output.messages.empty()); + BOOST_CHECK(output.log_entries.empty()); + BOOST_CHECK(output.committed.empty()); + // The leader does not become candidate simply because + // a timeout has elapsed, i.e. there are no spurious + // elections. 
+ election_timeout(fsm); + BOOST_CHECK(fsm.is_leader()); + output = fsm.get_output(); + BOOST_CHECK(!output.term); + BOOST_CHECK(!output.vote); + BOOST_CHECK(output.messages.empty()); + BOOST_CHECK(output.log_entries.empty()); + BOOST_CHECK(output.committed.empty()); +} + +// Test that adding an entry to a single-node cluster +// does not lead to RPC +BOOST_AUTO_TEST_CASE(test_single_node_is_quiet) { + + failure_detector fd; + server_id id1{utils::make_random_uuid()}; + raft::configuration cfg({id1}); + raft::log log{raft::snapshot{.config = cfg}}; + + raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd); + + election_timeout(fsm); + + // Immediately converts from leader to follower if quorum=1 + BOOST_CHECK(fsm.is_leader()); + + (void) fsm.get_output(); + + fsm.add_entry(raft::command{}); + + BOOST_CHECK(fsm.get_output().messages.empty()); +} + +BOOST_AUTO_TEST_CASE(test_election_two_nodes) { + + failure_detector fd; + + server_id id1{utils::make_random_uuid()}, id2{utils::make_random_uuid()}; + + raft::configuration cfg({id1, id2}); + raft::log log{raft::snapshot{.config = cfg}}; + + raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd); + + // Initial state is follower + BOOST_CHECK(fsm.is_follower()); + + // After election timeout, a follower becomes a candidate + election_timeout(fsm); + BOOST_CHECK(fsm.is_candidate()); + + // If nothing happens, the candidate stays this way + election_timeout(fsm); + BOOST_CHECK(fsm.is_candidate()); + + auto output = fsm.get_output(); + // After a favourable reply, we become a leader (quorum is 2) + fsm.step(id2, raft::vote_reply{output.term, true}); + BOOST_CHECK(fsm.is_leader()); + // Out of order response to the previous election is ignored + fsm.step(id2, raft::vote_reply{output.term - term_t{1}, false}); + assert(fsm.is_leader()); + + // Vote request within the election timeout is ignored + // (avoiding disruptive leaders). 
+ fsm.step(id2, raft::vote_request{output.term + term_t{1}}); + BOOST_CHECK(fsm.is_leader()); + // Any message with a newer term after election timeout + // -> immediately convert to follower + fd.alive = false; + election_threshold(fsm); + fsm.step(id2, raft::vote_request{output.term + term_t{1}}); + BOOST_CHECK(fsm.is_follower()); + + // Check that the candidate converts to a follower as well + election_timeout(fsm); + BOOST_CHECK(fsm.is_candidate()); + output = fsm.get_output(); + fsm.step(id2, raft::vote_request{output.term + term_t{1}}); + BOOST_CHECK(fsm.is_follower()); + + // Test that a node doesn't cast a vote if it has voted for + // self already + (void) fsm.get_output(); + while (fsm.is_follower()) { + fsm.tick(); + } + BOOST_CHECK(fsm.is_candidate()); + output = fsm.get_output(); + auto msg = std::get(output.messages.back().second); + fsm.step(id2, std::move(msg)); + // We could figure out this round is going to a nowhere, but + // we're not that smart and simply wait for a vote_reply. + BOOST_CHECK(fsm.is_candidate()); + output = fsm.get_output(); + auto reply = std::get(output.messages.back().second); + BOOST_CHECK(!reply.vote_granted); +} + +BOOST_AUTO_TEST_CASE(test_election_four_nodes) { + + failure_detector fd; + + server_id id1{utils::make_random_uuid()}, + id2{utils::make_random_uuid()}, + id3{utils::make_random_uuid()}, + id4{utils::make_random_uuid()}; + + raft::configuration cfg({id1, id2, id3, id4}); + raft::log log{raft::snapshot{.config = cfg}}; + + raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd); + + // Initial state is follower + BOOST_CHECK(fsm.is_follower()); + + // Inform FSM about a new leader at a new term + fsm.step(id4, raft::append_request_recv{term_t{1}, id4, index_t{1}, term_t{1}}); + + (void) fsm.get_output(); + + // Request a vote during the same term. Even though + // we haven't voted, we should deny a vote because we + // know about a leader for this term. 
+ fsm.step(id3, raft::vote_request{term_t{1}, index_t{1}, term_t{1}}); + + auto output = fsm.get_output(); + auto reply = std::get(output.messages.back().second); + BOOST_CHECK(!reply.vote_granted); + + // Run out of steam for this term. Start a new one. + fd.alive = false; + election_timeout(fsm); + BOOST_CHECK(fsm.is_candidate()); + + output = fsm.get_output(); + // Add a favourable reply, not enough for quorum + fsm.step(id2, raft::vote_reply{output.term, true}); + BOOST_CHECK(fsm.is_candidate()); + + // Add another one, this adds up to quorum + fsm.step(id3, raft::vote_reply{output.term, true}); + BOOST_CHECK(fsm.is_leader()); +} + +BOOST_AUTO_TEST_CASE(test_log_matching_rule) { + + failure_detector fd; + + server_id id1{utils::make_random_uuid()}, + id2{utils::make_random_uuid()}, + id3{utils::make_random_uuid()}; + + raft::configuration cfg({id1, id2, id3}); + raft::log log(raft::snapshot{.idx = index_t{999}, .config = cfg}); + + log.emplace_back(raft::log_entry{term_t{10}, index_t{1000}}); + log.stable_to(log.last_idx()); + + raft::fsm fsm(id1, term_t{10}, server_id{}, std::move(log), fd); + + // Initial state is follower + BOOST_CHECK(fsm.is_follower()); + + (void) fsm.get_output(); + + fsm.step(id2, raft::vote_request{term_t{9}, index_t{1001}, term_t{11}}); + // Current term is too old - vote is not granted + auto output = fsm.get_output(); + BOOST_CHECK(output.messages.empty()); + + auto request_vote = [&](term_t term, index_t last_log_idx, term_t last_log_term) -> raft::vote_reply { + fsm.step(id2, raft::vote_request{term, last_log_idx, last_log_term}); + auto output = fsm.get_output(); + return std::get(output.messages.back().second); + }; + + // Last stable index is too small - vote is not granted + BOOST_CHECK(!request_vote(term_t{11}, index_t{999}, term_t{10}).vote_granted); + // Last stable term is too small - vote is not granted + BOOST_CHECK(!request_vote(term_t{12}, index_t{1002}, term_t{9}).vote_granted); + // Last stable term and index are 
equal to the voter's - vote + // is granted + BOOST_CHECK(request_vote(term_t{13}, index_t{1000}, term_t{10}).vote_granted); + // Last stable term is the same, index is greater to the voter's - vote + // is granted + BOOST_CHECK(request_vote(term_t{14}, index_t{1001}, term_t{10}).vote_granted); + // Both term and index are greater than the voter's - vote + // is granted + BOOST_CHECK(request_vote(term_t{15}, index_t{1001}, term_t{11}).vote_granted); +} diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc new file mode 100644 index 0000000000..1b31539d8d --- /dev/null +++ b/test/raft/replication_test.cc @@ -0,0 +1,378 @@ +#include +#include +#include +#include +#include +#include +#include +#include "raft/server.hh" +#include "serializer.hh" +#include "serializer_impl.hh" + +using namespace std::chrono_literals; + +static seastar::logger tlogger("test"); + +std::mt19937 random_generator() { + std::random_device rd; + // In case of errors, replace the seed with a fixed value to get a deterministic run. 
+ auto seed = rd(); + std::cout << "Random seed: " << seed << "\n"; + return std::mt19937(seed); +} + +int rand() { + static thread_local std::uniform_int_distribution dist(0, std::numeric_limits::max()); + static thread_local auto gen = random_generator(); + + return dist(gen); +} + +bool drop_replication = false; + +class state_machine : public raft::state_machine { +public: + using apply_fn = std::function(raft::server_id id, promise<>&, const std::vector& commands)>; +private: + raft::server_id _id; + apply_fn _apply; + promise<> _done; +public: + state_machine(raft::server_id id, apply_fn apply) : _id(id), _apply(std::move(apply)) {} + virtual future<> apply(const std::vector commands) { + return _apply(_id, _done, commands); + } + virtual future take_snaphot() { return make_ready_future(raft::snapshot_id()); } + virtual void drop_snapshot(raft::snapshot_id id) {} + virtual future<> load_snapshot(raft::snapshot_id id) { return make_ready_future<>(); }; + virtual future<> abort() { return make_ready_future<>(); } + + future<> done() { + return _done.get_future(); + } +}; + +struct initial_state { + raft::term_t term = raft::term_t(1); + raft::server_id vote; + std::vector log; + raft::snapshot snapshot; +}; + + +class storage : public raft::storage { + initial_state _conf; +public: + storage(initial_state conf) : _conf(std::move(conf)) {} + storage() {} + virtual future<> store_term_and_vote(raft::term_t term, raft::server_id vote) { co_return seastar::sleep(1us); } + virtual future> load_term_and_vote() { + auto term_and_vote = std::make_pair(_conf.term, _conf.vote); + return make_ready_future>(term_and_vote); + } + virtual future<> store_snapshot(const raft::snapshot& snap, size_t preserve_log_entries) { return make_ready_future<>(); } + virtual future load_snapshot() { + return make_ready_future(_conf.snapshot); + } + virtual future<> store_log_entries(const std::vector& entries) { co_return seastar::sleep(1us); }; + virtual future load_log() { + 
raft::log_entries log; + for (auto&& e : _conf.log) { + log.emplace_back(make_lw_shared(std::move(e))); + } + return make_ready_future(std::move(log)); + } + virtual future<> truncate_log(raft::index_t idx) { return make_ready_future<>(); } + virtual future<> abort() { return make_ready_future<>(); } +}; + +class rpc : public raft::rpc { + static std::unordered_map net; + raft::server_id _id; +public: + rpc(raft::server_id id) : _id(id) { + net[_id] = this; + } + virtual future<> send_snapshot(raft::server_id id, const raft::install_snapshot& snap) { return make_ready_future<>(); } + virtual future<> send_append_entries(raft::server_id id, const raft::append_request_send& append_request) { + if (drop_replication && !(rand() % 5)) { + return make_ready_future<>(); + } + raft::append_request_recv req; + req.current_term = append_request.current_term; + req.leader_id = append_request.leader_id; + req.prev_log_idx = append_request.prev_log_idx; + req.prev_log_term = append_request.prev_log_term; + req.leader_commit_idx = append_request.leader_commit_idx; + for (auto&& e: append_request.entries) { + req.entries.push_back(e); + } + net[id]->_client->append_entries(_id, std::move(req)); + //co_return seastar::sleep(1us); + return make_ready_future<>(); + } + virtual future<> send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) { + if (drop_replication && !(rand() % 5)) { + return make_ready_future<>(); + } + net[id]->_client->append_entries_reply(_id, std::move(reply)); + return make_ready_future<>(); + } + virtual future<> send_vote_request(raft::server_id id, const raft::vote_request& vote_request) { + net[id]->_client->request_vote(_id, std::move(vote_request)); + return make_ready_future<>(); + } + virtual future<> send_vote_reply(raft::server_id id, const raft::vote_reply& vote_reply) { + net[id]->_client->request_vote_reply(_id, std::move(vote_reply)); + return make_ready_future<>(); + } + virtual void add_server(raft::server_id id, bytes 
node_info) {} + virtual void remove_server(raft::server_id id) {} + virtual future<> abort() { return make_ready_future<>(); } +}; + +class failure_detector : public raft::failure_detector { + bool is_alive(raft::server_id server) override { + return true; + } +}; + +std::unordered_map rpc::net; + +std::pair, state_machine*> +create_raft_server(raft::server_id uuid, state_machine::apply_fn apply, + initial_state state) { + + auto sm = std::make_unique(uuid, std::move(apply)); + auto& rsm = *sm; + auto mrpc = std::make_unique(uuid); + auto mstorage = std::make_unique(state); + auto fd = seastar::make_shared(); + auto raft = raft::create_server(uuid, std::move(mrpc), std::move(sm), std::move(mstorage), + std::move(fd)); + + return std::make_pair(std::move(raft), &rsm); +} + +future, state_machine*>>> create_cluster(std::vector states, state_machine::apply_fn apply) { + raft::configuration config; + std::vector, state_machine*>> rafts; + + for (size_t i = 0; i < states.size(); i++) { + auto uuid = utils::make_random_uuid(); + config.servers.push_back(raft::server_address{uuid}); + } + + for (size_t i = 0; i < states.size(); i++) { + auto& s = config.servers[i]; + states[i].snapshot.config = config; + auto& raft = *rafts.emplace_back(create_raft_server(s.id, apply, states[i])).first; + co_await raft.start(); + } + + co_return std::move(rafts); +} + +struct log_entry { + unsigned term; + int value; +}; + +std::vector create_log(std::initializer_list list, unsigned start_idx = 1) { + std::vector log; + + unsigned i = start_idx; + for (auto e : list) { + raft::command command; + ser::serialize(command, e.value); + log.push_back(raft::log_entry{raft::term_t(e.term), raft::index_t(i++), std::move(command)}); + } + + return log; +} + +constexpr int itr = 100; +std::unordered_map sums; + +future<> apply(raft::server_id id, promise<>& done, const std::vector& commands) { + tlogger.debug("sm::apply got {} entries", commands.size()); + for (auto&& d : commands) { + auto is = 
ser::as_input_stream(d); + int n = ser::deserialize(is, boost::type()); + tlogger.debug("{}: apply {}", id, n); + auto it = sums.find(id); + if (it == sums.end()) { + sums[id] = 0; + } + sums[id] += n; + } + if (sums[id] == ((itr - 1) * itr)/2) { + done.set_value(); + } + return make_ready_future<>(); +}; + + +future<> test_helper(std::vector states, int start_itr = 0) { + auto rafts = co_await create_cluster(states, apply); + + auto& leader = *rafts[0].first; + leader.make_me_leader(); + + co_await seastar::parallel_for_each(std::views::iota(start_itr, itr), [&] (int i) { + tlogger.debug("Adding entry {} on a leader", i); + raft::command command; + ser::serialize(command, i); + return leader.add_entry(std::move(command), raft::wait_type::committed); + }); + + for (auto& r: rafts) { + co_await r.second->done(); + } + + for (auto& r: rafts) { + co_await r.first->abort(); + } + + sums.clear(); + co_return; +} + +future<> test_simple_replication(size_t size) { + return test_helper(std::vector(size)); +} + +// initially a leader has non empty log +future<> test_replicate_non_empty_leader_log() { + // 2 nodes, leader has entries in his log + std::vector states(2); + states[0].term = raft::term_t(1); + states[0].log = create_log({{1, 0}, {1, 1}, {1, 2}, {1, 3}}); + + // start iterations from 4 since o4 entry is already in the log + return test_helper(std::move(states), 4); +} + +// test special case where prev_index = 0 because the leader's log is empty +future<> test_replace_log_leaders_log_empty() { + // current leaders term is 2 and empty log + // one of the follower have three entries that should be replaced + std::vector states(3); + states[0].term = raft::term_t(2); + states[2].log = create_log({{1, 10}, {1, 20}, {1, 30}}); + + return test_helper(std::move(states)); +} + +// two nodes, leader has one entry, follower has 3, existing entries do not match +future<> test_replace_log_leaders_log_not_empty() { + // current leaders term is 2 and the log has one entry + // 
one of the follower have three entries that should be replaced + std::vector states(2); + states[0].term = raft::term_t(3); + states[0].log = create_log({{1, 0}}); + states[1].log = create_log({{2, 10}, {2, 20}, {2, 30}}); + + // start iterations from 1 since one entry is already in the log + return test_helper(std::move(states), 1); +} + +// two nodes, leader has 2 entries, follower has 4, index=1 matches index=2 does not +future<> test_replace_log_leaders_log_not_empty_2() { + // current leader's term is 2 and the log has one entry + // one of the follower have three entries that should be replaced + std::vector states(2); + states[0].term = raft::term_t(3); + states[0].log = create_log({{1, 0}, {1, 1}}); + states[1].log = create_log({{1, 0}, {2, 20}, {2, 30}, {2, 40}}); + + // start iterations from 2 since 2 entries are already in the log + return test_helper(std::move(states), 2); +} + +// a follower and a leader have matching logs but leader's is shorter +future<> test_replace_log_leaders_log_not_empty_3() { + // current leaders term is 2 and the log has one entry + // one of the follower have three entries that should be replaced + std::vector states(2); + states[0].term = raft::term_t(2); + states[0].log = create_log({{1, 0}, {1, 1}}); + states[1].log = create_log({{1, 0}, {1, 1}, {1, 2}, {1, 3}}); + + // start iterations from 2 since 2 entries are already in the log + return test_helper(std::move(states), 2); +} + +// a follower and a leader have no common entries +future<> test_replace_no_common_entries() { + // current leaders term is 2 and the log has one entry + // one of the follower have three entries that should be replaced + std::vector states(2); + states[0].term = raft::term_t(3); + states[0].log = create_log({{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}, {1, 6}}); + states[1].log = create_log({{2, 10}, {2, 11}, {2, 12}, {2, 13}, {2, 14}, {2, 15}, {2, 16}}); + + // start iterations from 7 since 7 entries are already in the log + return 
test_helper(std::move(states), 7); +} + +// a follower and a leader have one common entry +future<> test_replace_one_common_entry() { + // current leaders term is 2 and the log has one entry + // one of the follower have three entries that should be replaced + std::vector states(2); + states[0].term = raft::term_t(4); + states[0].log = create_log({{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}, {3, 6}}); + states[1].log = create_log({{1, 0}, {2, 11}, {2, 12}, {2, 13}, {2, 14}, {2, 15}, {2, 16}}); + + // start iterations from 7 since 7 entries are already in the log + return test_helper(std::move(states), 7); +} + +// a follower and a leader have t1i common entry in different terms +future<> test_replace_two_common_entry_different_terms() { + // current leaders term is 2 and the log has one entry + // one of the follower have three entries that should be replaced + std::vector states(2); + states[0].term = raft::term_t(5); + states[0].log = create_log({{1, 0}, {2, 1}, {3, 2}, {3, 3}, {3, 4}, {3, 5}, {4, 6}}); + states[1].log = create_log({{1, 0}, {2, 1}, {2, 12}, {2, 13}, {2, 14}, {2, 15}, {2, 16}}); + + // start iterations from 7 since 7 entries are already in the log + return test_helper(std::move(states), 7); +} + +int main(int argc, char* argv[]) { + namespace bpo = boost::program_options; + + seastar::app_template::config cfg; + seastar::app_template app(cfg); + app.add_options() + ("drop-replication", bpo::value()->default_value(false), "drop replication packets randomly"); + + using test_fn = std::function()>; + + test_fn tests[] = { + std::bind(test_simple_replication, 1), + std::bind(test_simple_replication, 2), + test_replicate_non_empty_leader_log, + test_replace_log_leaders_log_empty, + test_replace_log_leaders_log_not_empty, + test_replace_log_leaders_log_not_empty_2, + test_replace_log_leaders_log_not_empty_3, + test_replace_no_common_entries, + test_replace_one_common_entry, + test_replace_two_common_entry_different_terms, + }; + + return 
app.run(argc, argv, [&tests, &app] () -> future<> { + drop_replication = app.configuration()["drop-replication"].as(); + + int i = 0; + for (auto& t : tests) { + tlogger.debug("test: {}", i++); + co_await t(); + } + }); +} + diff --git a/test/raft/suite.yaml b/test/raft/suite.yaml new file mode 100644 index 0000000000..2e3ea24d79 --- /dev/null +++ b/test/raft/suite.yaml @@ -0,0 +1,5 @@ +# Suite test type. Supported types: unit, boost, cql +type: unit +custom_args: + replication_test: + - '-c1 -m200M'