From c073997431cdd3414539e6b9b0be0735694b72e4 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 13 Aug 2020 18:29:41 +0300 Subject: [PATCH 1/5] raft: Introduce raft interface header This commit introduce public raft interfaces. raft::server represents single raft server instance. raft::state_machine represents a user defined state machine. raft::rpc, raft::rpc_client and raft::storage are used to allow implementing custom networking and storage layers. A shared failure detector interface defines keep-alive semantics, required for efficient implementation of thousands of raft groups. --- raft/internal.hh | 105 ++++++++++++ raft/raft.hh | 414 +++++++++++++++++++++++++++++++++++++++++++++++ raft/server.hh | 117 ++++++++++++++ 3 files changed, 636 insertions(+) create mode 100644 raft/internal.hh create mode 100644 raft/raft.hh create mode 100644 raft/server.hh diff --git a/raft/internal.hh b/raft/internal.hh new file mode 100644 index 0000000000..8e908be5c4 --- /dev/null +++ b/raft/internal.hh @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . 
+ */ +#pragma once + +#include +#include +#include "utils/UUID.hh" + +namespace raft { +namespace internal { + +template +class tagged_uint64 { + uint64_t _val; +public: + tagged_uint64() : _val(0) {} + explicit tagged_uint64(uint64_t v) : _val(v) {} + tagged_uint64(const tagged_uint64&) = default; + tagged_uint64(tagged_uint64&&) = default; + tagged_uint64& operator=(const tagged_uint64&) = default; + auto operator<=>(const tagged_uint64&) const = default; + explicit operator bool() const { return _val != 0; } + + operator uint64_t() const { + return _val; + } + tagged_uint64& operator++() { // pre increment + ++_val; + return *this; + } + tagged_uint64 operator++(int) { // post increment + uint64_t v = _val++; + return tagged_uint64(v); + } + tagged_uint64& operator--() { // pre decrement + --_val; + return *this; + } + tagged_uint64 operator--(int) { // post decrement + uint64_t v = _val--; + return tagged_uint64(v); + } + tagged_uint64 operator+(const tagged_uint64& o) const { + return tagged_uint64(_val + o._val); + } + tagged_uint64 operator-(const tagged_uint64& o) const { + return tagged_uint64(_val - o._val); + } + friend std::ostream& operator<<(std::ostream& os, const tagged_uint64& u) { + os << u._val; + return os; + } +}; + +template +struct tagged_id { + utils::UUID id; + bool operator==(const tagged_id& o) const { + return id == o.id; + } + explicit operator bool() const { + // The default constructor sets the id to nil, which is + // guaranteed to not match any valid id. 
+ return id != utils::UUID(); + } +}; + +template +std::ostream& operator<<(std::ostream& os, const tagged_id& id) { + os << id.id; + return os; +} + +} // end of namespace internal +} // end of namespace raft + +namespace std { + +template +struct hash> { + size_t operator()(const raft::internal::tagged_id& id) const { + return hash()(id.id); + } +}; + +} // end of namespace std + diff --git a/raft/raft.hh b/raft/raft.hh new file mode 100644 index 0000000000..587d0394b2 --- /dev/null +++ b/raft/raft.hh @@ -0,0 +1,414 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include "bytes_ostream.hh" +#include "utils/UUID.hh" +#include "internal.hh" + +namespace raft { +// Keeps user defined command. A user is responsible to serialize +// a state machine operation into it before passing to raft and +// deserialize in apply() before applying. 
+using command = bytes_ostream; +using command_cref = std::reference_wrapper; + +extern seastar::logger logger; + +// This is user provided id for a snapshot +using snapshot_id = internal::tagged_id; +// Unique identifier of a server in a Raft group +using server_id = internal::tagged_id; + +// This type represents the raft term +using term_t = internal::tagged_uint64; +// This type represensts the index into the raft log +using index_t = internal::tagged_uint64; + +using clock_type = lowres_clock; + +// Opaque connection properties. May contain ip:port pair for instance. +// This value is disseminated between cluster member +// through regular log replication as part of a configuration +// log entry. Upon receiving it a server passes it down to +// RPC module through add_server() call where it is deserialized +// and used to obtain connection info for the node `id`. After a server +// is added to the RPC module RPC's send functions can be used to communicate +// with it using its `id`. +using server_info = bytes; + +struct server_address { + server_id id; + server_info info; +}; + +struct configuration { + std::vector servers; + + configuration(std::initializer_list ids) { + servers.reserve(ids.size()); + for (auto&& id : ids) { + servers.emplace_back(server_address{std::move(id)}); + } + } + configuration() = default; +}; + +struct log_entry { + // Dummy entry is used when a leader needs to commit an entry + // (after leadership change for instance) but there is nothing + // else to commit. 
+ struct dummy {}; + term_t term; + index_t idx; + std::variant data; +}; + +using log_entry_ptr = seastar::lw_shared_ptr; + +struct error : public std::runtime_error { + using std::runtime_error::runtime_error; +}; + +struct not_a_leader : public error { + server_id leader; + explicit not_a_leader(server_id l) : error("Not a leader"), leader(l) {} +}; + +struct dropped_entry : public error { + dropped_entry() : error("Entry was dropped because of a leader change") {} +}; + +struct stopped_error : public error { + stopped_error() : error("Raft instance is stopped") {} +}; + +struct conf_change_in_progress : public error { + conf_change_in_progress() : error("A configuration change is already in progress") {} +}; + +struct snapshot { + // Index and term of last entry in the snapshot + index_t idx = index_t(0); + term_t term = term_t(0); + // The committed configuration in the snapshot + configuration config; + // Id of the snapshot. + snapshot_id id; +}; + +using log_entry_cref = std::reference_wrapper; + +struct append_request_base { + // The leader's term. + term_t current_term; + // So that follower can redirect clients + // In practice we do not need it since we should know sender's id anyway. + server_id leader_id; + // Index of the log entry immediately preceding new ones + index_t prev_log_idx; + // Term of prev_log_idx entry. + term_t prev_log_term; + // The leader's commit_idx. + index_t leader_commit_idx; +}; + +struct append_request_send : public append_request_base { + // Log entries to store (empty vector for heartbeat; may send more + // than one entry for efficiency). + std::vector entries; +}; +struct append_request_recv : public append_request_base { + // Same as for append_request_send but unlike it here the + // message owns the entries. + std::vector entries; +}; + +struct append_reply { + struct rejected { + // Index of non matching entry that caused the request + // to be rejected. 
+ index_t non_matching_idx; // Last index in the follower's log, can be used to find next // matching index more efficiently. index_t last_idx; }; struct accepted { // Last entry that was appended (may be smaller than max log index // in case follower's log is longer and appended entries match). index_t last_new_idx; }; // Current term, for leader to update itself. term_t current_term; // Contains an index of the last committed entry on the follower // It is used by a leader to know if a follower is behind and issues an // empty append entry with an updated commit_idx if it is // Regular RAFT handles this by always sending empty append requests // as a heartbeat. index_t commit_idx; std::variant result; }; struct vote_request { // The candidate’s term. term_t current_term; // The index of the candidate's last log entry. index_t last_log_idx; // The term of the candidate's last log entry. term_t last_log_term; }; struct vote_reply { // Current term, for the candidate to update itself. term_t current_term; // True means the candidate received a vote. bool vote_granted; }; struct install_snapshot { // Current term on a leader term_t current_term; // A snapshot to install snapshot snp; }; struct snapshot_reply { bool success; }; using rpc_message = std::variant; // we need something that can be truncated from both sides. // std::deque move constructor is not nothrow hence cannot be used using log_entries = boost::container::deque; // rpc, storage and state_machine classes will have to be implemented by the // raft user to provide network, persistence and business logic support // respectively. class rpc; class storage; // Any of the functions may return an error, but it will kill the // raft instance that uses it.
Depending on what state the failure leaves the raft instance in, it will either have to be recreated with the same state machine and rejoin the cluster with the same server_id, or a new raft instance will have to be created with an empty state machine and it will have to rejoin the cluster with a different server_id through a configuration change. class state_machine { public: virtual ~state_machine() {} // This is called after entries are committed (replicated to // at least quorum of servers). If a provided vector contains // more than one entry all of them will be committed simultaneously. // Will be eventually called on all replicas, for all committed commands. // Raft owns the data since it may be still replicating. // Raft will not call another apply until the returned future // becomes ready. virtual future<> apply(std::vector command) = 0; // The function is supposed to take a snapshot of a state machine // To be called during log compaction or when a leader brings // a lagging follower up-to-date virtual future take_snaphot() = 0; // The function drops a snapshot with a provided id virtual void drop_snapshot(snapshot_id id) = 0; // reload state machine from a snapshot id // To be used by a restarting server or by a follower that // catches up to a leader virtual future<> load_snapshot(snapshot_id id) = 0; // stops the state machine instance by aborting the work // that can be aborted and waiting for all the rest to complete // any unfinished apply/snapshot operation may return an error after // this function is called virtual future<> abort() = 0; }; class rpc_server; // It is safe for the rpc implementation to drop any message. // Error returned by send function will be ignored. All send_() // functions can be called concurrently, returned future should be // waited only for back pressure purposes (unless specified otherwise in // the function's comment).
Values passed by reference may be freed as soon +// as function returns. +class rpc { +protected: + // Pointer to Raft server. Needed for passing RPC messages. + rpc_server* _client = nullptr; +public: + virtual ~rpc() {} + + // Send a snapshot snap to a server server_id. + // A returned future is resolved when snapshot is sent and + // successfully applied by a receiver. Will be waited to + // know if a snaphot transfer succeeded. + virtual future<> send_snapshot(server_id server_id, const install_snapshot& snap) = 0; + + // Send provided append_request to the supplied server, does + // not wait for reply. The returned future resolves when + // message is sent. It does not mean it was received. + virtual future<> send_append_entries(server_id id, const append_request_send& append_request) = 0; + + // Send a reply to an append_request. The returned future + // resolves when message is sent. It does not mean it was + // received. + virtual future<> send_append_entries_reply(server_id id, const append_reply& reply) = 0; + + // Send a vote request. The returned future + // resolves when message is sent. It does not mean it was + // received. + virtual future<> send_vote_request(server_id id, const vote_request& vote_request) = 0; + + // Sends a reply to a vote request. The returned future + // resolves when message is sent. It does not mean it was + // received. + virtual future<> send_vote_reply(server_id id, const vote_reply& vote_reply) = 0; + + // When a new server is learn this function is called with the + // info about the server. + virtual void add_server(server_id id, server_info info) = 0; + + // When a server is removed from local config this call is + // executed. + virtual void remove_server(server_id id) = 0; + + // Stop the RPC instance by aborting the work that can be + // aborted and waiting for all the rest to complete any + // unfinished send operation may return an error after this + // function is called. 
+ virtual future<> abort() = 0; +private: + friend rpc_server; +}; + +// Each Raft server is a receiver of RPC messages. +// Defines the API specific to receiving RPC input. +class rpc_server { +public: + virtual ~rpc_server() {}; + + // This function is called by append_entries RPC + virtual void append_entries(server_id from, append_request_recv append_request) = 0; + + // This function is called by append_entries_reply RPC + virtual void append_entries_reply(server_id from, append_reply reply) = 0; + + // This function is called to handle RequestVote RPC. + virtual void request_vote(server_id from, vote_request vote_request) = 0; + // Handle response to RequestVote RPC + virtual void request_vote_reply(server_id from, vote_reply vote_reply) = 0; + + // Apply incoming snapshot, future resolves when application is complete + virtual future<> apply_snapshot(server_id from, install_snapshot snp) = 0; + + // Update RPC implementation with this client as + // the receiver of RPC input. + void set_rpc_server(class rpc *rpc) { rpc->_client = this; } +}; + +// This class represents persistent storage state. If any of the +// function returns an error the Raft instance will be aborted. +class storage { +public: + virtual ~storage() {} + + // Persist given term and vote. + // Can be called concurrently with other save-* functions in + // the storage and with itself but an implementation has to + // make sure that the result is returned back in the calling order. + virtual future<> store_term_and_vote(term_t term, server_id vote) = 0; + + // Load persisted term and vote. + // Called during Raft server initialization only, is not run + // in parallel with store. + virtual future> load_term_and_vote() = 0; + + // Persist given snapshot and drop all but 'preserve_log_entries' + // entries from the Raft log starting from the beginning. + // This can overwrite a previously persisted snapshot. + // Is called only after the previous invocation completes. 
+ // In other words, it's the caller's responsibility to serialize + // calls to this function. Can be called in parallel with + // store_log_entries() but snap.index should belong to an already + // persisted entry. + virtual future<> store_snapshot(const snapshot& snap, size_t preserve_log_entries) = 0; + + // Load a saved snapshot. + // This only loads it into memory, but does not apply yet. To + // apply call 'state_machine::load_snapshot(snapshot::id)' + // Called during Raft server initialization only, should not + // run in parallel with store. + virtual future load_snapshot() = 0; + + // Persist given log entries. + // Can be called without waiting for previous call to resolve, + // but internally all writes should be serialized into forming + // one contiguous log that holds entries in order of the + // function invocation. + virtual future<> store_log_entries(const std::vector& entries) = 0; + + // Load saved Raft log. Called during Raft server + // initialization only, should not run in parallel with store. + virtual future load_log() = 0; + + // Truncate all entries with an index greater or equal than + // the given index in the log and persist the truncation. Can be + // called in parallel with store_log_entries() but internally + // should be linearized vs store_log_entries(): + // store_log_entries() called after truncate_log() should wait + // for truncation to complete internally before persisting its + // entries. + virtual future<> truncate_log(index_t idx) = 0; + + // Stop the storage instance by aborting the work that can be + // aborted and waiting for all the rest to complete any + // unfinished store/load operation may return an error after + // this function is called. + virtual future<> abort() = 0; +}; + +// To support many Raft groups per server, Seastar Raft +// extends original Raft with a shared failure detector. +// It is used instead of empty AppendEntries PRCs in idle +// cluster. 
+// This allows multiple Raft groups to share heartbeat traffic. +class failure_detector { +public: + virtual ~failure_detector() {} + // Called by each server on each tick, which defaults to 10 + // per second. Should return true if the server is + // alive. False results may impact liveness. + virtual bool is_alive(server_id server) = 0; +}; + +} // namespace raft + diff --git a/raft/server.hh b/raft/server.hh new file mode 100644 index 0000000000..f72f3834d3 --- /dev/null +++ b/raft/server.hh @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#pragma once +#include "raft.hh" + +namespace raft { + +enum class wait_type { + committed, + applied +}; + +// A single uniquely identified participant of a Raft group. +class server { +public: + virtual ~server() {} + // Add command to replicated log + // Returned future is resolved depending on wait_type parameter: + // 'committed' - when the entry is committed + // 'applied' - when the entry is applied (happens after it is committed) + // The function has to be called on a leader, throws not_a_leader exception otherwise. + // May fail because of internal error or because leader changed and an entry was replaced + // by another leader. In the later case dropped_entry exception will be returned. 
+ virtual future<> add_entry(command command, wait_type type) = 0; // Add new server to a cluster. If a node is already a member // of the cluster does nothing. Provided node_info is passed to // rpc::add_server() on each node in a cluster as it learns // about the joining node. Connection info can be passed there. // Can be called on a leader only, otherwise throws not_a_leader. // Cannot be called until previous add/remove server completes // otherwise conf_change_in_progress exception is thrown. virtual future<> add_server(server_id id, bytes node_info, clock_type::duration timeout) = 0; // Remove a server from the cluster. If the server is not a member // of the cluster does nothing. Can be called on a leader only // otherwise throws not_a_leader. // Cannot be called until previous add/remove server completes // otherwise conf_change_in_progress exception is thrown. virtual future<> remove_server(server_id id, clock_type::duration timeout) = 0; // Load persisted state and start background work that needs // to run for this Raft server to function; the object cannot // be used until the returned future is resolved. virtual future<> start() = 0; // Stop this Raft server, all submitted but not completed // operations will get an error and callers will not be able // to know if they succeeded or not. If this server was // a leader it will relinquish its leadership and cease // replication. virtual future<> abort() = 0; // Return Raft protocol current term. virtual term_t get_current_term() const = 0; // May be called before attempting a read from the local state // machine. The read should proceed only after the returned // future has resolved successfully. // If called not on a leader throws not_a_leader error.
+ // After calling this function and resolving the returned + // future: + // + // 1) The result of all completed + // add_entries(wait_type::applied) can be observed by + // direct access to the local state machine. + // 2) A subsequent add_entry() is likely to find this + // server still in the leader role. + // 3) If the caller ensures that writes to the state machine + // are linearised and the current term didn't change + // between read_barrier() and add_entry(), (@sa + // get_current_term()), a pair of read from the state + // machine and add_entry() will be linearised as well. + // + // To sum up, @read_barrier() can be used as a poor man + // distributed Compare-And-Swap: + // + // lock() + // term_t term = get_current_term() + // co_await read_barrier() + // ... Read previous value from the state machine ... + // ... Create a new value ... + // if (term == get_current_term())) { + // co_await add_entry(); + // } + // unlock() + virtual future<> read_barrier() = 0; + + // Ad hoc functions for testing + + virtual void make_me_leader() = 0; +}; + +std::unique_ptr create_server(server_id uuid, std::unique_ptr rpc, + std::unique_ptr state_machine, std::unique_ptr storage, + seastar::shared_ptr failure_detector); + +} // namespace raft + From e1ac1a61c926df1978d8ee5df216ef2933751414 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Sun, 23 Aug 2020 10:30:39 +0300 Subject: [PATCH 2/5] raft: Implement log replication and leader election This patch introduces partial RAFT implementation. It has only log replication and leader election support. Snapshotting and configuration change along with other, smaller features are not yet implemented. The approach taken by this implementation is to have a deterministic state machine coded in raft::fsm. What makes the FSM deterministic is that it does not do any IO by itself. It only takes an input (which may be a networking message, time tick or new append message), changes its state and produce an output. 
The output contains the state that has to be persisted, messages that need to be sent and entries that may be applied (in that order). The input and output of the FSM is handled by raft::server class. It uses raft::rpc interface to send and receive messages and raft::storage interface to implement persistence. --- raft/fsm.cc | 554 ++++++++++++++++++++++++++++++++++++++++++ raft/fsm.hh | 385 +++++++++++++++++++++++++++++ raft/log.cc | 171 +++++++++++++ raft/log.hh | 113 +++++++++ raft/logical_clock.hh | 72 ++++++ raft/progress.cc | 130 ++++++++++ raft/progress.hh | 147 +++++++++++ raft/raft.cc | 27 ++ raft/server.cc | 381 +++++++++++++++++++++++++++++ 9 files changed, 1980 insertions(+) create mode 100644 raft/fsm.cc create mode 100644 raft/fsm.hh create mode 100644 raft/log.cc create mode 100644 raft/log.hh create mode 100644 raft/logical_clock.hh create mode 100644 raft/progress.cc create mode 100644 raft/progress.hh create mode 100644 raft/raft.cc create mode 100644 raft/server.cc diff --git a/raft/fsm.cc b/raft/fsm.cc new file mode 100644 index 0000000000..84b4e4b742 --- /dev/null +++ b/raft/fsm.cc @@ -0,0 +1,554 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . 
+ */ +#include "fsm.hh" +#include +#include + +namespace raft { + +fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log, + failure_detector& failure_detector) : + _my_id(id), _current_term(current_term), _voted_for(voted_for), + _log(std::move(log)), _failure_detector(failure_detector) { + + _observed.advance(*this); + set_configuration(_log.get_snapshot().config); + logger.trace("{}: starting log length {}", _my_id, _log.last_idx()); + + assert(!bool(_current_leader)); +} + +template +const log_entry& fsm::add_entry(T command) { + // It's only possible to add entries on a leader. + check_is_leader(); + + _log.emplace_back(log_entry{_current_term, _log.next_idx(), std::move(command)}); + _sm_events.signal(); + + return *_log[_log.last_idx()]; +} + +template const log_entry& fsm::add_entry(command command); +template const log_entry& fsm::add_entry(log_entry::dummy dummy); + +void fsm::advance_commit_idx(index_t leader_commit_idx) { + + auto new_commit_idx = std::min(leader_commit_idx, _log.stable_idx()); + + logger.trace("advance_commit_idx[{}]: leader_commit_idx={}, new_commit_idx={}", + _my_id, leader_commit_idx, new_commit_idx); + + if (new_commit_idx > _commit_idx) { + _commit_idx = new_commit_idx; + _sm_events.signal(); + logger.trace("advance_commit_idx[{}]: signal apply_entries: committed: {}", + _my_id, _commit_idx); + } +} + + +void fsm::update_current_term(term_t current_term) +{ + assert(_current_term < current_term); + _current_term = current_term; + _voted_for = server_id{}; + + static thread_local std::default_random_engine re{std::random_device{}()}; + static thread_local std::uniform_int_distribution<> dist(1, ELECTION_TIMEOUT.count()); + // Reset the randomized election timeout on each term + // change, even if we do not plan to campaign during this + // term: the main purpose of the timeout is to avoid + // starting our campaign simultaneously with other followers. 
+ _randomized_election_timeout = ELECTION_TIMEOUT + logical_clock::duration{dist(re)}; +} + +void fsm::become_leader() { + assert(!std::holds_alternative(_state)); + assert(!_tracker); + _state = leader{}; + _current_leader = _my_id; + _votes = std::nullopt; + _tracker.emplace(_my_id); + _tracker->set_configuration(_current_config.servers, _log.next_idx()); + _last_election_time = _clock.now(); + replicate(); +} + +void fsm::become_follower(server_id leader) { + _current_leader = leader; + _state = follower{}; + _tracker = std::nullopt; + _votes = std::nullopt; + if (_current_leader) { + _last_election_time = _clock.now(); + } +} + +void fsm::become_candidate() { + _state = candidate{}; + _tracker = std::nullopt; + update_current_term(term_t{_current_term + 1}); + // 3.4 Leader election + // A possible outcome is that a candidate neither wins nor + // loses the election: if many followers become candidates at + // the same time, votes could be split so that no candidate + // obtains a majority. When this happens, each candidate will + // time out and start a new election by incrementing its term + // and initiating another round of RequestVote RPCs. + _last_election_time = _clock.now(); + _votes.emplace(); + _votes->set_configuration(_current_config.servers); + _voted_for = _my_id; + + if (_votes->tally_votes() == vote_result::WON) { + // A single node cluster. 
+ become_leader(); + return; + } + + for (const auto& server : _current_config.servers) { + if (server.id == _my_id) { + continue; + } + logger.trace("{} [term: {}, index: {}, last log term: {}] sent vote request to {}", + _my_id, _current_term, _log.last_idx(), _log.last_term(), server.id); + + send_to(server.id, vote_request{_current_term, _log.last_idx(), _log.last_term()}); + } +} + +future fsm::poll_output() { + logger.trace("fsm::poll_output() {} stable index: {} last index: {}", + _my_id, _log.stable_idx(), _log.last_idx()); + + while (true) { + auto diff = _log.last_idx() - _log.stable_idx(); + + if (diff > 0 || !_messages.empty() || !_observed.is_equal(*this)) { + break; + } + co_await _sm_events.wait(); + } + co_return get_output(); +} + +fsm_output fsm::get_output() { + fsm_output output; + + auto diff = _log.last_idx() - _log.stable_idx(); + + if (diff > 0) { + output.log_entries.reserve(diff); + + for (auto i = _log.stable_idx() + 1; i <= _log.last_idx(); i++) { + // Copy before saving to storage to prevent races with log updates, + // e.g. truncation of the log. + // TODO: avoid copies by making sure log truncate is + // copy-on-write. + output.log_entries.emplace_back(_log[i]); + } + } + + if (_observed._current_term != _current_term || _observed._voted_for != _voted_for) { + output.term = _current_term; + output.vote = _voted_for; + } + + // Return committed entries. + // Observer commit index may be smaller than snapshot index + // in which case we should not attemp commiting entries belonging + // to a snapshot. + auto observed_ci = std::max(_observed._commit_idx, _log.get_snapshot().idx); + if (observed_ci < _commit_idx) { + output.committed.reserve(_commit_idx - observed_ci); + + for (auto idx = observed_ci + 1; idx <= _commit_idx; ++idx) { + const auto& entry = _log[idx]; + if (!std::holds_alternative(entry->data)) { + output.committed.push_back(entry); + } + } + } + + // Get a snapshot of all unsent messages. 
+ // Do it after populating log_entries and committed arrays // to not lose messages in case arrays population throws std::swap(output.messages, _messages); // Advance the observed state. _observed.advance(*this); // Be careful to do that only after any use of stable_idx() in this // function and after any code that may throw if (output.log_entries.size()) { // We advance stable index before the entries are // actually persisted, because if writing to stable storage // fails the FSM will be stopped and get_output() will // never be called again, so any new state that assumes that // the entries are stable will not be observed. advance_stable_idx(output.log_entries.back()->idx); } return output; } void fsm::advance_stable_idx(index_t idx) { _log.stable_to(idx); if (is_leader()) { auto& progress = _tracker->find(_my_id); progress.match_idx = idx; progress.next_idx = index_t{idx + 1}; replicate(); check_committed(); } } void fsm::check_committed() { index_t new_commit_idx = _tracker->committed(_commit_idx); if (new_commit_idx <= _commit_idx) { return; } if (_log[new_commit_idx]->term != _current_term) { // 3.6.2 Committing entries from previous terms // Raft never commits log entries from previous terms by // counting replicas. Only log entries from the leader’s // current term are committed by counting replicas; once // an entry from the current term has been committed in // this way, then all prior entries are committed // indirectly because of the Log Matching Property. logger.trace("check_committed[{}]: cannot commit because of term {} != {}", _my_id, _log[new_commit_idx]->term, _current_term); return; } logger.trace("check_committed[{}]: commit {}", _my_id, new_commit_idx); _commit_idx = new_commit_idx; // We have a quorum of servers with match_idx greater than the // current commit index. Commit && apply more entries.
+    _sm_events.signal();
+}
+
+void fsm::tick_leader() {
+    if (_clock.now() - _last_election_time >= ELECTION_TIMEOUT) {
+        // 6.2 Routing requests to the leader
+        // A leader in Raft steps down if an election timeout
+        // elapses without a successful round of heartbeats to a majority
+        // of its cluster; this allows clients to retry their requests
+        // with another server.
+        return become_follower(server_id{});
+    }
+
+    size_t active = 1; // +1 for self
+    for (auto& [id, progress] : *_tracker) {
+        if (progress.id != _my_id) {
+            if (_failure_detector.is_alive(progress.id)) {
+                active++;
+            }
+            if (progress.state == follower_progress::state::PIPELINE &&
+                progress.in_flight == follower_progress::max_in_flight) {
+
+                progress.in_flight--; // allow one more packet to be sent
+            }
+            if (progress.match_idx < _log.stable_idx() || progress.commit_idx < _commit_idx) {
+                logger.trace("tick[{}]: replicate to {} because match={} < stable={} || "
+                    "follower commit_idx={} < commit_idx={}",
+                    _my_id, progress.id, progress.match_idx, _log.stable_idx(),
+                    progress.commit_idx, _commit_idx);
+
+                replicate_to(progress, true);
+            }
+        }
+    }
+    if (active >= _tracker->size()/2 + 1) {
+        // Advance last election time if we heard from
+        // the quorum during this tick.
+        _last_election_time = _clock.now();
+    }
+}
+
+void fsm::tick() {
+    _clock.advance();
+
+    if (is_leader()) {
+        tick_leader();
+    } else if (_current_leader && _failure_detector.is_alive(_current_leader)) {
+        // Ensure the follower doesn't disrupt a valid leader
+        // simply because there were no AppendEntries RPCs recently.
+ _last_election_time = _clock.now(); + } else if (is_past_election_timeout()) { + logger.trace("tick[{}]: becoming a candidate, last election: {}, now: {}", _my_id, + _last_election_time, _clock.now()); + become_candidate(); + } +} + +void fsm::append_entries(server_id from, append_request_recv&& request) { + logger.trace("append_entries[{}] received ct={}, prev idx={} prev term={} commit idx={}, idx={}", + _my_id, request.current_term, request.prev_log_idx, request.prev_log_term, + request.leader_commit_idx, request.entries.size() ? request.entries[0].idx : index_t(0)); + + assert(is_follower()); + // 3.4. Leader election + // A server remains in follower state as long as it receives + // valid RPCs from a leader. + _last_election_time = _clock.now(); + + // Ensure log matching property, even if we append no entries. + // 3.5 + // Until the leader has discovered where it and the + // follower’s logs match, the leader can send + // AppendEntries with no entries (like heartbeats) to save + // bandwidth. + auto [match, term] = _log.match_term(request.prev_log_idx, request.prev_log_term); + if (!match) { + logger.trace("append_entries[{}]: no matching term at position {}: expected {}, found {}", + _my_id, request.prev_log_idx, request.prev_log_term, term); + // Reply false if log doesn't contain an entry at + // prevLogIndex whose term matches prevLogTerm (§5.3). + send_to(from, append_reply{_current_term, _commit_idx, append_reply::rejected{request.prev_log_idx, _log.last_idx()}}); + return; + } + + // If there are no entries it means that the leader wants + // to ensure forward progress. Reply with the last index + // that matches. 
+    index_t last_new_idx = request.prev_log_idx;
+
+    if (!request.entries.empty()) {
+        last_new_idx = _log.maybe_append(std::move(request.entries));
+    }
+
+    advance_commit_idx(request.leader_commit_idx);
+
+    send_to(from, append_reply{_current_term, _commit_idx, append_reply::accepted{last_new_idx}});
+}
+
+void fsm::append_entries_reply(server_id from, append_reply&& reply) {
+    assert(is_leader());
+
+    follower_progress& progress = _tracker->find(from);
+
+    if (progress.state == follower_progress::state::PIPELINE) {
+        if (progress.in_flight) {
+            // in_flight is not precise, so do not let it underflow
+            progress.in_flight--;
+        }
+    }
+
+    progress.commit_idx = reply.commit_idx;
+
+    if (std::holds_alternative(reply.result)) {
+        // accepted
+        index_t last_idx = std::get(reply.result).last_new_idx;
+
+        logger.trace("append_entries_reply[{}->{}]: accepted match={} last index={}",
+            _my_id, from, progress.match_idx, last_idx);
+
+        progress.match_idx = std::max(progress.match_idx, last_idx);
+        // our next_idx may be large because of optimistic increase in pipeline mode
+        progress.next_idx = std::max(progress.next_idx, index_t(last_idx + 1));
+
+        progress.become_pipeline();
+
+        // check if any new entry can be committed
+        check_committed();
+    } else {
+        // rejected
+        append_reply::rejected rejected = std::get(reply.result);
+
+        logger.trace("append_entries_reply[{}->{}]: rejected match={} index={}",
+            _my_id, from, progress.match_idx, rejected.non_matching_idx);
+
+        // check reply validity
+        if (progress.is_stray_reject(rejected)) {
+            logger.trace("append_entries_reply[{}->{}]: drop stray append reject", _my_id, from);
+            return;
+        }
+
+        // Start re-sending from the non matching index, or from
+        // the last index in the follower's log.
+        // FIXME: make it more efficient
+        progress.next_idx = std::min(rejected.non_matching_idx, index_t(rejected.last_idx + 1));
+
+        progress.become_probe();
+
+        // We should not fail to apply an entry following the matched one.
+ assert(progress.next_idx != progress.match_idx); + } + + logger.trace("append_entries_reply[{}->{}]: next_idx={}, match_idx={}", + _my_id, from, progress.next_idx, progress.match_idx); + + replicate_to(progress, false); +} + +void fsm::request_vote(server_id from, vote_request&& request) { + + // We can cast a vote in any state. If the candidate's term is + // lower than ours, we ignore the request. Otherwise we first + // update our current term and convert to a follower. + assert(_current_term == request.current_term); + + bool can_vote = + // We can vote if this is a repeat of a vote we've already cast... + _voted_for == from || + // ...we haven't voted and we don't think there's a leader yet in this term... + (_voted_for == server_id{} && _current_leader == server_id{}); + + // ...and we believe the candidate is up to date. + if (can_vote && _log.is_up_to_date(request.last_log_idx, request.last_log_term)) { + + logger.trace("{} [term: {}, index: {}, log_term: {}, voted_for: {}] " + "voted for {} [log_term: {}, log_index: {}]", + _my_id, _current_term, _log.last_idx(), _log.last_term(), _voted_for, + from, request.last_log_term, request.last_log_idx); + + _voted_for = from; + + send_to(from, vote_reply{_current_term, true}); + } else { + logger.trace("{} [term: {}, index: {}, log_term: {}, voted_for: {}] " + "rejected vote for {} [log_term: {}, log_index: {}]", + _my_id, _current_term, _log.last_idx(), _log.last_term(), _voted_for, + from, request.last_log_term, request.last_log_idx); + + send_to(from, vote_reply{_current_term, false}); + } +} + +void fsm::request_vote_reply(server_id from, vote_reply&& reply) { + assert(is_candidate()); + + logger.trace("{} received a {} vote from {}", _my_id, reply.vote_granted ? 
"yes" : "no", from); + + _votes->register_vote(from, reply.vote_granted); + + switch (_votes->tally_votes()) { + case vote_result::UNKNOWN: + break; + case vote_result::WON: + become_leader(); + break; + case vote_result::LOST: + become_follower(server_id{}); + break; + } +} + +void fsm::replicate_to(follower_progress& progress, bool allow_empty) { + + logger.trace("replicate_to[{}->{}]: called next={} match={}", + _my_id, progress.id, progress.next_idx, progress.match_idx); + + while (progress.can_send_to(_clock.now())) { + index_t next_idx = progress.next_idx; + if (progress.next_idx > _log.stable_idx()) { + next_idx = index_t(0); + logger.trace("replicate_to[{}->{}]: next past stable next={} stable={}, empty={}", + _my_id, progress.id, progress.next_idx, _log.stable_idx(), allow_empty); + if (!allow_empty) { + // Send out only persisted entries. + return; + } + } + + allow_empty = false; // allow only one empty message + + index_t prev_idx = index_t(0); + term_t prev_term = _current_term; + if (progress.next_idx != 1) { + auto& s = _log.get_snapshot(); + prev_idx = index_t(progress.next_idx - 1); + assert (prev_idx >= s.idx); + prev_term = s.idx == prev_idx ? s.term : _log[prev_idx]->term; + } + + append_request_send req = {{ + .current_term = _current_term, + .leader_id = _my_id, + .prev_log_idx = prev_idx, + .prev_log_term = prev_term, + .leader_commit_idx = _commit_idx + }, + std::vector() + }; + + if (next_idx) { + const log_entry& entry = *_log[next_idx]; + // TODO: send only one entry for now, but we should batch in the future + req.entries.push_back(std::cref(entry)); + logger.trace("replicate_to[{}->{}]: send entry idx={}, term={}", + _my_id, progress.id, entry.idx, entry.term); + + if (progress.state == follower_progress::state::PIPELINE) { + progress.in_flight++; + // Optimistically update next send index. In case + // a message is lost there will be negative reply that + // will re-send idx. 
+                progress.next_idx++;
+            }
+        } else {
+            logger.trace("replicate_to[{}->{}]: send empty", _my_id, progress.id);
+        }
+
+        send_to(progress.id, std::move(req));
+
+        progress.last_append_time = _clock.now();
+    }
+}
+
+void fsm::replicate() {
+    assert(is_leader());
+    for (auto& [id, progress] : *_tracker) {
+        if (progress.id != _my_id) {
+            replicate_to(progress, false);
+        }
+    }
+}
+
+bool fsm::can_read() {
+    check_is_leader();
+
+    if (_log[_log.last_idx()]->term != _current_term) {
+        return false;
+    }
+
+    // TODO: for now always return false to let the caller know that
+    // applying dummy entry is needed before reading (to confirm the leadership),
+    // but in the future we may return true here if we can guarantee leadership
+    // by means of a "stable leader" optimization. "Stable leader" ensures that
+    // a follower does not vote for other leader if it recently (during a couple
+    // of last ticks) heard from existing one, so if the leader is already committed
+    // entries during this tick it guarantees that it communicated with
+    // majority of nodes and no other leader could have been elected.
+
+    return false;
+}
+
+void fsm::stop() {
+    _sm_events.broken();
+}
+
+} // end of namespace raft
diff --git a/raft/fsm.hh b/raft/fsm.hh
new file mode 100644
index 0000000000..fa1f1f0b66
--- /dev/null
+++ b/raft/fsm.hh
@@ -0,0 +1,385 @@
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#pragma once + +#include +#include "raft.hh" +#include "progress.hh" +#include "log.hh" + +namespace raft { + +// State of the FSM that needs logging & sending. +struct fsm_output { + term_t term; + server_id vote; + std::vector log_entries; + std::vector> messages; + // Entries to apply. + std::vector committed; +}; + +// 3.4 Leader election +// If a follower receives no communication over a period of +// time called the election timeout, then it assumes there is +// no viable leader and begins an election to choose a new +// leader. +static constexpr logical_clock::duration ELECTION_TIMEOUT = logical_clock::duration{10}; + +// 3.3 Raft Basics +// At any given time each server is in one of three states: +// leader, follower, or candidate. +// In normal operation there is exactly one leader and all of the +// other servers are followers. Followers are passive: they issue +// no requests on their own but simply respond to requests from +// leaders and candidates. The leader handles all client requests +// (if a client contacts a follower, the follower redirects it to +// the leader). The third state, candidate, is used to elect a new +// leader. +class follower {}; +class candidate {}; +class leader {}; + +// Raft protocol finite state machine +// +// Most libraries separate themselves from implementations by +// providing an API to the environment of the Raft protocol, such +// as the database, the write ahead log and the RPC to peers. + +// This callback based design has some drawbacks: + +// - some callbacks may be defined in blocking model; e.g. 
+// writing log entries to disk, or persisting the current
+// term in the database; Seastar has no blocking IO and
+// would have to emulate it with fibers;
+// - the API calls are spread over the state machine
+// implementation, which makes reasoning about the correctness
+// more difficult (what happens if the library is accessed
+// concurrently by multiple users, which of these accesses have
+// to be synchronized; what if the callback fails, is the state
+// machine handling the error correctly?)
+// - while using callbacks allows testing without a real network or disk,
+// it still complicates it, since one has to implement meaningful
+// mocks for most of the APIs.
+//
+// Seastar Raft instead implements an instance of Raft as
+// in-memory state machine with a catch-all API step(message)
+// method. The method handles any kind of input and performs the
+// needed state machine state transitions. To get state machine output
+// poll_output() function has to be called. This call produces an output
+// object, which encapsulates a list of actions that must be
+// performed until the next poll_output() call can be made. The time is
+// represented with a logical timer. The client is responsible for
+// periodically invoking tick() method, which advances the state
+// machine time and allows it to track such events as election or
+// heartbeat timeouts.
+class fsm {
+    // id of this node
+    server_id _my_id;
+    // id of the current leader
+    server_id _current_leader;
+    // What state the server is in. The default is follower.
+    std::variant _state;
+    // _current_term, _voted_for && _log are persisted in storage
+    // The latest term the server has seen.
+    term_t _current_term;
+    // Candidate id that received a vote in the current term (or
+    // nil if none).
+    server_id _voted_for;
+    // Index of the highest log entry known to be committed.
+    // Currently not persisted.
+ index_t _commit_idx = index_t(0); + // Log entries; each entry contains a command for state machine, + // and the term when the entry was received by the leader. + log _log; + // A possibly shared server failure detector. + failure_detector& _failure_detector; + + // Stores the last state observed by get_output(). + // Is updated with the actual state of the FSM after + // fsm_output is created. + struct last_observed_state { + term_t _current_term; + server_id _voted_for; + index_t _commit_idx; + + bool is_equal(const fsm& fsm) const { + return _current_term == fsm._current_term && _voted_for == fsm._voted_for && + _commit_idx == fsm._commit_idx; + } + + void advance(const fsm& fsm) { + _current_term = fsm._current_term; + _voted_for = fsm._voted_for; + _commit_idx = fsm._commit_idx; + } + } _observed; + + logical_clock _clock; + // Start of the current election epoch - a time point relative + // to which we expire election timeout. + logical_clock::time_point _last_election_time = logical_clock::min(); + // A random value in range [election_timeout, 2 * election_timeout), + // reset on each term change. + logical_clock::duration _randomized_election_timeout = ELECTION_TIMEOUT; + // Votes received during an election round. Available only in + // candidate state. + std::optional _votes; + + // A state for each follower, maintained only on the leader. + std::optional _tracker; + // Holds all replies to AppendEntries RPC which are not + // yet sent out. If AppendEntries request is accepted, we must + // withhold a reply until the respective entry is persisted in + // the log. Otherwise, e.g. when we receive AppendEntries with + // an older term, we may reject it immediately. + // Either way all replies are appended to this queue first. + // + // 3.3 Raft Basics + // If a server receives a request with a stale term number, it + // rejects the request. 
+ // TLA+ line 328 + std::vector> _messages; + + // Currently used configuration, may be different from + // the committed during a configuration change. + configuration _current_config; + + // Signaled when there is a IO event to process. + seastar::condition_variable _sm_events; + // Called when one of the replicas advances its match index + // so it may be the case that some entries are committed now. + // Signals _sm_events. + void check_committed(); + // Check if the randomized election timeout has expired. + bool is_past_election_timeout() const { + return _clock.now() - _last_election_time >= _randomized_election_timeout; + } + // How much time has passed since last election or last + // time we heard from a valid leader. + logical_clock::duration election_elapsed() const { + return _clock.now() - _last_election_time; + } + + // A helper to send any kind of RPC message. + template + void send_to(server_id to, Message&& m) { + static_assert(std::is_rvalue_reference::value, "must be rvalue"); + _messages.push_back(std::make_pair(to, std::move(m))); + _sm_events.signal(); + } + + // A helper to update the FSM's current term. + void update_current_term(term_t current_term); + + void check_is_leader() const { + if (!is_leader()) { + throw not_a_leader(_current_leader); + } + } + + void become_candidate(); + + void become_follower(server_id leader); + + // Controls whether the follower has been responsive recently, + // so it makes sense to send more data to it. + bool can_send_to(const follower_progress& progress); + // Replicate entries to a follower. If there are no entries to send + // and allow_empty is true, send a heartbeat. 
+ void replicate_to(follower_progress& progress, bool allow_empty); + void replicate(); + void append_entries(server_id from, append_request_recv&& append_request); + void append_entries_reply(server_id from, append_reply&& reply); + + void request_vote(server_id from, vote_request&& vote_request); + void request_vote_reply(server_id from, vote_reply&& vote_reply); + + // Called on a follower with a new known leader commit index. + // Advances the follower's commit index up to all log-stable + // entries, known to be committed. + void advance_commit_idx(index_t leader_commit_idx); + // Called after log entries in FSM output are considered persisted. + // Produces new FSM output. + void advance_stable_idx(index_t idx); + // Tick implementation on a leader + void tick_leader(); + + // Set cluster configuration + void set_configuration(const configuration& config) { + _current_config = config; + // We unconditionally access _current_config + // to identify which entries are committed. + assert(_current_config.servers.size() > 0); + if (is_leader()) { + _tracker->set_configuration(_current_config.servers, _log.next_idx()); + } else if (is_candidate()) { + _votes->set_configuration(_current_config.servers); + } + } +public: + explicit fsm(server_id id, term_t current_term, server_id voted_for, log log, + failure_detector& failure_detector); + + bool is_leader() const { + return std::holds_alternative(_state); + } + bool is_follower() const { + return std::holds_alternative(_state); + } + bool is_candidate() const { + return std::holds_alternative(_state); + } + + void become_leader(); + + // Add an entry to in-memory log. The entry has to be + // committed to the persistent Raft log afterwards. + template const log_entry& add_entry(T command); + + // Wait until there is, and return state machine output that + // needs to be handled. + // This includes a list of the entries that need + // to be logged. 
The logged entries are eventually + // discarded from the state machine after snapshotting. + future poll_output(); + + // Get state machine output, if there is any. Doesn't + // wait. It is public for use in testing. + // May throw on allocation failure, but leaves state machine + // in the same state in that case + fsm_output get_output(); + + // Called to advance virtual clock of the protocol state machine. + void tick(); + + // Feed one Raft RPC message into the state machine. + // Advances the state machine state and generates output, + // accessible via poll_output(). + template + void step(server_id from, Message&& msg); + + void stop(); + + // @sa can_read() + term_t get_current_term() const { + return _current_term; + } + + // Should be called on leader only, throws otherwise. + // Returns true if the current leader has at least one entry + // committed and a quorum of followers was alive in the last + // tick period. + bool can_read(); + + friend std::ostream& operator<<(std::ostream& os, const fsm& f); +}; + +template +void fsm::step(server_id from, Message&& msg) { + static_assert(std::is_rvalue_reference::value, "must be rvalue"); + // 4.1. Safety + // Servers process incoming RPC requests without consulting + // their current configurations. + + // 3.3. Raft basics. + // + // Current terms are exchanged whenever servers + // communicate; if one server’s current term is smaller + // than the other’s, then it updates its current term to + // the larger value. If a candidate or leader discovers + // that its term is out of date, it immediately reverts to + // follower state. If a server receives a request with + // a stale term number, it rejects the request. 
+ if (msg.current_term > _current_term) { + logger.trace("{} [term: {}] received a message with higher term from {} [term: {}]", + _my_id, _current_term, from, msg.current_term); + + if constexpr (std::is_same_v) { + become_follower(from); + } else { + if constexpr (std::is_same_v) { + if (_current_leader != server_id{} && election_elapsed() < ELECTION_TIMEOUT) { + // 4.2.3 Disruptive servers + // If a server receives a RequestVote request + // within the minimum election timeout of + // hearing from a current leader, it does not + // update its term or grant its vote. + logger.trace("{} [term: {}] not granting a vote within a minimum election timeout, elapsed {}", + _my_id, _current_term, election_elapsed()); + return; + } + } + become_follower(server_id{}); + } + update_current_term(msg.current_term); + + } else if (msg.current_term < _current_term) { + if constexpr (std::is_same_v) { + // Instructs the leader to step down. + append_reply reply{_current_term, _commit_idx, append_reply::rejected{msg.prev_log_idx, _log.last_idx()}}; + send_to(from, std::move(reply)); + } else { + // Ignore other cases + logger.trace("{} [term: {}] ignored a message with lower term from {} [term: {}]", + _my_id, _current_term, from, msg.current_term); + } + return; + } + + auto visitor = [this, from, msg = std::move(msg)](auto state) mutable { + using State = decltype(state); + + if constexpr (std::is_same_v) { + // Got AppendEntries RPC from self + assert((!std::is_same_v)); + // 3.4 Leader Election + // While waiting for votes, a candidate may receive an AppendEntries + // RPC from another server claiming to be leader. If the + // leader’s term (included in its RPC) is at least as large as the + // candidate’s current term, then the candidate recognizes the + // leader as legitimate and returns to follower state. 
+ if constexpr (std::is_same_v) { + become_follower(from); + } + append_entries(from, std::move(msg)); + } else if constexpr (std::is_same_v) { + if constexpr (!std::is_same_v) { + // Ignore stray reply if we're not a leader. + return; + } + append_entries_reply(from, std::move(msg)); + } else if constexpr (std::is_same_v) { + request_vote(from, std::move(msg)); + } else if constexpr (std::is_same_v) { + if constexpr (!std::is_same_v) { + // Ignore stray reply if we're not a candidate. + return; + } + request_vote_reply(from, std::move(msg)); + } + }; + + std::visit(visitor, _state); +} + +} // namespace raft + diff --git a/raft/log.cc b/raft/log.cc new file mode 100644 index 0000000000..cdd5767beb --- /dev/null +++ b/raft/log.cc @@ -0,0 +1,171 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . 
+ */ +#include "log.hh" + +namespace raft { + +log_entry_ptr& log::get_entry(index_t i) { + return _log[i - start_idx()]; +} + +log_entry_ptr& log::operator[](size_t i) { + assert(index_t(i) >= start_idx()); + return get_entry(index_t(i)); +} + +void log::emplace_back(log_entry&& e) { + _log.emplace_back(seastar::make_lw_shared(std::move(e))); +} + +bool log::empty() const { + return _log.empty(); +} + +bool log::is_up_to_date(index_t idx, term_t term) const { + // 3.6.1 Election restriction + // Raft determines which of two logs is more up-to-date by comparing the + // index and term of the last entries in the logs. If the logs have last + // entries with different terms, then the log with the later term is more + // up-to-date. If the logs end with the same term, then whichever log is + // longer is more up-to-date. + return term > last_term() || (term == last_term() && idx >= last_idx()); +} + +index_t log::last_idx() const { + return index_t(_log.size()) + start_idx() - index_t(1); +} + +index_t log::next_idx() const { + return last_idx() + index_t(1); +} + +void log::truncate_head(index_t idx) { + assert(idx >= start_idx()); + auto it = _log.begin() + (idx - start_idx()); + _log.erase(it, _log.end()); + stable_to(std::min(_stable_idx, last_idx())); +} + +void log::truncate_tail(index_t idx) { + assert(start_idx() <= idx); + + if (idx >= last_idx()) { + _log.clear(); + } else if (idx > start_idx()) { + _log.erase(_log.begin(), _log.begin() + idx - start_idx() + 1); + } + + _stable_idx = std::max(idx, _stable_idx); +} + +index_t log::start_idx() const { + return _snapshot.idx + index_t(1); +} + +term_t log::last_term() const { + if (_log.empty()) { + return term_t(0); + } + return _log.back()->term; +} + +void log::stable_to(index_t idx) { + assert(idx <= last_idx()); + _stable_idx = idx; +} + +std::pair log::match_term(index_t idx, term_t term) const { + if (idx == 0) { + // Special case of empty log on leader, + // TLA+ line 324. 
+ return std::make_pair(true, term_t(0)); + } + + // idx cannot point into the snapshot + assert(idx >= _snapshot.idx); + + term_t my_term; + + if (idx == _snapshot.idx) { + my_term = _snapshot.term; + } else { + auto i = idx - start_idx(); + + if (i >= _log.size()) { + // We have a gap between the follower and the leader. + return std::make_pair(false, term_t(0)); + } + + my_term = _log[i]->term; + } + + return my_term == term ? std::make_pair(true, term_t(0)) : std::make_pair(false, my_term); +} + +index_t log::maybe_append(std::vector&& entries) { + assert(!entries.empty()); + + index_t last_new_idx = entries.back().idx; + + // We must scan through all entries if the log already + // contains them to ensure the terms match. + for (auto& e : entries) { + if (e.idx <= last_idx()) { + if (e.idx < start_idx()) { + logger.trace("append_entries: skipping entry with idx {} less than log start {}", + e.idx, start_idx()); + continue; + } + if (e.term == get_entry(e.idx)->term) { + logger.trace("append_entries: entries with index {} has matching terms {}", e.idx, e.term); + continue; + } + logger.trace("append_entries: entries with index {} has non matching terms e.term={}, _log[i].term = {}", + e.idx, e.term, get_entry(e.idx)->term); + // If an existing entry conflicts with a new one (same + // index but different terms), delete the existing + // entry and all that follow it (§5.3). 
+ truncate_head(e.idx); + } + // Assert log monotonicity + assert(e.idx == next_idx()); + _log.emplace_back(seastar::make_lw_shared(std::move(e))); + } + + return last_new_idx; +} + +void log::apply_snapshot(snapshot&& snp) { + // call truncate first since it uses old snapshot + truncate_tail(snp.idx); + _snapshot = std::move(snp); +} + +std::ostream& operator<<(std::ostream& os, const log& l) { + os << "next idx: " << l.next_idx() << ", "; + os << "last idx: " << l.last_idx() << ", "; + os << "stable idx: " << l.stable_idx() << ", "; + os << "start idx: " << l.start_idx() << ", "; + os << "last term: " << l.last_term(); + return os; +} + +} // end of namespace raft diff --git a/raft/log.hh b/raft/log.hh new file mode 100644 index 0000000000..1aad879ce5 --- /dev/null +++ b/raft/log.hh @@ -0,0 +1,113 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#pragma once + +#include "raft.hh" + +namespace raft { +// This class represents the Raft log in memory. +// +// The value of the first index is 1. +// New entries are added at the back. +// +// Entries are persisted locally after they are added. Entries may be +// dropped from the beginning by snapshotting and from the end by +// a new leader replacing stale entries. Any exception thrown by +// any function leaves the log in a consistent state. 
+class log { + // Snapshot of the prefix of the log. + snapshot _snapshot; + // We need something that can be truncated from both sides. + // std::deque move constructor is not nothrow hence cannot be used + log_entries _log; + // Index of the last stable (persisted) entry in the log. + index_t _stable_idx = index_t(0); + +private: + void truncate_head(index_t i); + void truncate_tail(index_t i); + log_entry_ptr& get_entry(index_t); + index_t start_idx() const; +public: + log() = default ; + explicit log(log_entries log) : _log(std::move(log)) { stable_to(index_t(_log.size())); }; + log(snapshot snp, log_entries log) : _snapshot(std::move(snp)), _log(std::move(log)) { stable_to(last_idx()); } + explicit log(snapshot snp) : _snapshot(std::move(snp)) {} + // The index here the global raft log index, not related to a snapshot. + // It is a programming error to call the function with an index that points into the snapshot, + // the function will abort() + log_entry_ptr& operator[](size_t i); + // Add an entry to the log. + void emplace_back(log_entry&& e); + // Mark all entries up to this index as stable. + void stable_to(index_t idx); + // Return true if in memory log is empty. + bool empty() const; + // 3.6.1 Election restriction. + // The voter denies its vote if its own log is more up-to-date + // than that of the candidate. + bool is_up_to_date(index_t idx, term_t term) const; + index_t next_idx() const; + index_t last_idx() const; + index_t stable_idx() const { + return _stable_idx; + } + term_t last_term() const; + + // The function returns current snapshot state of the log + const snapshot& get_snapshot() const { + return _snapshot; + } + + void apply_snapshot(snapshot&& snp); + + // 3.5 + // Raft maintains the following properties, which + // together constitute the Log Matching Property: + // * If two entries in different logs have the same index and + // term, then they store the same command. 
+ // * If two entries in different logs have the same index and + // term, then the logs are identical in all preceding entries. + // + // The first property follows from the fact that a leader + // creates at most one entry with a given log index in a given + // term, and log entries never change their position in the + // log. The second property is guaranteed by a consistency + // check performed by AppendEntries. When sending an + // AppendEntries RPC, the leader includes the index and term + // of the entry in its log that immediately precedes the new + // entries. If the follower does not find an entry in its log + // with the same index and term, then it refuses the new + // entries. + // + // @retval first is true - there is a match, term value is irrelevant + // @retval first is false - the follower's log doesn't match the leader's + // and non matching term is in second + std::pair match_term(index_t idx, term_t term) const; + + // Called on a follower to append entries from a leader. + // @retval return an index of last appended entry + index_t maybe_append(std::vector&& entries); + + friend std::ostream& operator<<(std::ostream& os, const log& l); +}; + +} diff --git a/raft/logical_clock.hh b/raft/logical_clock.hh new file mode 100644 index 0000000000..2a6fc79a46 --- /dev/null +++ b/raft/logical_clock.hh @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#pragma once + +#include + +namespace raft { + +// Raft protocol state machine clock ticks at different speeds +// depending on the environment. A typical clock tick for +// a production system is 100ms, while a test system can +// tick it at the speed of the hardware clock. +// +// Every state machine has an own instance of logical clock, +// this enables tests when different state machines run at +// different clock speeds. +class logical_clock final { +public: + using rep = int64_t; + // There is no realistic period for a logical clock, + // just use the smallest period possible. + using period = std::chrono::nanoseconds::period; + using duration = std::chrono::duration; + using time_point = std::chrono::time_point; + + static constexpr bool is_steady = true; + + void advance(duration diff = duration{1}) { + _now += diff; + } + time_point now() const noexcept { + return _now; + } + + static constexpr time_point min() { + return time_point(duration{0}); + } +private: + time_point _now = min(); +}; + +inline std::ostream& operator<<(std::ostream& os, const logical_clock::time_point& p) { + return os << (p - logical_clock::min()).count(); +} + +} // end of namespace raft + +namespace std { + +inline std::ostream& operator<<(std::ostream& os, const raft::logical_clock::duration& d) { + return os << d.count(); +} + +} // end of namespace std diff --git a/raft/progress.cc b/raft/progress.cc new file mode 100644 index 0000000000..3ee4fd7801 --- /dev/null +++ b/raft/progress.cc @@ -0,0 +1,130 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#include "progress.hh" +#include + +namespace raft { + +bool follower_progress::is_stray_reject(const append_reply::rejected& rejected) { + switch (state) { + case follower_progress::state::PIPELINE: + if (rejected.non_matching_idx <= match_idx) { + // If rejected index is smaller that matched it means this is a stray reply + return true; + } + break; + case follower_progress::state::PROBE: + // In the probe state the reply is only valid if it matches next_idx - 1, since only + // one append request is outstanding. + if (rejected.non_matching_idx != index_t(next_idx - 1)) { + return true; + } + break; + case follower_progress::state::SNAPSHOT: + // any reject during snapshot transfer is stray one + return true; + default: + assert(false); + } + return false; +} + +void follower_progress::become_probe() { + state = state::PROBE; +} + +void follower_progress::become_pipeline() { + if (state != state::PIPELINE) { + // If a previous request was accepted, move to "pipeline" state + // since we now know the follower's log state. + state = state::PIPELINE; + in_flight = 0; + } +} + +void follower_progress::become_snapshot() { + state = state::SNAPSHOT; +} + +bool follower_progress::can_send_to(logical_clock::time_point now) { + switch (state) { + case state::PROBE: + return now - last_append_time >= logical_clock::duration{1}; + case state::PIPELINE: + // allow `max_in_flight` outstanding indexes + // FIXME: make it smarter + return in_flight < follower_progress::max_in_flight; + case state::SNAPSHOT: + // In this state we are waiting + // for a snapshot to be transferred + // before starting to sync the log. 
+ return false; + } + assert(false); + return false; +} + +void tracker::set_configuration(const std::vector& servers, index_t next_idx) { + for (auto& s : servers) { + if (this->progress::find(s.id) != this->progress::end()) { + continue; + } + this->progress::emplace(s.id, follower_progress{s.id, next_idx}); + } +} + +index_t tracker::committed(index_t prev_commit_idx) { + std::vector match; + size_t count = 0; + + for (const auto& [id, p] : *this) { + logger.trace("committed {}: {} {}", p.id, p.match_idx, prev_commit_idx); + if (p.match_idx > prev_commit_idx) { + count++; + } + match.push_back(p.match_idx); + } + logger.trace("check committed count {} cluster size {}", count, match.size()); + if (count < match.size()/2 + 1) { + return prev_commit_idx; + } + // The index of the pivot node is selected so that all nodes + // with a larger match index plus the pivot form a majority, + // for example: + // cluster size pivot node majority + // 1 0 1 + // 2 0 2 + // 3 1 2 + // 4 1 3 + // 5 2 3 + // + auto pivot = (match.size() - 1) / 2; + std::nth_element(match.begin(), match.begin() + pivot, match.end()); + return match[pivot]; +} + +std::ostream& operator<<(std::ostream& os, const votes& v) { + os << "responded: " << v._responded << ", "; + os << "granted: " << v._granted; + return os; +} + +} // end of namespace raft diff --git a/raft/progress.hh b/raft/progress.hh new file mode 100644 index 0000000000..33bb09141f --- /dev/null +++ b/raft/progress.hh @@ -0,0 +1,147 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see .
+ */
+#pragma once
+
+#include 
+#include "raft.hh"
+#include "logical_clock.hh"
+
+namespace raft {
+
+// Leader's view of each follower, including self.
+class follower_progress {
+public:
+ // Id of this server
+ server_id id;
+ // Index of the next log entry to send to this server.
+ index_t next_idx;
+ // Index of the highest log entry known to be replicated to this
+ // server.
+ index_t match_idx = index_t(0);
+ // Index that we know to be committed by the follower
+ index_t commit_idx = index_t(0);
+
+ enum class state {
+ // In this state only one append entry is sent until a matching index is found
+ PROBE,
+ // In this state multiple append entries are sent optimistically
+ PIPELINE,
+ // In this state a snapshot is being transferred
+ SNAPSHOT
+ };
+ state state = state::PROBE;
+ // number of in flight still un-acked append entries requests
+ size_t in_flight = 0;
+ static constexpr size_t max_in_flight = 10;
+
+ // Set when a message is sent to the follower.
+ // Used to decide if a separate keep alive message is needed
+ // within this tick.
+ // In probe mode, used to limit the amount of entries sent to
+ // the follower.
+ logical_clock::time_point last_append_time = logical_clock::min();
+
+ // check if a reject packet should be ignored because it was delayed
+ // or reordered
+ bool is_stray_reject(const append_reply::rejected&);
+
+ void become_probe();
+ void become_pipeline();
+ void become_snapshot();
+
+ // Return true if a new replication record can be sent to the follower.
+ bool can_send_to(logical_clock::time_point now); + + follower_progress(server_id id_arg, index_t next_idx_arg) + : id(id_arg), next_idx(next_idx_arg) + {} +}; + +using progress = std::unordered_map; + +class tracker: private progress { + // Copy of this server's id + server_id _my_id; +public: + using progress::begin, progress::end, progress::cbegin, progress::cend, progress::size; + + explicit tracker(server_id my_id) + : _my_id(my_id) + {} + + // Return progress for a follower + follower_progress& find(server_id dst) { + return this->progress::find(dst)->second; + } + void set_configuration(const std::vector& servers, index_t next_idx); + + // Calculate the current commit index based on the current + // simple or joint quorum. + index_t committed(index_t prev_commit_idx); +}; + +// Possible leader election outcomes. +enum class vote_result { + // We haven't got enough responses yet, either because + // the servers haven't voted or responses failed to arrive. + UNKNOWN, + // This candidate has won the election + WON, + // The quorum of servers has voted against this candidate + LOST, +}; + +// Candidate's state specific to election +class votes { + size_t _cluster_size = 1; + // Number of responses to RequestVote RPC. + // The candidate always votes for self. + size_t _responded = 1; + // Number of granted votes. + // The candidate always votes for self. + size_t _granted = 1; +public: + void set_configuration(const std::vector& servers) { + _cluster_size = servers.size(); + } + + void register_vote(server_id from, bool granted) { + _responded++; + if (granted) { + _granted++; + } + } + + vote_result tally_votes() const { + auto quorum = _cluster_size / 2 + 1; + if (_granted >= quorum) { + return vote_result::WON; + } + assert(_responded <= _cluster_size); + auto unknown = _cluster_size - _responded; + return _granted + unknown >= quorum ? 
vote_result::UNKNOWN : vote_result::LOST; + } + + friend std::ostream& operator<<(std::ostream& os, const votes& v); +}; + +} // namespace raft + diff --git a/raft/raft.cc b/raft/raft.cc new file mode 100644 index 0000000000..150d7f217d --- /dev/null +++ b/raft/raft.cc @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ +#include "raft.hh" + +namespace raft { + +seastar::logger logger("raft"); + +} // end of namespace raft diff --git a/raft/server.cc b/raft/server.cc new file mode 100644 index 0000000000..34858a65b4 --- /dev/null +++ b/raft/server.cc @@ -0,0 +1,381 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . 
+ */ +#include "server.hh" + +#include +#include +#include +#include +#include +#include + +#include "fsm.hh" +#include "log.hh" + +using namespace std::chrono_literals; + +namespace raft { + +class server_impl : public rpc_server, public server { +public: + explicit server_impl(server_id uuid, std::unique_ptr rpc, + std::unique_ptr state_machine, std::unique_ptr storage, + seastar::shared_ptr failure_detector); + + server_impl(server_impl&&) = delete; + + ~server_impl() {} + + // rpc_server interface + void append_entries(server_id from, append_request_recv append_request) override; + void append_entries_reply(server_id from, append_reply reply) override; + void request_vote(server_id from, vote_request vote_request) override; + void request_vote_reply(server_id from, vote_reply vote_reply) override; + + // server interface + future<> add_entry(command command, wait_type type); + future<> apply_snapshot(server_id from, install_snapshot snp) override; + future<> add_server(server_id id, bytes node_info, clock_type::duration timeout) override; + future<> remove_server(server_id id, clock_type::duration timeout) override; + future<> start() override; + future<> abort() override; + term_t get_current_term() const override; + future<> read_barrier() override; + void make_me_leader() override; +private: + std::unique_ptr _rpc; + std::unique_ptr _state_machine; + std::unique_ptr _storage; + seastar::shared_ptr _failure_detector; + // Protocol deterministic finite-state machine + std::unique_ptr _fsm; + // id of this server + server_id _id; + seastar::timer _ticker; + + seastar::pipe> _apply_entries = seastar::pipe>(10); + + struct op_status { + term_t term; // term the entry was added with + promise<> done; // notify when done here + }; + + // Entries that have a waiter that needs to be notified when the + // respective entry is known to be committed. 
+ std::map _awaited_commits;
+
+ // Entries that have a waiter that needs to be notified after
+ // the respective entry is applied.
+ std::map _awaited_applies;
+
+ // Called to commit entries (on a leader or otherwise).
+ void notify_waiters(std::map& waiters, const std::vector& entries);
+
+ // This fiber processes fsm output, by doing the following steps in that order:
+ // - persists the current term and vote
+ // - persists unstable log entries on disk.
+ // - sends out messages
+ future<> io_fiber(index_t stable_idx);
+
+ // This fiber runs in the background and applies committed entries.
+ future<> applier_fiber();
+
+ template future<> add_entry_internal(T command, wait_type type);
+ template future<> send_message(server_id id, Message m);
+
+ // Apply a dummy entry. Dummy entry is not propagated to the
+ // state machine, but waiting for it to be "applied" ensures
+ // all previous entries are applied as well.
+ // Resolves when the entry is committed.
+ // The function has to be called on the leader, and throws otherwise.
+ // May fail because of an internal error or because the leader
+ // has changed and the entry was replaced by another one,
+ // submitted to the new leader.
+ future<> apply_dummy_entry();
+
+ future<> _applier_status = make_ready_future<>();
+ future<> _io_status = make_ready_future<>();
+
+ friend std::ostream& operator<<(std::ostream& os, const server_impl& s);
+};
+
+server_impl::server_impl(server_id uuid, std::unique_ptr rpc,
+ std::unique_ptr state_machine, std::unique_ptr storage,
+ seastar::shared_ptr failure_detector) :
+ _rpc(std::move(rpc)), _state_machine(std::move(state_machine)),
+ _storage(std::move(storage)), _failure_detector(failure_detector),
+ _id(uuid) {
+ set_rpc_server(_rpc.get());
+}
+
+future<> server_impl::start() {
+ auto [term, vote] = co_await _storage->load_term_and_vote();
+ auto snapshot = co_await _storage->load_snapshot();
+ auto log_entries = co_await _storage->load_log();
+ auto log = raft::log(std::move(snapshot), std::move(log_entries));
+ index_t stable_idx = log.stable_idx();
+ _fsm = std::make_unique(_id, term, vote, std::move(log), *_failure_detector);
+ assert(_fsm->get_current_term() != term_t(0));
+
+ // start fiber to persist entries added to in-memory log
+ _io_status = io_fiber(stable_idx);
+ // start fiber to apply committed entries
+ _applier_status = applier_fiber();
+
+ _ticker.arm_periodic(100ms);
+ _ticker.set_callback([this] {
+ _fsm->tick();
+ });
+
+ co_return;
+}
+
+template 
+future<> server_impl::add_entry_internal(T command, wait_type type) {
+ logger.trace("An entry is submitted on a leader");
+
+ // lock access to the raft log while it is being updated
+
+ // @todo: ensure the reference to the entry is stable between
+ // yields, before removing _log_lock.
+ const log_entry& e = _fsm->add_entry(std::move(command));
+
+ auto& container = type == wait_type::committed ? 
_awaited_commits : _awaited_applies;
+
+ // This will track the commit/apply status of the entry
+ auto [it, inserted] = container.emplace(e.idx, op_status{e.term, promise<>()});
+ assert(inserted);
+ return it->second.done.get_future();
+}
+
+future<> server_impl::add_entry(command command, wait_type type) {
+ return add_entry_internal(std::move(command), type);
+}
+
+future<> server_impl::apply_dummy_entry() {
+ return add_entry_internal(log_entry::dummy(), wait_type::applied);
+}
+void server_impl::append_entries(server_id from, append_request_recv append_request) {
+ _fsm->step(from, std::move(append_request));
+}
+
+void server_impl::append_entries_reply(server_id from, append_reply reply) {
+ _fsm->step(from, std::move(reply));
+}
+
+void server_impl::request_vote(server_id from, vote_request vote_request) {
+ _fsm->step(from, std::move(vote_request));
+}
+
+void server_impl::request_vote_reply(server_id from, vote_reply vote_reply) {
+ _fsm->step(from, std::move(vote_reply));
+}
+
+void server_impl::notify_waiters(std::map& waiters,
+ const std::vector& entries) {
+ index_t commit_idx = entries.back()->idx;
+ index_t first_idx = entries.front()->idx;
+
+ while (waiters.size() != 0) {
+ auto it = waiters.begin();
+ if (it->first > commit_idx) {
+ break;
+ }
+ auto [entry_idx, status] = std::move(*it);
+
+ // if there is a waiter entry with an index smaller than the first entry
+ // it means that notification is out of order which is prohibited
+ assert(entry_idx >= first_idx);
+
+ waiters.erase(it);
+ if (status.term == entries[entry_idx - first_idx]->term) {
+ status.done.set_value();
+ } else {
+ // term does not match which means that between the time the entry was submitted
+ // and committed there was a leadership change and the entry was replaced.
+ status.done.set_exception(dropped_entry()); + } + } +} + +template +future<> server_impl::send_message(server_id id, Message m) { + return std::visit([this, id] (auto&& m) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return _rpc->send_append_entries_reply(id, m); + } else if constexpr (std::is_same_v) { + return _rpc->send_append_entries(id, m); + } else if constexpr (std::is_same_v) { + return _rpc->send_vote_request(id, m); + } else if constexpr (std::is_same_v) { + return _rpc->send_vote_reply(id, m); + } else { + static_assert(!sizeof(Message*), "not all message types are handled"); + return make_ready_future<>(); + } + }, std::move(m)); +} + +future<> server_impl::io_fiber(index_t last_stable) { + logger.trace("[{}] io_fiber start", _id); + try { + while (true) { + auto batch = co_await _fsm->poll_output(); + + if (batch.term != term_t{}) { + // Current term and vote are always persisted + // together. A vote may change independently of + // term, but it's safe to update both in this + // case. + co_await _storage->store_term_and_vote(batch.term, batch.vote); + } + + if (batch.log_entries.size()) { + auto& entries = batch.log_entries; + + if (last_stable >= entries[0]->idx) { + co_await _storage->truncate_log(entries[0]->idx); + } + + // Combine saving and truncating into one call? 
+ // will require storage to keep track of last idx + co_await _storage->store_log_entries(entries); + + last_stable = (*entries.crbegin())->idx; + } + + if (batch.messages.size()) { + // after entries are persisted we can send messages + co_await seastar::parallel_for_each(std::move(batch.messages), [this] (std::pair& message) { + return send_message(message.first, std::move(message.second)); + }); + } + + // process committed entries + if (batch.committed.size()) { + notify_waiters(_awaited_commits, batch.committed); + co_await _apply_entries.writer.write(std::move(batch.committed)); + } + } + } catch (seastar::broken_condition_variable&) { + // log fiber is stopped explicitly. + } catch (...) { + logger.error("[{}] io fiber stopped because of the error: {}", _id, std::current_exception()); + } + co_return; +} + +future<> server_impl::apply_snapshot(server_id from, install_snapshot snp) { + return make_ready_future<>(); +} + +future<> server_impl::applier_fiber() { + logger.trace("applier_fiber start"); + try { + while (true) { + auto opt_batch = co_await _apply_entries.reader.read(); + if (!opt_batch) { + // EOF + break; + } + std::vector commands; + commands.reserve(opt_batch->size()); + + std::ranges::copy( + *opt_batch | + std::views::filter([] (log_entry_ptr& entry) { return std::holds_alternative(entry->data); }) | + std::views::transform([] (log_entry_ptr& entry) { return std::cref(std::get(entry->data)); }), + std::back_inserter(commands)); + + co_await _state_machine->apply(std::move(commands)); + notify_waiters(_awaited_applies, *opt_batch); + } + } catch (...) 
{
+ logger.error("applier fiber {} stopped because of the error: {}", _id, std::current_exception());
+ }
+ co_return;
+}
+
+term_t server_impl::get_current_term() const {
+ return _fsm->get_current_term();
+}
+
+future<> server_impl::read_barrier() {
+ if (_fsm->can_read()) {
+ co_return;
+ }
+
+ co_await apply_dummy_entry();
+ co_return;
+}
+
+future<> server_impl::abort() {
+ logger.trace("abort() called");
+ _fsm->stop();
+ {
+ // there is no explicit close for the pipe!
+ auto tmp = std::move(_apply_entries.writer);
+ }
+ for (auto& ac: _awaited_commits) {
+ ac.second.done.set_exception(stopped_error());
+ }
+ for (auto& aa: _awaited_applies) {
+ aa.second.done.set_exception(stopped_error());
+ }
+ _awaited_commits.clear();
+ _awaited_applies.clear();
+ _ticker.cancel();
+
+ return seastar::when_all_succeed(std::move(_io_status), std::move(_applier_status),
+ _rpc->abort(), _state_machine->abort(), _storage->abort()).discard_result();
+}
+
+future<> server_impl::add_server(server_id id, bytes node_info, clock_type::duration timeout) {
+ return make_ready_future<>();
+}
+
+// Removes a server from the cluster. If the server is not a member
+// of the cluster, this does nothing. Can be called on a leader only,
+// otherwise throws not_a_leader.
+// Cannot be called until a previous add/remove server completes,
+// otherwise a conf_change_in_progress exception is returned.
+future<> server_impl::remove_server(server_id id, clock_type::duration timeout) { + return make_ready_future<>(); +} + +void server_impl::make_me_leader() { + _fsm->become_leader(); +} + +std::unique_ptr create_server(server_id uuid, std::unique_ptr rpc, + std::unique_ptr state_machine, std::unique_ptr storage, + seastar::shared_ptr failure_detector) { + return std::make_unique(uuid, std::move(rpc), std::move(state_machine), + std::move(storage), failure_detector); +} + +std::ostream& operator<<(std::ostream& os, const server_impl& s) { + os << "[id: " << s._id << ", fsm (" << s._fsm << ")]\n"; + return os; +} + +} // end of namespace raft From 4959609589a5929825a2709be5e2d6b641b3da52 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 13 Aug 2020 17:43:38 +0300 Subject: [PATCH 3/5] raft: add raft tests Add test for currently implemented raft features. replication_test tests replication functionality with various initial log configurations. raft_fsm_test test voting state machine functionality. --- test/boost/raft_fsm_test.cc | 265 ++++++++++++++++++++++++ test/raft/replication_test.cc | 378 ++++++++++++++++++++++++++++++++++ test/raft/suite.yaml | 5 + 3 files changed, 648 insertions(+) create mode 100644 test/boost/raft_fsm_test.cc create mode 100644 test/raft/replication_test.cc create mode 100644 test/raft/suite.yaml diff --git a/test/boost/raft_fsm_test.cc b/test/boost/raft_fsm_test.cc new file mode 100644 index 0000000000..799ec7eae6 --- /dev/null +++ b/test/boost/raft_fsm_test.cc @@ -0,0 +1,265 @@ +/* + * Copyright (c) 2020, Arm Limited and affiliates. All rights reserved. + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#define BOOST_TEST_MODULE raft + +#include +#include "test/lib/log.hh" + +#include "raft/fsm.hh" + +using raft::term_t, raft::index_t, raft::server_id; + +void election_threshold(raft::fsm& fsm) { + for (int i = 0; i <= raft::ELECTION_TIMEOUT.count(); i++) { + fsm.tick(); + } +} + +void election_timeout(raft::fsm& fsm) { + for (int i = 0; i <= 2 * raft::ELECTION_TIMEOUT.count(); i++) { + fsm.tick(); + } +} + +struct failure_detector: public raft::failure_detector { + bool alive = true; + bool is_alive(raft::server_id from) override { + return alive; + } +}; + +BOOST_AUTO_TEST_CASE(test_election_single_node) { + + failure_detector fd; + server_id id1{utils::make_random_uuid()}; + raft::configuration cfg({id1}); + raft::log log{raft::snapshot{.config = cfg}}; + raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd); + + BOOST_CHECK(fsm.is_follower()); + + election_timeout(fsm); + + // Immediately converts from leader to follower if quorum=1 + BOOST_CHECK(fsm.is_leader()); + + auto output = fsm.get_output(); + + BOOST_CHECK(output.term); + BOOST_CHECK(output.vote); + BOOST_CHECK(output.messages.empty()); + BOOST_CHECK(output.log_entries.empty()); + BOOST_CHECK(output.committed.empty()); + // The leader does not become candidate simply because + // a timeout has elapsed, i.e. there are no spurious + // elections. 
+ election_timeout(fsm); + BOOST_CHECK(fsm.is_leader()); + output = fsm.get_output(); + BOOST_CHECK(!output.term); + BOOST_CHECK(!output.vote); + BOOST_CHECK(output.messages.empty()); + BOOST_CHECK(output.log_entries.empty()); + BOOST_CHECK(output.committed.empty()); +} + +// Test that adding an entry to a single-node cluster +// does not lead to RPC +BOOST_AUTO_TEST_CASE(test_single_node_is_quiet) { + + failure_detector fd; + server_id id1{utils::make_random_uuid()}; + raft::configuration cfg({id1}); + raft::log log{raft::snapshot{.config = cfg}}; + + raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd); + + election_timeout(fsm); + + // Immediately converts from leader to follower if quorum=1 + BOOST_CHECK(fsm.is_leader()); + + (void) fsm.get_output(); + + fsm.add_entry(raft::command{}); + + BOOST_CHECK(fsm.get_output().messages.empty()); +} + +BOOST_AUTO_TEST_CASE(test_election_two_nodes) { + + failure_detector fd; + + server_id id1{utils::make_random_uuid()}, id2{utils::make_random_uuid()}; + + raft::configuration cfg({id1, id2}); + raft::log log{raft::snapshot{.config = cfg}}; + + raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd); + + // Initial state is follower + BOOST_CHECK(fsm.is_follower()); + + // After election timeout, a follower becomes a candidate + election_timeout(fsm); + BOOST_CHECK(fsm.is_candidate()); + + // If nothing happens, the candidate stays this way + election_timeout(fsm); + BOOST_CHECK(fsm.is_candidate()); + + auto output = fsm.get_output(); + // After a favourable reply, we become a leader (quorum is 2) + fsm.step(id2, raft::vote_reply{output.term, true}); + BOOST_CHECK(fsm.is_leader()); + // Out of order response to the previous election is ignored + fsm.step(id2, raft::vote_reply{output.term - term_t{1}, false}); + assert(fsm.is_leader()); + + // Vote request within the election timeout is ignored + // (avoiding disruptive leaders). 
+ fsm.step(id2, raft::vote_request{output.term + term_t{1}}); + BOOST_CHECK(fsm.is_leader()); + // Any message with a newer term after election timeout + // -> immediately convert to follower + fd.alive = false; + election_threshold(fsm); + fsm.step(id2, raft::vote_request{output.term + term_t{1}}); + BOOST_CHECK(fsm.is_follower()); + + // Check that the candidate converts to a follower as well + election_timeout(fsm); + BOOST_CHECK(fsm.is_candidate()); + output = fsm.get_output(); + fsm.step(id2, raft::vote_request{output.term + term_t{1}}); + BOOST_CHECK(fsm.is_follower()); + + // Test that a node doesn't cast a vote if it has voted for + // self already + (void) fsm.get_output(); + while (fsm.is_follower()) { + fsm.tick(); + } + BOOST_CHECK(fsm.is_candidate()); + output = fsm.get_output(); + auto msg = std::get(output.messages.back().second); + fsm.step(id2, std::move(msg)); + // We could figure out this round is going to a nowhere, but + // we're not that smart and simply wait for a vote_reply. + BOOST_CHECK(fsm.is_candidate()); + output = fsm.get_output(); + auto reply = std::get(output.messages.back().second); + BOOST_CHECK(!reply.vote_granted); +} + +BOOST_AUTO_TEST_CASE(test_election_four_nodes) { + + failure_detector fd; + + server_id id1{utils::make_random_uuid()}, + id2{utils::make_random_uuid()}, + id3{utils::make_random_uuid()}, + id4{utils::make_random_uuid()}; + + raft::configuration cfg({id1, id2, id3, id4}); + raft::log log{raft::snapshot{.config = cfg}}; + + raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd); + + // Initial state is follower + BOOST_CHECK(fsm.is_follower()); + + // Inform FSM about a new leader at a new term + fsm.step(id4, raft::append_request_recv{term_t{1}, id4, index_t{1}, term_t{1}}); + + (void) fsm.get_output(); + + // Request a vote during the same term. Even though + // we haven't voted, we should deny a vote because we + // know about a leader for this term. 
+ fsm.step(id3, raft::vote_request{term_t{1}, index_t{1}, term_t{1}}); + + auto output = fsm.get_output(); + auto reply = std::get(output.messages.back().second); + BOOST_CHECK(!reply.vote_granted); + + // Run out of steam for this term. Start a new one. + fd.alive = false; + election_timeout(fsm); + BOOST_CHECK(fsm.is_candidate()); + + output = fsm.get_output(); + // Add a favourable reply, not enough for quorum + fsm.step(id2, raft::vote_reply{output.term, true}); + BOOST_CHECK(fsm.is_candidate()); + + // Add another one, this adds up to quorum + fsm.step(id3, raft::vote_reply{output.term, true}); + BOOST_CHECK(fsm.is_leader()); +} + +BOOST_AUTO_TEST_CASE(test_log_matching_rule) { + + failure_detector fd; + + server_id id1{utils::make_random_uuid()}, + id2{utils::make_random_uuid()}, + id3{utils::make_random_uuid()}; + + raft::configuration cfg({id1, id2, id3}); + raft::log log(raft::snapshot{.idx = index_t{999}, .config = cfg}); + + log.emplace_back(raft::log_entry{term_t{10}, index_t{1000}}); + log.stable_to(log.last_idx()); + + raft::fsm fsm(id1, term_t{10}, server_id{}, std::move(log), fd); + + // Initial state is follower + BOOST_CHECK(fsm.is_follower()); + + (void) fsm.get_output(); + + fsm.step(id2, raft::vote_request{term_t{9}, index_t{1001}, term_t{11}}); + // Current term is too old - vote is not granted + auto output = fsm.get_output(); + BOOST_CHECK(output.messages.empty()); + + auto request_vote = [&](term_t term, index_t last_log_idx, term_t last_log_term) -> raft::vote_reply { + fsm.step(id2, raft::vote_request{term, last_log_idx, last_log_term}); + auto output = fsm.get_output(); + return std::get(output.messages.back().second); + }; + + // Last stable index is too small - vote is not granted + BOOST_CHECK(!request_vote(term_t{11}, index_t{999}, term_t{10}).vote_granted); + // Last stable term is too small - vote is not granted + BOOST_CHECK(!request_vote(term_t{12}, index_t{1002}, term_t{9}).vote_granted); + // Last stable term and index are 
equal to the voter's - vote + // is granted + BOOST_CHECK(request_vote(term_t{13}, index_t{1000}, term_t{10}).vote_granted); + // Last stable term is the same, index is greater to the voter's - vote + // is granted + BOOST_CHECK(request_vote(term_t{14}, index_t{1001}, term_t{10}).vote_granted); + // Both term and index are greater than the voter's - vote + // is granted + BOOST_CHECK(request_vote(term_t{15}, index_t{1001}, term_t{11}).vote_granted); +} diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc new file mode 100644 index 0000000000..1b31539d8d --- /dev/null +++ b/test/raft/replication_test.cc @@ -0,0 +1,378 @@ +#include +#include +#include +#include +#include +#include +#include +#include "raft/server.hh" +#include "serializer.hh" +#include "serializer_impl.hh" + +using namespace std::chrono_literals; + +static seastar::logger tlogger("test"); + +std::mt19937 random_generator() { + std::random_device rd; + // In case of errors, replace the seed with a fixed value to get a deterministic run. 
+ auto seed = rd(); + std::cout << "Random seed: " << seed << "\n"; + return std::mt19937(seed); +} + +int rand() { + static thread_local std::uniform_int_distribution dist(0, std::numeric_limits::max()); + static thread_local auto gen = random_generator(); + + return dist(gen); +} + +bool drop_replication = false; + +class state_machine : public raft::state_machine { +public: + using apply_fn = std::function(raft::server_id id, promise<>&, const std::vector& commands)>; +private: + raft::server_id _id; + apply_fn _apply; + promise<> _done; +public: + state_machine(raft::server_id id, apply_fn apply) : _id(id), _apply(std::move(apply)) {} + virtual future<> apply(const std::vector commands) { + return _apply(_id, _done, commands); + } + virtual future take_snaphot() { return make_ready_future(raft::snapshot_id()); } + virtual void drop_snapshot(raft::snapshot_id id) {} + virtual future<> load_snapshot(raft::snapshot_id id) { return make_ready_future<>(); }; + virtual future<> abort() { return make_ready_future<>(); } + + future<> done() { + return _done.get_future(); + } +}; + +struct initial_state { + raft::term_t term = raft::term_t(1); + raft::server_id vote; + std::vector log; + raft::snapshot snapshot; +}; + + +class storage : public raft::storage { + initial_state _conf; +public: + storage(initial_state conf) : _conf(std::move(conf)) {} + storage() {} + virtual future<> store_term_and_vote(raft::term_t term, raft::server_id vote) { co_return seastar::sleep(1us); } + virtual future> load_term_and_vote() { + auto term_and_vote = std::make_pair(_conf.term, _conf.vote); + return make_ready_future>(term_and_vote); + } + virtual future<> store_snapshot(const raft::snapshot& snap, size_t preserve_log_entries) { return make_ready_future<>(); } + virtual future load_snapshot() { + return make_ready_future(_conf.snapshot); + } + virtual future<> store_log_entries(const std::vector& entries) { co_return seastar::sleep(1us); }; + virtual future load_log() { + 
raft::log_entries log; + for (auto&& e : _conf.log) { + log.emplace_back(make_lw_shared(std::move(e))); + } + return make_ready_future(std::move(log)); + } + virtual future<> truncate_log(raft::index_t idx) { return make_ready_future<>(); } + virtual future<> abort() { return make_ready_future<>(); } +}; + +class rpc : public raft::rpc { + static std::unordered_map net; + raft::server_id _id; +public: + rpc(raft::server_id id) : _id(id) { + net[_id] = this; + } + virtual future<> send_snapshot(raft::server_id id, const raft::install_snapshot& snap) { return make_ready_future<>(); } + virtual future<> send_append_entries(raft::server_id id, const raft::append_request_send& append_request) { + if (drop_replication && !(rand() % 5)) { + return make_ready_future<>(); + } + raft::append_request_recv req; + req.current_term = append_request.current_term; + req.leader_id = append_request.leader_id; + req.prev_log_idx = append_request.prev_log_idx; + req.prev_log_term = append_request.prev_log_term; + req.leader_commit_idx = append_request.leader_commit_idx; + for (auto&& e: append_request.entries) { + req.entries.push_back(e); + } + net[id]->_client->append_entries(_id, std::move(req)); + //co_return seastar::sleep(1us); + return make_ready_future<>(); + } + virtual future<> send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) { + if (drop_replication && !(rand() % 5)) { + return make_ready_future<>(); + } + net[id]->_client->append_entries_reply(_id, std::move(reply)); + return make_ready_future<>(); + } + virtual future<> send_vote_request(raft::server_id id, const raft::vote_request& vote_request) { + net[id]->_client->request_vote(_id, std::move(vote_request)); + return make_ready_future<>(); + } + virtual future<> send_vote_reply(raft::server_id id, const raft::vote_reply& vote_reply) { + net[id]->_client->request_vote_reply(_id, std::move(vote_reply)); + return make_ready_future<>(); + } + virtual void add_server(raft::server_id id, bytes 
node_info) {} + virtual void remove_server(raft::server_id id) {} + virtual future<> abort() { return make_ready_future<>(); } +}; + +class failure_detector : public raft::failure_detector { + bool is_alive(raft::server_id server) override { + return true; + } +}; + +std::unordered_map rpc::net; + +std::pair, state_machine*> +create_raft_server(raft::server_id uuid, state_machine::apply_fn apply, + initial_state state) { + + auto sm = std::make_unique(uuid, std::move(apply)); + auto& rsm = *sm; + auto mrpc = std::make_unique(uuid); + auto mstorage = std::make_unique(state); + auto fd = seastar::make_shared(); + auto raft = raft::create_server(uuid, std::move(mrpc), std::move(sm), std::move(mstorage), + std::move(fd)); + + return std::make_pair(std::move(raft), &rsm); +} + +future, state_machine*>>> create_cluster(std::vector states, state_machine::apply_fn apply) { + raft::configuration config; + std::vector, state_machine*>> rafts; + + for (size_t i = 0; i < states.size(); i++) { + auto uuid = utils::make_random_uuid(); + config.servers.push_back(raft::server_address{uuid}); + } + + for (size_t i = 0; i < states.size(); i++) { + auto& s = config.servers[i]; + states[i].snapshot.config = config; + auto& raft = *rafts.emplace_back(create_raft_server(s.id, apply, states[i])).first; + co_await raft.start(); + } + + co_return std::move(rafts); +} + +struct log_entry { + unsigned term; + int value; +}; + +std::vector create_log(std::initializer_list list, unsigned start_idx = 1) { + std::vector log; + + unsigned i = start_idx; + for (auto e : list) { + raft::command command; + ser::serialize(command, e.value); + log.push_back(raft::log_entry{raft::term_t(e.term), raft::index_t(i++), std::move(command)}); + } + + return log; +} + +constexpr int itr = 100; +std::unordered_map sums; + +future<> apply(raft::server_id id, promise<>& done, const std::vector& commands) { + tlogger.debug("sm::apply got {} entries", commands.size()); + for (auto&& d : commands) { + auto is = 
ser::as_input_stream(d); + int n = ser::deserialize(is, boost::type()); + tlogger.debug("{}: apply {}", id, n); + auto it = sums.find(id); + if (it == sums.end()) { + sums[id] = 0; + } + sums[id] += n; + } + if (sums[id] == ((itr - 1) * itr)/2) { + done.set_value(); + } + return make_ready_future<>(); +}; + + +future<> test_helper(std::vector states, int start_itr = 0) { + auto rafts = co_await create_cluster(states, apply); + + auto& leader = *rafts[0].first; + leader.make_me_leader(); + + co_await seastar::parallel_for_each(std::views::iota(start_itr, itr), [&] (int i) { + tlogger.debug("Adding entry {} on a leader", i); + raft::command command; + ser::serialize(command, i); + return leader.add_entry(std::move(command), raft::wait_type::committed); + }); + + for (auto& r: rafts) { + co_await r.second->done(); + } + + for (auto& r: rafts) { + co_await r.first->abort(); + } + + sums.clear(); + co_return; +} + +future<> test_simple_replication(size_t size) { + return test_helper(std::vector(size)); +} + +// initially a leader has non empty log +future<> test_replicate_non_empty_leader_log() { + // 2 nodes, leader has entries in his log + std::vector states(2); + states[0].term = raft::term_t(1); + states[0].log = create_log({{1, 0}, {1, 1}, {1, 2}, {1, 3}}); + + // start iterations from 4 since o4 entry is already in the log + return test_helper(std::move(states), 4); +} + +// test special case where prev_index = 0 because the leader's log is empty +future<> test_replace_log_leaders_log_empty() { + // current leaders term is 2 and empty log + // one of the follower have three entries that should be replaced + std::vector states(3); + states[0].term = raft::term_t(2); + states[2].log = create_log({{1, 10}, {1, 20}, {1, 30}}); + + return test_helper(std::move(states)); +} + +// two nodes, leader has one entry, follower has 3, existing entries do not match +future<> test_replace_log_leaders_log_not_empty() { + // current leaders term is 2 and the log has one entry + // 
one of the follower have three entries that should be replaced + std::vector states(2); + states[0].term = raft::term_t(3); + states[0].log = create_log({{1, 0}}); + states[1].log = create_log({{2, 10}, {2, 20}, {2, 30}}); + + // start iterations from 1 since one entry is already in the log + return test_helper(std::move(states), 1); +} + +// two nodes, leader has 2 entries, follower has 4, index=1 matches index=2 does not +future<> test_replace_log_leaders_log_not_empty_2() { + // current leader's term is 2 and the log has one entry + // one of the follower have three entries that should be replaced + std::vector states(2); + states[0].term = raft::term_t(3); + states[0].log = create_log({{1, 0}, {1, 1}}); + states[1].log = create_log({{1, 0}, {2, 20}, {2, 30}, {2, 40}}); + + // start iterations from 2 since 2 entries are already in the log + return test_helper(std::move(states), 2); +} + +// a follower and a leader have matching logs but leader's is shorter +future<> test_replace_log_leaders_log_not_empty_3() { + // current leaders term is 2 and the log has one entry + // one of the follower have three entries that should be replaced + std::vector states(2); + states[0].term = raft::term_t(2); + states[0].log = create_log({{1, 0}, {1, 1}}); + states[1].log = create_log({{1, 0}, {1, 1}, {1, 2}, {1, 3}}); + + // start iterations from 2 since 2 entries are already in the log + return test_helper(std::move(states), 2); +} + +// a follower and a leader have no common entries +future<> test_replace_no_common_entries() { + // current leaders term is 2 and the log has one entry + // one of the follower have three entries that should be replaced + std::vector states(2); + states[0].term = raft::term_t(3); + states[0].log = create_log({{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}, {1, 6}}); + states[1].log = create_log({{2, 10}, {2, 11}, {2, 12}, {2, 13}, {2, 14}, {2, 15}, {2, 16}}); + + // start iterations from 7 since 7 entries are already in the log + return 
test_helper(std::move(states), 7); +} + +// a follower and a leader have one common entry +future<> test_replace_one_common_entry() { + // current leaders term is 2 and the log has one entry + // one of the follower have three entries that should be replaced + std::vector states(2); + states[0].term = raft::term_t(4); + states[0].log = create_log({{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}, {3, 6}}); + states[1].log = create_log({{1, 0}, {2, 11}, {2, 12}, {2, 13}, {2, 14}, {2, 15}, {2, 16}}); + + // start iterations from 7 since 7 entries are already in the log + return test_helper(std::move(states), 7); +} + +// a follower and a leader have t1i common entry in different terms +future<> test_replace_two_common_entry_different_terms() { + // current leaders term is 2 and the log has one entry + // one of the follower have three entries that should be replaced + std::vector states(2); + states[0].term = raft::term_t(5); + states[0].log = create_log({{1, 0}, {2, 1}, {3, 2}, {3, 3}, {3, 4}, {3, 5}, {4, 6}}); + states[1].log = create_log({{1, 0}, {2, 1}, {2, 12}, {2, 13}, {2, 14}, {2, 15}, {2, 16}}); + + // start iterations from 7 since 7 entries are already in the log + return test_helper(std::move(states), 7); +} + +int main(int argc, char* argv[]) { + namespace bpo = boost::program_options; + + seastar::app_template::config cfg; + seastar::app_template app(cfg); + app.add_options() + ("drop-replication", bpo::value()->default_value(false), "drop replication packets randomly"); + + using test_fn = std::function()>; + + test_fn tests[] = { + std::bind(test_simple_replication, 1), + std::bind(test_simple_replication, 2), + test_replicate_non_empty_leader_log, + test_replace_log_leaders_log_empty, + test_replace_log_leaders_log_not_empty, + test_replace_log_leaders_log_not_empty_2, + test_replace_log_leaders_log_not_empty_3, + test_replace_no_common_entries, + test_replace_one_common_entry, + test_replace_two_common_entry_different_terms, + }; + + return 
app.run(argc, argv, [&tests, &app] () -> future<> { + drop_replication = app.configuration()["drop-replication"].as(); + + int i = 0; + for (auto& t : tests) { + tlogger.debug("test: {}", i++); + co_await t(); + } + }); +} + diff --git a/test/raft/suite.yaml b/test/raft/suite.yaml new file mode 100644 index 0000000000..2e3ea24d79 --- /dev/null +++ b/test/raft/suite.yaml @@ -0,0 +1,5 @@ +# Suite test type. Supported types: unit, boost, cql +type: unit +custom_args: + replication_test: + - '-c1 -m200M' From 16cb009ea2984fafbc0965beb9fab39fffb78a39 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 13 Aug 2020 18:22:16 +0300 Subject: [PATCH 4/5] raft: compile raft tests Compilation is not enabled by default as it requires coroutines support and may require special compiler (until distributed one fixes all the bugs related to coroutines). To enable raft tests compilation new configure.py option is added (--build-raft). --- configure.py | 44 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/configure.py b/configure.py index 44e3b5b23d..10839a1d05 100755 --- a/configure.py +++ b/configure.py @@ -433,13 +433,18 @@ perf_tests = set([ 'test/perf/perf_big_decimal', ]) +raft_tests = set([ + 'test/raft/replication_test', + 'test/boost/raft_fsm_test', +]) + apps = set([ 'scylla', 'test/tools/cql_repl', 'tools/scylla-types', ]) -tests = scylla_tests | perf_tests +tests = scylla_tests | perf_tests | raft_tests other = set([ 'iotune', @@ -498,6 +503,8 @@ arg_parser.add_argument('--with-antlr3', dest='antlr3_exec', action='store', def help='path to antlr3 executable') arg_parser.add_argument('--with-ragel', dest='ragel_exec', action='store', default='ragel', help='path to ragel executable') +arg_parser.add_argument('--build-raft', dest='build_raft', action='store_true', default=False, + help='build raft code') add_tristate(arg_parser, name='stack-guards', dest='stack_guards', help='Use stack guards') 
arg_parser.add_argument('--verbose', dest='verbose', action='store_true', help='Make configure.py output more verbose (useful for debugging the build process itself)') @@ -506,6 +513,21 @@ arg_parser.add_argument('--test-repeat', dest='test_repeat', action='store', typ arg_parser.add_argument('--test-timeout', dest='test_timeout', action='store', type=str, default='7200') args = arg_parser.parse_args() +coroutines_test_src = ''' +#define GCC_VERSION (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + __GNUC_PATCHLEVEL__) +#if GCC_VERSION < 100201 + #error "Coroutines support requires at leat gcc 10.2.1" +#endif +''' +compiler_supports_coroutines = try_compile(compiler=args.cxx, source=coroutines_test_src) + +if args.build_raft and not compiler_supports_coroutines: + raise Exception("--build-raft is requested, while the used compiler does not support coroutines") + +if not args.build_raft: + all_artifacts.difference_update(raft_tests) + tests.difference_update(raft_tests) + defines = ['XXH_PRIVATE_API', 'SEASTAR_TESTING_MAIN', ] @@ -943,6 +965,15 @@ scylla_tests_dependencies = scylla_core + idls + scylla_tests_generic_dependenci 'test/lib/random_schema.cc', ] +scylla_raft_dependencies = [ + 'raft/raft.cc', + 'raft/server.cc', + 'raft/fsm.cc', + 'raft/progress.cc', + 'raft/log.cc', + 'utils/uuid.cc' +] + deps = { 'scylla': idls + ['main.cc', 'release.cc', 'utils/build_id.cc'] + scylla_core + api + alternator + redis, 'test/tools/cql_repl': idls + ['test/tools/cql_repl.cc'] + scylla_core + scylla_tests_generic_dependencies, @@ -1063,8 +1094,12 @@ deps['test/boost/linearizing_input_stream_test'] = [ deps['test/boost/duration_test'] += ['test/lib/exception_utils.cc'] deps['test/boost/alternator_base64_test'] += ['alternator/base64.cc'] +deps['test/raft/replication_test'] = ['test/raft/replication_test.cc'] + scylla_raft_dependencies +deps['test/boost/raft_fsm_test'] = ['test/boost/raft_fsm_test.cc', 'test/lib/log.cc'] + scylla_raft_dependencies + 
deps['utils/gz/gen_crc_combine_table'] = ['utils/gz/gen_crc_combine_table.cc'] + warnings = [ '-Wall', '-Werror', @@ -1240,6 +1275,10 @@ args.user_cflags += ' -Wno-error=stack-usage=' args.user_cflags += f"-ffile-prefix-map={curdir}=." seastar_cflags = args.user_cflags + +if build_raft: + seastar_cflags += ' -fcoroutines' + if args.target != '': seastar_cflags += ' -march=' + args.target seastar_ldflags = args.user_ldflags @@ -1368,6 +1407,9 @@ libs = ' '.join([maybe_static(args.staticyamlcpp, '-lyaml-cpp'), '-latomic', '-l if not args.staticboost: args.user_cflags += ' -DBOOST_TEST_DYN_LINK' +if build_raft: + args.user_cflags += ' -DENABLE_SCYLLA_RAFT -fcoroutines' + # thrift version detection, see #4538 proc_res = subprocess.run(["thrift", "-version"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) proc_res_output = proc_res.stdout.decode("utf-8") From 9a5f2b87dcba564165668e337c86ec1eab937e10 Mon Sep 17 00:00:00 2001 From: Konstantin Osipov Date: Fri, 21 Aug 2020 00:07:42 +0300 Subject: [PATCH 5/5] raft: add a short readme file The file has a brief description of the code status, usage and some implementation assumptions. --- raft/README.md | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 raft/README.md diff --git a/raft/README.md b/raft/README.md new file mode 100644 index 0000000000..b3184e8be7 --- /dev/null +++ b/raft/README.md @@ -0,0 +1,71 @@ +# Raft consensus algorithm implementation for Seastar + +Seastar is a high performance server-side application framework +written in C++. Please read more about Seastar at http://seastar.io/ + +This library provides an efficient, extensible, implementation of +Raft consensus algorithm for Seastar. 
+For more details about Raft see https://raft.github.io/
+
+## Implementation status
+---------------------
+- log replication, including throttling for unresponsive
+  servers
+- leader election
+
+## Usage
+-----
+
+In order to use the library the application has to provide implementations
+for the RPC, storage and state machine APIs, defined in raft/raft.hh. The
+purpose of these interfaces is to:
+- provide a way to communicate between Raft protocol instances,
+- persist the required protocol state on disk,
+  a prerequisite of protocol correctness,
+- apply committed entries to the state machine.
+
+While the comments on these classes give an insight into the
+guarantees they are expected to offer, writing a compliant
+implementation also requires understanding the expectations
+the Raft consistency protocol places on its environment:
+- RPC should implement a model of an asynchronous, unreliable network,
+  in which messages can be lost, reordered, or retransmitted more than
+  once, but not corrupted. Specifically, it's an error to
+  deliver a message to a Raft server which was not sent to it.
+- storage should provide durable persistent storage, which
+  survives between state machine restarts and does not corrupt
+  its state.
+- The Raft library calls `state_machine::apply()` for entries
+  reliably committed to the replication log on the majority of
+  servers. While `apply()` is called in the order
+  entries are serialized in the distributed log, there is
+  no guarantee that `apply()` is called exactly once.
+  E.g. when a protocol instance restarts from persistent state,
+  it may re-apply some already applied log entries.
+
+Seastar's execution model is that every object is safe to use
+within a given shard (physical OS thread). The Raft library follows
+the same pattern. Calls to the Raft API are safe when they are local
+to a single shard. Moving instances of the library between shards
+is not supported.
+
+### First usage
+
+For an example of first usage see `replication_test.cc` in test/raft/.
+
+In a nutshell:
+- create instances of RPC, storage, and state machine
+- pass them to an instance of Raft server - the facade to the Raft cluster
+  on this node
+- repeat the above for every node in the cluster
+- use `server::add_entry()` to submit new entries;
+  on a leader, `state_machine::apply()` is called after the added
+  entry is committed by the cluster.
+
+### Subsequent usages
+
+Similar to the first usage, but `storage::load_term_and_vote()`,
+`storage::load_log()`, and `storage::load_snapshot()` are expected to
+return valid protocol state as persisted by the previous incarnation
+of an instance of class server.