When the local entry with `read_idx` belongs to the current term, it's
safe to update the local `commit_idx` to `read_idx`.
The motivation for this change is to speed up read barriers. `wait_for_apply`
executed at the end of `read_barrier` is delayed until the follower learns
that the entry with `read_idx` is committed. It usually happens quickly in
the `read_quorum` message. However, non-voters don't receive this message,
so they have to wait for `append_entries`. If no new entries are being
added, `append_entries` can come only from `fsm::tick_leader()`. For group0,
this happens once every 100ms.
The issue above significantly slows down cluster setups in tests. Nodes
join group0 as non-voters, and then they are met with several read barriers
just after a write to group0. One example is `global_token_metadata_barrier`
in `write_both_read_new` performed just after `update_topology_state` in
`write_both_read_old`.
I tested the performance impact of this change with the following test:
```python
for _ in range(10):
    await manager.servers_add(3)
```
It consistently takes 44-45s with the change and 50-51s without the change
in dev mode.
No backport:
- non-critical performance improvement mostly relevant in tests,
- the change requires some soak time in master.
Closes scylladb/scylladb#28891
* github.com:scylladb/scylladb:
raft: server: fix the repeating typo
raft: clarify the comment about read_barrier_reply
raft: read_barrier: update local commit_idx to read_idx when it's safe
raft: log: clarify the specification of term_for
1962 lines
86 KiB
C++
1962 lines
86 KiB
C++
/*
|
|
* Copyright (C) 2020-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
#include "server.hh"
|
|
|
|
#include "utils/assert.hh"
|
|
#include "utils/error_injection.hh"
|
|
#include <boost/range/adaptor/transformed.hpp>
|
|
#include <boost/range/adaptor/map.hpp>
|
|
#include <boost/range/algorithm/copy.hpp>
|
|
#include <boost/range/join.hpp>
|
|
#include <boost/lexical_cast.hpp>
|
|
#include <map>
|
|
#include <seastar/core/sleep.hh>
|
|
#include <seastar/core/future-util.hh>
|
|
#include <seastar/core/shared_future.hh>
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/core/pipe.hh>
|
|
#include <seastar/core/metrics.hh>
|
|
#include <seastar/rpc/rpc_types.hh>
|
|
#include <absl/container/flat_hash_map.h>
|
|
#include <seastar/core/gate.hh>
|
|
|
|
#include "fsm.hh"
|
|
#include "log.hh"
|
|
#include "raft.hh"
|
|
|
|
#include "utils/exceptions.hh"
|
|
|
|
using namespace std::chrono_literals;
|
|
|
|
namespace raft {
|
|
|
|
// State of one in-progress read barrier on this server.
// Kept in `server_impl::_reads` until the barrier resolves.
struct active_read {
    // Identifier used to match this read against incoming replies.
    read_id id;
    // Log index the read barrier is associated with — presumably the
    // commit index at the time the read started; confirm against the fsm.
    index_t idx;
    // Resolved with the outcome of the barrier.
    seastar::promise<read_barrier_reply> promise;
    // Engaged when the caller supplied an abort_source; allows cancelling the wait.
    optimized_optional<abort_source::subscription> abort;
};
|
|
|
|
// A waiter for a particular log index, stored in `server_impl::_awaited_indexes`.
struct awaited_index {
    // Resolved when the awaited index is reached.
    seastar::promise<> promise;
    // Engaged when the caller supplied an abort_source; allows cancelling the wait.
    optimized_optional<abort_source::subscription> abort;
};
|
|
|
|
// A waiter for a (non-joint) configuration change to commit.
struct awaited_conf_change {
    // Resolved when the configuration change completes.
    seastar::promise<> promise;
    // Engaged when the caller supplied an abort_source; allows cancelling the wait.
    optimized_optional<abort_source::subscription> abort;
};
|
|
|
|
// Labels used to tag this server's metrics (see register_metrics()).
static const seastar::metrics::label server_id_label("id");
static const seastar::metrics::label log_entry_type("log_entry_type");
static const seastar::metrics::label message_type("message_type");
|
|
|
|
// Implementation of the public raft `server` interface and of the
// `rpc_server` message-dispatch interface. Owns the protocol state
// machine (`fsm`), the user `state_machine`, the `persistence` and `rpc`
// modules, and runs two background fibers started in start():
// `io_fiber` (persists fsm output and sends messages) and
// `applier_fiber` (applies committed entries).
class server_impl : public rpc_server, public server {
public:
    explicit server_impl(server_id uuid, std::unique_ptr<rpc> rpc,
        std::unique_ptr<state_machine> state_machine, std::unique_ptr<persistence> persistence,
        seastar::shared_ptr<failure_detector> failure_detector, server::configuration config);

    server_impl(server_impl&&) = delete;

    ~server_impl() {}

    // rpc_server interface
    void append_entries(server_id from, append_request append_request) override;
    void append_entries_reply(server_id from, append_reply reply) override;
    void request_vote(server_id from, vote_request vote_request) override;
    void request_vote_reply(server_id from, vote_reply vote_reply) override;
    void timeout_now_request(server_id from, timeout_now timeout_now) override;
    void read_quorum_request(server_id from, struct read_quorum read_quorum) override;
    void read_quorum_reply(server_id from, struct read_quorum_reply read_quorum_reply) override;
    future<read_barrier_reply> execute_read_barrier(server_id, seastar::abort_source* as) override;
    future<add_entry_reply> execute_add_entry(server_id from, command cmd, seastar::abort_source* as) override;
    future<add_entry_reply> execute_modify_config(server_id from,
        std::vector<config_member> add, std::vector<server_id> del, seastar::abort_source* as) override;
    future<snapshot_reply> apply_snapshot(server_id from, install_snapshot snp) override;

    // server interface
    future<> add_entry(command command, wait_type type, seastar::abort_source* as) override;
    future<> set_configuration(config_member_set c_new, seastar::abort_source* as) override;
    raft::configuration get_configuration() const override;
    future<> start() override;
    future<> abort(sstring reason) override;
    bool is_alive() const override;
    term_t get_current_term() const override;
    future<> read_barrier(seastar::abort_source* as) override;
    void wait_until_candidate() override;
    future<> wait_election_done() override;
    future<> wait_log_idx_term(std::pair<index_t, term_t> idx_log) override;
    std::pair<index_t, term_t> log_last_idx_term() override;
    void elapse_election() override;
    bool is_leader() override;
    raft::server_id current_leader() const override;
    void tick() override;
    raft::server_id id() const override;
    void set_applier_queue_max_size(size_t queue_max_size) override;
    future<> stepdown(logical_clock::duration timeout) override;
    future<> modify_config(std::vector<config_member> add, std::vector<server_id> del, seastar::abort_source* as) override;
    future<entry_id> add_entry_on_leader(command command, seastar::abort_source* as);
    void register_metrics() override;
    size_t max_command_size() const override;
private:
    // Shared with the fsm (passed to its constructor in start());
    // presumably signalled when new fsm output is available — confirm in fsm.hh.
    seastar::condition_variable _events;

    std::unique_ptr<rpc> _rpc;
    std::unique_ptr<state_machine> _state_machine;
    std::unique_ptr<persistence> _persistence;
    seastar::shared_ptr<failure_detector> _failure_detector;
    // Protocol deterministic finite-state machine
    std::unique_ptr<fsm> _fsm;
    // id of this server
    server_id _id;
    server::configuration _config;
    // Engaged while a stepdown() request is in flight.
    std::optional<promise<>> _stepdown_promise;
    // Engaged while wait_for_leader() callers are blocked waiting for a leader.
    std::optional<shared_promise<>> _leader_promise;
    // Waiter for a non-joint configuration to commit.
    std::optional<awaited_conf_change> _non_joint_conf_commit_promise;
    // Engaged while wait_for_state_change() callers are blocked.
    std::optional<shared_promise<>> _state_change_promise;
    // Index of the last entry applied to `_state_machine`.
    index_t _applied_idx;
    // Index of the last persisted snapshot descriptor.
    index_t _snapshot_desc_idx;
    // In-progress read barriers (see struct active_read).
    std::list<active_read> _reads;
    // Waiters registered by wait_for_apply(), keyed by awaited index.
    std::multimap<index_t, awaited_index> _awaited_indexes;

    // Set to abort reason when abort() is called
    std::optional<sstring> _aborted;

    // Becomes true during start(), becomes false on abort() or a background error
    bool _is_alive = false;

    // Signaled when apply index is changed
    condition_variable _applied_index_changed;

    // Signaled when _snapshot_desc_idx is changed
    condition_variable _snapshot_desc_idx_changed;

    struct stop_apply_fiber{}; // exception to send when the apply fiber needs to be stopped

    struct removed_from_config{}; // sent to applier_fiber when we're not a leader and we're outside the current configuration

    struct trigger_snapshot_msg{};

    // Message types consumed by applier_fiber through `_apply_entries`.
    using applier_fiber_message = std::variant<
        std::vector<log_entry_ptr>,
        snapshot_descriptor,
        removed_from_config,
        trigger_snapshot_msg>;
    queue<applier_fiber_message> _apply_entries = queue<applier_fiber_message>(10);

    // Counters exported via register_metrics().
    struct stats {
        uint64_t add_command = 0;
        uint64_t add_dummy = 0;
        uint64_t add_config = 0;
        uint64_t append_entries_received = 0;
        uint64_t append_entries_reply_received = 0;
        uint64_t request_vote_received = 0;
        uint64_t request_vote_reply_received = 0;
        uint64_t waiters_awoken = 0;
        uint64_t waiters_dropped = 0;
        uint64_t append_entries_reply_sent = 0;
        uint64_t append_entries_sent = 0;
        uint64_t vote_request_sent = 0;
        uint64_t vote_request_reply_sent = 0;
        uint64_t install_snapshot_sent = 0;
        uint64_t snapshot_reply_sent = 0;
        uint64_t polls = 0;
        uint64_t store_term_and_vote = 0;
        uint64_t store_snapshot = 0;
        uint64_t sm_load_snapshot = 0;
        uint64_t truncate_persisted_log = 0;
        uint64_t persisted_log_entries = 0;
        uint64_t queue_entries_for_apply = 0;
        uint64_t applied_entries = 0;
        uint64_t snapshots_taken = 0;
        uint64_t timeout_now_sent = 0;
        uint64_t timeout_now_received = 0;
        uint64_t read_quorum_sent = 0;
        uint64_t read_quorum_received = 0;
        uint64_t read_quorum_reply_sent = 0;
        uint64_t read_quorum_reply_received = 0;
    } _stats;

    // State of a single waiter in _awaited_commits/_awaited_applies.
    struct op_status {
        term_t term; // term the entry was added with
        promise<> done; // notify when done here
        optimized_optional<seastar::abort_source::subscription> abort; // abort subscription
    };

    // Entries that have a waiter that needs to be notified when the
    // respective entry is known to be committed.
    std::map<index_t, op_status> _awaited_commits;

    // Entries that have a waiter that needs to be notified after
    // the respective entry is applied.
    std::map<index_t, op_status> _awaited_applies;

    // Monotonic id generator for snapshot transfers.
    uint64_t _next_snapshot_transfer_id = 0;

    struct snapshot_transfer {
        future<> f;
        seastar::abort_source as;
        uint64_t id;
    };

    // Contains active snapshot transfers, to be waited on exit.
    std::unordered_map<server_id, snapshot_transfer> _snapshot_transfers;

    // Contains aborted snapshot transfers with still unresolved futures
    std::unordered_map<uint64_t, future<>> _aborted_snapshot_transfers;

    // The optional is engaged when incoming snapshot is received
    // And the promise signalled when it is successfully applied or there was an error
    std::unordered_map<server_id, promise<snapshot_reply>> _snapshot_application_done;

    // Per-peer bookkeeping of in-flight forwarded append requests.
    struct append_request_queue {
        size_t count = 0;
        future<> f = make_ready_future<>();
    };
    absl::flat_hash_map<server_id, append_request_queue> _append_request_status;

    // Requests made directly to the server (not through the fsm),
    // picked up by the io fiber.
    struct server_requests {
        bool snapshot = false;

        bool empty() const {
            return !snapshot;
        }
    };

    server_requests _new_server_requests;

    // Called to commit entries (on a leader or otherwise).
    void notify_waiters(std::map<index_t, op_status>& waiters, const std::vector<log_entry_ptr>& entries);

    // Drop waiter that we lost track of, can happen due to a snapshot transfer,
    // or a leader removed from cluster while some entries added on it are uncommitted.
    void drop_waiters(std::optional<index_t> idx = {});

    // Wake up all waiters that wait for entries with idx smaller or equal to the one provided
    // to be applied.
    void signal_applied();

    // Processes FSM output by doing the following steps in order:
    // - persist the current term and vote
    // - persist unstable log entries on disk.
    // - send out messages
    future<> process_fsm_output(index_t& stable_idx, fsm_output&&);

    future<> process_server_requests(server_requests&&);

    // Processes new FSM outputs and server requests as they appear.
    future<> io_fiber(index_t stable_idx);

    // This fiber runs in the background and applies committed entries.
    future<> applier_fiber();

    template <typename Message> void send_message(server_id id, Message m);

    // Abort the snapshot transfer to the given server.
    // Called when a server id is out of the configuration
    void abort_snapshot_transfer(server_id id);

    // Abort all snapshot transfers.
    // Called when no longer a leader or on shutdown
    void abort_snapshot_transfers();

    // Send snapshot in the background and notify FSM about the result.
    void send_snapshot(server_id id, install_snapshot&& snp);

    future<> _applier_status = make_ready_future<>();
    future<> _io_status = make_ready_future<>();

    seastar::metrics::metric_groups _metrics;

    // Server address set to be used by RPC module to maintain its address
    // mappings.
    // Doesn't really correspond to any configuration, neither
    // committed, nor applied. This is just an artificial address set
    // meant entirely for RPC purposes and is constructed from the last
    // configuration entry in the log (prior to sending out the messages in the
    // `io_fiber`) as follows:
    // * If the config is non-joint, it's the current configuration.
    // * If the config is joint, it's defined as a union of current and
    //   previous configurations.
    // The motivation behind this is that server should have a collective
    // set of addresses from both leaving and joining nodes before
    // sending the messages, because it may send to both types of nodes.
    // After the new address set is built the diff between the last rpc config
    // observed by the `server_impl` instance and the one obtained from the last
    // conf entry is calculated. The diff is used to maintain rpc state for
    // joining and leaving servers.
    server_address_set _current_rpc_config;
    const server_address_set& get_rpc_config() const;
    // Per-item updates to rpc config.
    void add_to_rpc_config(server_address srv);
    void remove_from_rpc_config(const server_address& srv);

    future<> wait_for_leader(seastar::abort_source* as) override;

    future<> wait_for_state_change(seastar::abort_source* as) override;

    virtual future<bool> trigger_snapshot(seastar::abort_source* as) override;

    // Get "safe to read" index from a leader
    future<read_barrier_reply> get_read_idx(server_id leader, seastar::abort_source* as);
    // Wait for an entry with a specific term to get committed or
    // applied locally.
    future<> wait_for_entry(entry_id eid, wait_type type, seastar::abort_source* as);
    // Wait for a read barrier index to be applied. The index
    // is typically already committed, so we don't worry about the
    // term.
    future<> wait_for_apply(index_t idx, abort_source*);

    void check_not_aborted();
    void handle_background_error(const char* fiber_name);

    // Triggered on the next tick, used to delay retries in add_entry, modify_config, read_barrier.
    std::optional<shared_promise<>> _tick_promise;
    future<> wait_for_next_tick(seastar::abort_source* as);

    seastar::named_gate _do_on_leader_gate;
    // Call a function on a current leader until it returns stop_iteration::yes.
    // Handles aborts and leader changes, adds a delay between
    // iterations to protect against tight loops.
    template <typename AsyncAction>
    requires requires(server_id& leader, AsyncAction aa) {
        { aa(leader) } -> std::same_as<future<stop_iteration>>;
    }
    future<> do_on_leader_with_retries(seastar::abort_source* as, AsyncAction&& action);

    future<> override_snapshot_thresholds();

    friend std::ostream& operator<<(std::ostream& os, const server_impl& s);
};
|
|
|
|
// Constructs the server and validates the configuration limits.
// No fibers are started here; that happens in start().
// Throws config_error when the limits are mutually inconsistent.
server_impl::server_impl(server_id uuid, std::unique_ptr<rpc> rpc,
        std::unique_ptr<state_machine> state_machine, std::unique_ptr<persistence> persistence,
        seastar::shared_ptr<failure_detector> failure_detector, server::configuration config) :
    _rpc(std::move(rpc)), _state_machine(std::move(state_machine)),
    _persistence(std::move(persistence)), _failure_detector(failure_detector),
    _id(uuid), _config(config), _do_on_leader_gate("raft::server_impl::do_on_leader_gate")
{
    set_rpc_server(_rpc.get());
    // A snapshot must be possible before the log hits its size cap.
    if (_config.snapshot_threshold_log_size > _config.max_log_size) {
        throw config_error(fmt::format("[{}] snapshot_threshold_log_size ({}) must not be greater than max_log_size ({})",
            _id, _config.snapshot_threshold_log_size, _config.max_log_size));
    }
    // Entries kept after a snapshot must not exceed the snapshot trigger size.
    if (_config.snapshot_trailing_size > _config.snapshot_threshold_log_size) {
        throw config_error(fmt::format("[{}] snapshot_trailing_size ({}) must not be greater than snapshot_threshold_log_size ({})",
            _id, _config.snapshot_trailing_size, _config.snapshot_threshold_log_size));
    }
    // A single command must always fit into the log alongside the trailing entries.
    if (_config.max_command_size > _config.max_log_size - _config.snapshot_trailing_size) {
        throw config_error(fmt::format(
            "[{}] max_command_size ({}) must not be greater than "
            "max_log_size - snapshot_trailing_size ({} - {} = {})",
            _id,
            _config.max_command_size,
            _config.max_log_size, _config.snapshot_trailing_size,
            _config.max_log_size - _config.snapshot_trailing_size));
    }
}
|
|
|
|
// Starts the server: restores durable state from persistence, constructs
// the fsm, replays the state machine snapshot (if any), seeds the RPC
// address map, launches the io and applier fibers, and only returns once
// all previously committed entries have been applied so that the user's
// state machine is up-to-date.
future<> server_impl::start() {
    // Restore durable raft state.
    auto [term, vote] = co_await _persistence->load_term_and_vote();
    auto snapshot  = co_await _persistence->load_snapshot_descriptor();
    auto log_entries = co_await _persistence->load_log();
    auto log = raft::log(snapshot, std::move(log_entries), _config.max_command_size);
    auto commit_idx = co_await _persistence->load_commit_idx();
    raft::configuration rpc_config = log.get_configuration();
    index_t stable_idx = log.stable_idx();
    logger.trace("[{}] start raft instance: snapshot id={} commit index={} last stable index={}", id(), snapshot.id, commit_idx, stable_idx);
    // A committed entry must have been persisted; anything else means the
    // on-disk state is corrupt.
    if (commit_idx > stable_idx) {
        on_internal_error(logger, "Raft init failed: committed index cannot be larger then persisted one");
    }
    _fsm = std::make_unique<fsm>(_id, term, vote, std::move(log), commit_idx, *_failure_detector,
                                 fsm_config {
                                     .append_request_threshold = _config.append_request_threshold,
                                     .max_log_size = _config.max_log_size,
                                     .enable_prevoting = _config.enable_prevoting
                                 },
                                 _events);

    _applied_idx = index_t{0};
    _snapshot_desc_idx = index_t{0};
    if (snapshot.id) {
        // Bring the user state machine to the snapshot's state before
        // applying any log entries.
        co_await _state_machine->load_snapshot(snapshot.id);
        _snapshot_desc_idx = _applied_idx = snapshot.idx;
    }

    if (!rpc_config.current.empty()) {
        // Update RPC address map from the latest configuration (either from
        // the log or the snapshot)
        //
        // Account both for current and previous configurations since
        // the last configuration idx can point to the joint configuration entry.
        rpc_config.current.merge(rpc_config.previous);
        for (const auto& s: rpc_config.current) {
            add_to_rpc_config(s.addr);
        }
        _rpc->on_configuration_change(get_rpc_config(), {});
    }

    _is_alive = true;

    // start fiber to persist entries added to in-memory log
    _io_status = io_fiber(stable_idx);
    // start fiber to apply committed entries
    _applier_status = applier_fiber();

    // Wait for all committed entries to be applied before returning
    // to make sure that the user's state machine is up-to-date.
    while (_applied_idx < commit_idx) {
        co_await _applied_index_changed.wait();
    }

    co_return;
}
|
|
|
|
// Suspends the caller until the next tick() fires the shared tick promise.
// Used to space out retries (add_entry, modify_config, read_barrier).
// Translates an abort of `as` into request_aborted.
future<> server_impl::wait_for_next_tick(seastar::abort_source* as) {
    check_not_aborted();

    // Lazily create the shared promise; all concurrent waiters share it.
    if (!_tick_promise) {
        _tick_promise.emplace();
    }

    try {
        if (as != nullptr) {
            co_await _tick_promise->get_shared_future(*as);
        } else {
            co_await _tick_promise->get_shared_future();
        }
    } catch (abort_requested_exception&) {
        // Surface the abort with context about this server's progress.
        throw request_aborted(format("Aborted while waiting for next tick on server: {}, latest applied entry: {}", _id, _applied_idx));
    }
}
|
|
|
|
// Returns immediately when a leader is already known; otherwise pings the
// (unknown) leader via the fsm and suspends on the shared leader promise
// until one is learned. Translates an abort of `as` into request_aborted.
future<> server_impl::wait_for_leader(seastar::abort_source* as) {
    check_not_aborted();

    if (_fsm->current_leader()) {
        // Leader already known — nothing to wait for.
        co_return;
    }

    logger.trace("[{}] the leader is unknown, waiting through uncertainty", id());
    _fsm->ping_leader();
    // Lazily create the shared promise; all concurrent waiters share it.
    if (!_leader_promise) {
        _leader_promise.emplace();
    }

    try {
        if (as != nullptr) {
            co_await _leader_promise->get_shared_future(*as);
        } else {
            co_await _leader_promise->get_shared_future();
        }
    } catch (abort_requested_exception&) {
        // Surface the abort with context about this server's progress.
        throw request_aborted(format("Aborted while waiting for leader on server: {}, latest applied entry: {}", _id, _applied_idx));
    }
}
|
|
|
|
// Suspends the caller until the fsm's state changes (the shared state-change
// promise is resolved). Translates an abort of `as` into request_aborted.
future<> server_impl::wait_for_state_change(seastar::abort_source* as) {
    check_not_aborted();

    // Lazily create the shared promise; all concurrent waiters share it.
    if (!_state_change_promise) {
        _state_change_promise.emplace();
    }

    try {
        if (as != nullptr) {
            co_await _state_change_promise->get_shared_future(*as);
        } else {
            co_await _state_change_promise->get_shared_future();
        }
    } catch (abort_requested_exception&) {
        // Surface the abort with context about this server's progress.
        throw request_aborted(fmt::format(
            "Aborted while waiting for state change on server: {}, latest applied entry: {}, current state: {}", _id, _applied_idx, _fsm->current_state()));
    }
}
|
|
|
|
// Requests a snapshot of the user state machine and waits until a snapshot
// descriptor covering the current applied index has been persisted.
// Returns false immediately when the persisted snapshot descriptor is
// already up-to-date; returns true after the request is satisfied.
// Translates an abort of `as` (and a broken condition variable on
// shutdown) into request_aborted.
future<bool> server_impl::trigger_snapshot(seastar::abort_source* as) {
    check_not_aborted();

    if (_applied_idx <= _snapshot_desc_idx) {
        // Nothing new to snapshot.
        logger.debug(
            "[{}] trigger_snapshot: last persisted snapshot descriptor index is up-to-date"
            ", applied index: {}, persisted snapshot descriptor index: {}, last fsm log index: {}"
            ", last fsm snapshot index: {}", _id, _applied_idx, _snapshot_desc_idx,
            _fsm->log_last_idx(), _fsm->log_last_snapshot_idx());
        co_return false;
    }

    // Hand the request to the io fiber.
    _new_server_requests.snapshot = true;
    _events.signal();

    // Wait for persisted snapshot index to catch up to this index.
    auto awaited_idx = _applied_idx;

    logger.debug("[{}] snapshot request waiting for index {}", _id, awaited_idx);

    try {
        optimized_optional<abort_source::subscription> sub;
        if (as) {
            as->check();
            // Wake the condvar waiter below when the caller aborts, so the
            // predicate gets re-evaluated and as->check() can throw.
            sub = as->subscribe([this] () noexcept { _snapshot_desc_idx_changed.broadcast(); });
            SCYLLA_ASSERT(sub); // due to `check()` above
        }
        co_await _snapshot_desc_idx_changed.when([this, as, awaited_idx] {
            return (as && as->abort_requested()) || awaited_idx <= _snapshot_desc_idx;
        });
        if (as) {
            as->check();
        }
    } catch (abort_requested_exception&) {
        throw request_aborted(
            format("Aborted in snapshot trigger waiting for index: {}, last persisted snapshot descriptor idx: {}, on server: {}, latest applied entry: {}",
                awaited_idx,
                _snapshot_desc_idx,
                _id,
                _applied_idx));
    } catch (seastar::broken_condition_variable&) {
        throw request_aborted(format("Condition variable is broken in snapshot trigger waiting for index: {}, last persisted snapshot descriptor idx: {}, on "
                                     "server: {}, latest applied entry: {}",
            awaited_idx,
            _snapshot_desc_idx,
            _id,
            _applied_idx));
    }

    logger.debug(
        "[{}] snapshot request satisfied, awaited index {}, persisted snapshot descriptor index: {}"
        ", current applied index {}, last fsm log index {}, last fsm snapshot index {}",
        _id, awaited_idx, _snapshot_desc_idx, _applied_idx,
        _fsm->log_last_idx(), _fsm->log_last_snapshot_idx());

    co_return true;
}
|
|
|
|
// Waits until the entry `eid` is committed (wait_type::committed) or applied
// locally (wait_type::applied). Resolves immediately when the entry's fate is
// already decided; otherwise registers a waiter in _awaited_commits /
// _awaited_applies that is resolved by notify_waiters()/drop_waiters().
// Throws dropped_entry when a different entry won at `eid.idx`, and
// commit_status_unknown when the outcome cannot be determined (e.g. the entry
// was truncated away or superseded by a snapshot).
future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abort_source* as) {
    // The entry may have been already committed and even applied
    // in case it was forwarded to the leader. In this case
    // waiting for it is futile.
    if (eid.idx <= _fsm->commit_idx()) {
        if ((type == wait_type::committed) ||
            (type == wait_type::applied && eid.idx <= _applied_idx)) {

            auto term = _fsm->log_term_for(eid.idx);

            _stats.waiters_awoken++;

            if (!term) {
                // The entry at index `eid.idx` got truncated away.
                // Still, if the last snapshot's term is the same as `eid.term`, we can deduce
                // that our entry `eid` got committed at index `eid.idx` and not some different entry.
                // Indeed, let `snp_idx` be the last snapshot index (`snp_idx >= eid.idx`). Consider
                // the entry that was committed at `snp_idx`; it had the same term as the snapshot's term,
                // `snp_term`. If `eid.term == snp_term`, then we know that the entry at `snp_idx` was
                // created by the same leader as the entry `eid`. A leader doesn't replace an entry
                // that it previously appended, so when it appended the `snp_idx` entry, the entry at
                // `eid.idx` was still `eid`. By the Log Matching Property, every log that had the entry
                // `(snp_idx, snp_term)` also had the entry `eid`. Thus when the snapshot at `snp_idx`
                // was created, it included the entry `eid`.
                auto snap_idx = _fsm->log_last_snapshot_idx();
                auto snap_term = _fsm->log_term_for(snap_idx);
                SCYLLA_ASSERT(snap_term);
                SCYLLA_ASSERT(snap_idx >= eid.idx);
                if (type == wait_type::committed && snap_term == eid.term) {
                    logger.trace("[{}] wait_for_entry {}.{}: entry got truncated away, but has the snapshot's term"
                                 " (snapshot index: {})", id(), eid.term, eid.idx, snap_idx);
                    co_return;

                    // We don't do this for `wait_type::applied` - see below why.
                }

                logger.trace("[{}] wait_for_entry {}.{}: entry got truncated away", id(), eid.term, eid.idx);
                throw commit_status_unknown();
            }

            if (*term != eid.term) {
                // A different entry was committed at `eid.idx`.
                throw dropped_entry();
            }

            if (type == wait_type::applied && _fsm->log_last_snapshot_idx() >= eid.idx) {
                // We know the entry was committed but the wait type is `applied`
                // and we don't know if the entry was applied with `state_machine::apply`
                // (we may've loaded a snapshot before we managed to apply the entry).
                // As specified by `add_entry`, throw `commit_status_unknown` in this case.
                //
                // FIXME: replace this with a different exception type - `commit_status_unknown`
                // gives too much uncertainty while we know that the entry was committed
                // and had to be applied on at least one server. Some callers of `add_entry`
                // need to know only that the current state includes that entry, whether it was done
                // through `apply` on this server or through receiving a snapshot.
                throw commit_status_unknown();
            }

            co_return;
        }
    }

    check_not_aborted();

    if (as && as->abort_requested()) {
        throw request_aborted(format(
            "Abort requested before waiting for entry with idx: {}, term: {}; last committed entry: {}, last applied entry: {}",
            eid.idx, eid.term, _fsm->commit_idx(), _applied_idx));
    }

    auto& container = type == wait_type::committed ? _awaited_commits : _awaited_applies;
    logger.trace("[{}] waiting for entry {}.{}", id(), eid.term, eid.idx);

    // This will track the commit/apply status of the entry
    auto [it, inserted] = container.emplace(eid.idx, op_status{eid.term, promise<>()});
    if (!inserted) {
        // Another waiter already exists for this index, necessarily with a
        // different term.
        // No two leaders can exist with the same term.
        SCYLLA_ASSERT(it->second.term != eid.term);

        auto term_of_commit_idx = *_fsm->log_term_for(_fsm->commit_idx());
        if (it->second.term > eid.term) {
            if (term_of_commit_idx > eid.term) {
                // There are some entries committed with a term
                // bigger than ours, our entry must have been
                // already dropped (see 3.6.2 "Committing entries
                // from previous terms").
                _stats.waiters_awoken++;
                throw dropped_entry();
            } else {
                // Our entry might still get committed if another
                // leader is elected with an older log tail, but oh
                // well, we can't wait for two entries with the same
                // index and see which one wins, keep waiting for
                // an entry with a bigger term and hope that the
                // newly elected leader will have a newer log tail.
                _stats.waiters_dropped++;
                throw commit_status_unknown();
            }
        }
        // Let's replace an older-term entry with a newer-term one.
        auto prev_wait = std::move(it->second);
        container.erase(it);
        std::tie(it, inserted) = container.emplace(eid.idx, op_status{eid.term, promise<>()});
        // Set the status of the replaced entry. Same reasoning
        // applies for choosing the right exception status as earlier.
        if (term_of_commit_idx > prev_wait.term) {
            prev_wait.done.set_exception(dropped_entry{});
            _stats.waiters_awoken++;
        } else {
            prev_wait.done.set_exception(commit_status_unknown{});
            _stats.waiters_dropped++;
        }
    }
    SCYLLA_ASSERT(inserted);
    if (as) {
        // On abort, resolve the waiter with request_aborted and drop it
        // from the container.
        it->second.abort = as->subscribe([this, it = it, &container] noexcept {
            it->second.done.set_exception(
                request_aborted(format(
                    "Abort requested while waiting for entry with idx: {}, term: {}; last committed entry: {}, last applied entry: {}",
                    it->first, it->second.term, _fsm->commit_idx(), _applied_idx)));
            container.erase(it);
        });
        SCYLLA_ASSERT(it->second.abort);
    }
    co_await it->second.done.get_future();
    logger.trace("[{}] done waiting for {}.{}", id(), eid.term, eid.idx);
    co_return;
}
|
|
|
|
// Adds a command to the log on the local server, which must be the leader
// (fsm::add_entry throws not_a_leader otherwise). First waits for enough
// log memory to become available, retrying whenever the term changes while
// waiting (a term change may invalidate the permit's semaphore). Returns
// the (term, idx) of the newly appended entry.
future<entry_id> server_impl::add_entry_on_leader(command cmd, seastar::abort_source* as) {
    // Wait for sufficient memory to become available
    semaphore_units<> memory_permit;
    while (true) {
        // Remember the term before the wait to detect a change afterwards.
        term_t t = _fsm->get_current_term();
        try {
            memory_permit = co_await _fsm->wait_for_memory_permit(as, log::memory_usage_of(cmd, _config.max_command_size));
        } catch (semaphore_aborted&) {
            throw request_aborted(
                format("Semaphore aborted while waiting for memory availability for adding entry on leader in term: {}, on server: {}, current term: {}",
                    t,
                    _id,
                    _fsm->get_current_term()));
        }
        if (t == _fsm->get_current_term()) {
            break;
        }
        // The term changed while waiting: release the now-stale permit and retry.
        memory_permit.release();
    }
    logger.trace("[{}] adding entry after waiting for memory permit", id());

    try {
        const log_entry& e = _fsm->add_entry(std::move(cmd));
        memory_permit.release();
        co_return entry_id{.term = e.term, .idx = e.idx};
    } catch (const not_a_leader&) {
        // the semaphore is already destroyed, prevent memory_permit from accessing it
        memory_permit.release();
        throw;
    }
}
|
|
|
|
// Handles an add-entry request forwarded from `from` (or submitted locally
// when from == _id). Rejects senders that are not part of the current
// configuration; converts not_a_leader into a transient_error reply so the
// caller can retry against the new leader.
future<add_entry_reply> server_impl::execute_add_entry(server_id from, command cmd, seastar::abort_source* as) {
    const bool known_sender = from == _id || _fsm->get_configuration().contains(from);
    if (!known_sender) {
        // Do not accept entries from servers removed from the
        // configuration.
        co_return add_entry_reply{not_a_member{format("Add entry from {} was discarded since "
                                                      "it is not part of the configuration", from)}};
    }
    logger.trace("[{}] adding a forwarded entry from {}", id(), from);
    try {
        auto eid = co_await add_entry_on_leader(std::move(cmd), as);
        co_return add_entry_reply{eid};
    } catch (raft::not_a_leader& e) {
        // Tell the caller who we believe the leader is so it can retry there.
        co_return add_entry_reply{transient_error{std::current_exception(), e.leader}};
    }
}
|
|
|
|
// Repeatedly invokes `action(leader)` until it returns stop_iteration::yes.
// When no leader is known, waits for one. When the same leader is observed
// on two consecutive iterations without learning anything new, waits for
// the next tick before retrying, to avoid a tight retry loop.
// Holds _do_on_leader_gate for the duration, and honours both server
// abort() and the caller's abort_source.
template <typename AsyncAction>
requires requires (server_id& leader, AsyncAction aa) {
    { aa(leader) } -> std::same_as<future<stop_iteration>>;
}
future<> server_impl::do_on_leader_with_retries(seastar::abort_source* as, AsyncAction&& action) {
    server_id leader = _fsm->current_leader(), prev_leader{};

    check_not_aborted();
    auto gh = _do_on_leader_gate.hold();

    while (true) {
        if (as && as->abort_requested()) {
            throw request_aborted(format("Request aborted while performing action on leader, current leader: {}, previous leader: {}",
                leader ? leader.to_sstring() : "unknown",
                prev_leader ? prev_leader.to_sstring() : "unknown"));
        }
        check_not_aborted();
        if (leader == server_id{}) {
            // No known leader: block until one appears, then retry.
            co_await wait_for_leader(as);
            leader = _fsm->current_leader();
            continue;
        }
        if (prev_leader && leader == prev_leader) {
            // This is to protect against tight loop in case we didn't get
            // any new information about the current leader.
            // This can happen if the server responds with a transient_error with
            // an empty leader and the current node has not yet learned the new leader.
            // We neglect an excessive delay if the newly elected leader is the same as
            // the previous one; this is supposed to be rare.
            co_await wait_for_next_tick(as);
            prev_leader = leader = server_id{};
            continue;
        }
        prev_leader = leader;
        if (co_await action(leader) == stop_iteration::yes) {
            break;
        }
    }
}
|
|
|
|
// Public entry point: replicate `command` through Raft and wait according to
// `type` (committed vs applied). If forwarding is disabled, the call fails
// with `not_a_leader` unless this node is the leader; otherwise the command
// is forwarded to the current leader and retried across leader changes.
// Throws `command_is_too_big_error` if the command exceeds the configured
// size limit, and `request_aborted` if `as` fires.
future<> server_impl::add_entry(command command, wait_type type, seastar::abort_source* as) {
    if (command.size() > _config.max_command_size) {
        logger.trace("[{}] add_entry command size exceeds the limit: {} > {}",
                     id(), command.size(), _config.max_command_size);
        throw command_is_too_big_error(command.size(), _config.max_command_size);
    }
    _stats.add_command++;

    check_not_aborted();

    logger.trace("[{}] an entry is submitted", id());
    if (!_config.enable_forwarding) {
        // Without forwarding the caller is responsible for retrying on
        // another node; just fail fast if we are not the leader.
        if (const auto leader = _fsm->current_leader(); leader != _id) {
            throw not_a_leader{leader};
        }
        auto eid = co_await add_entry_on_leader(std::move(command), as);
        co_return co_await wait_for_entry(eid, type, as);
    }

    co_await do_on_leader_with_retries(as, [&](server_id& leader) -> future<stop_iteration> {
        auto reply = co_await [&]() -> future<add_entry_reply> {
            if (leader == _id) {
                logger.trace("[{}] an entry proceeds on a leader", id());
                // Make a copy of the command since we may still
                // retry and forward it.
                co_return co_await execute_add_entry(leader, command, as);
            } else {
                logger.trace("[{}] forwarding the entry to {}", id(), leader);
                try {
                    co_return co_await _rpc->send_add_entry(leader, command);
                } catch (const transport_error& e) {
                    // The RPC failed mid-flight: we cannot know whether the
                    // leader appended the entry, so the status is unknown.
                    logger.trace("[{}] send_add_entry on {} resulted in {}; "
                                 "rethrow as commit_status_unknown", _id, leader, e);
                    throw raft::commit_status_unknown();
                }
            }
        }();
        if (std::holds_alternative<raft::entry_id>(reply)) {
            co_await wait_for_entry(std::get<raft::entry_id>(reply), type, as);
            co_return stop_iteration::yes;
        }
        if (std::holds_alternative<raft::commit_status_unknown>(reply)) {
            // It should be impossible to obtain `commit_status_unknown` here
            // because neither `execute_add_entry` nor `send_add_entry` wait for the entry
            // to be committed/applied.
            on_internal_error(logger, "add_entry: `execute_add_entry` or `send_add_entry`"
                                      " returned `commit_status_unknown`");
        }
        if (std::holds_alternative<not_a_member>(reply)) {
            co_await coroutine::return_exception(std::get<not_a_member>(reply));
        }
        // Transient error: pick up the leader hint and retry.
        const auto& e = std::get<transient_error>(reply);
        logger.trace("[{}] got {}", _id, e);
        leader = e.leader;
        co_return stop_iteration::no;
    });
}
|
|
|
|
// Apply an incremental configuration change on this (leader) node on behalf
// of `from`: add the members in `add` (updating voter status of existing
// ones) and remove the ids in `del`, then replicate the resulting
// configuration. Errors that are safe or possible to retry are translated
// into `add_entry_reply` variants instead of propagating as exceptions.
future<add_entry_reply> server_impl::execute_modify_config(server_id from,
    std::vector<config_member> add, std::vector<server_id> del, seastar::abort_source* as) {

    if (from != _id && !_fsm->get_configuration().contains(from)) {
        // Do not accept entries from servers removed from the
        // configuration.
        co_return add_entry_reply{not_a_member{format("Modify config from {} was discarded since "
                                                      "it is not part of the configuration", from)}};
    }
    try {
        // Wait for a new slot to become available
        auto cfg = get_configuration().current;
        for (auto& s : add) {
            logger.trace("[{}] adding server {} as {}", id(), s.addr.id,
                    s.can_vote? "voter" : "non-voter");
            auto it = cfg.find(s);
            if (it == cfg.end()) {
                cfg.insert(s);
            } else if (it->can_vote != s.can_vote) {
                // Same server, different voter status: replace the entry.
                cfg.erase(s);
                cfg.insert(s);
                logger.trace("[{}] server {} already in configuration now {}",
                        id(), s.addr.id, s.can_vote? "voter" : "non-voter");
            } else {
                logger.warn("[{}] the server {} already exists in configuration as {}",
                        id(), s.addr.id, s.can_vote? "voter" : "non-voter");
            }
        }
        for (auto& to_remove: del) {
            logger.trace("[{}] removing server {}", id(), to_remove);
            // erase(to_remove) only available from C++23
            auto it = cfg.find(to_remove);
            if (it != cfg.end()) {
                cfg.erase(it);
            }
        }
        co_await set_configuration(cfg, as);

        // `modify_config` doesn't actually need the entry id for anything
        // but we reuse the `add_entry` RPC verb which requires it.
        co_return add_entry_reply{entry_id{}};
    } catch (raft::error& e) {
        if (is_uncertainty(e)) {
            // Although modify_config() is safe to retry, preserve
            // information that the entry may already have been
            // committed in the return value.
            co_return add_entry_reply{commit_status_unknown()};
        }
        if (const auto* ex = dynamic_cast<const not_a_leader*>(&e)) {
            co_return add_entry_reply{transient_error{std::current_exception(), ex->leader}};
        }
        if (dynamic_cast<const dropped_entry*>(&e)) {
            co_return add_entry_reply{transient_error{std::current_exception(), {}}};
        }
        if (dynamic_cast<const conf_change_in_progress*>(&e)) {
            co_return add_entry_reply{transient_error{std::current_exception(), {}}};
        }
        // Anything else is a non-retryable error; let it propagate.
        throw;
    }
}
|
|
|
|
// Public entry point for incremental configuration changes. Adds/removes the
// given members, forwarding the request to the leader (with retries across
// leader changes) when forwarding is enabled. Returns once the leader has
// committed the change; we deliberately do not wait for the local commit
// index to catch up (see #9981).
future<> server_impl::modify_config(std::vector<config_member> add, std::vector<server_id> del, seastar::abort_source* as) {
    check_not_aborted();

    utils::get_local_injector().inject("raft/throw_commit_status_unknown_in_modify_config", [] {
        throw raft::commit_status_unknown();
    });

    if (!_config.enable_forwarding) {
        const auto leader = _fsm->current_leader();
        if (leader != _id) {
            throw not_a_leader{leader};
        }
        auto reply = co_await execute_modify_config(leader, std::move(add), std::move(del), as);
        if (std::holds_alternative<raft::entry_id>(reply)) {
            co_return;
        }
        // Any non-success reply means leadership was lost during the change.
        throw raft::not_a_leader{_fsm->current_leader()};
    }

    co_await do_on_leader_with_retries(as, [&](server_id& leader) -> future<stop_iteration> {
        auto reply = co_await [&]() -> future<add_entry_reply> {
            if (leader == _id) {
                // Make a copy of the params since we may
                // still retry and forward them.
                co_return co_await execute_modify_config(leader, add, del, as);
            } else {
                logger.trace("[{}] forwarding the entry to {}", id(), leader);
                try {
                    co_return co_await _rpc->send_modify_config(leader, add, del);
                } catch (const transport_error& e) {
                    // The RPC failed mid-flight: the leader may or may not
                    // have committed the change.
                    logger.trace("[{}] send_modify_config on {} resulted in {}; "
                                 "rethrow as commit_status_unknown", _id, leader, e);
                    throw raft::commit_status_unknown();
                }
            }
        }();
        if (std::holds_alternative<raft::entry_id>(reply)) {
            // Do not wait for the entry locally. The reply means that the leader committed it,
            // and there is no reason to wait for our local commit index to match.
            // See also #9981.
            co_return stop_iteration::yes;
        }
        if (const auto e = std::get_if<raft::transient_error>(&reply)) {
            logger.trace("[{}] got {}", _id, *e);
            leader = e->leader;
            co_return stop_iteration::no;
        }
        if (std::holds_alternative<not_a_member>(reply)) {
            co_await coroutine::return_exception(std::get<not_a_member>(reply));
        }
        throw std::get<raft::commit_status_unknown>(reply);
    });
}
|
|
|
|
// RPC handler: feed an append_entries request from `from` into the FSM.
void server_impl::append_entries(server_id from, append_request append_request) {
    _stats.append_entries_received++;
    _fsm->step(from, std::move(append_request));
}
|
|
|
|
// RPC handler: feed an append_entries reply from `from` into the FSM.
void server_impl::append_entries_reply(server_id from, append_reply reply) {
    _stats.append_entries_reply_received++;
    _fsm->step(from, std::move(reply));
}
|
|
|
|
// RPC handler: feed a vote request from `from` into the FSM.
void server_impl::request_vote(server_id from, vote_request vote_request) {
    _stats.request_vote_received++;
    _fsm->step(from, std::move(vote_request));
}
|
|
|
|
// RPC handler: feed a vote reply from `from` into the FSM.
void server_impl::request_vote_reply(server_id from, vote_reply vote_reply) {
    _stats.request_vote_reply_received++;
    _fsm->step(from, std::move(vote_reply));
}
|
|
|
|
// RPC handler: feed a timeout_now (leadership transfer) message into the FSM.
void server_impl::timeout_now_request(server_id from, timeout_now timeout_now) {
    _stats.timeout_now_received++;
    _fsm->step(from, std::move(timeout_now));
}
|
|
|
|
// RPC handler: feed a read_quorum request from the leader into the FSM.
void server_impl::read_quorum_request(server_id from, struct read_quorum read_quorum) {
    _stats.read_quorum_received++;
    _fsm->step(from, std::move(read_quorum));
}
|
|
|
|
// RPC handler: feed a read_quorum reply from a follower into the FSM.
void server_impl::read_quorum_reply(server_id from, struct read_quorum_reply read_quorum_reply) {
    _stats.read_quorum_reply_received++;
    _fsm->step(from, std::move(read_quorum_reply));
}
|
|
|
|
// Resolve waiters in `waiters` whose index falls inside the just
// committed/applied `entries` batch. A waiter completes successfully only if
// the term it recorded at submission matches the term of the entry at its
// index; otherwise the entry was replaced after a leadership change and the
// waiter fails with `dropped_entry`. `entries` must be non-empty and
// contiguous.
void server_impl::notify_waiters(std::map<index_t, op_status>& waiters,
        const std::vector<log_entry_ptr>& entries) {
    index_t commit_idx = entries.back()->idx;
    index_t first_idx = entries.front()->idx;

    while (waiters.size() != 0) {
        auto it = waiters.begin();
        if (it->first > commit_idx) {
            break;
        }
        auto [entry_idx, status] = std::move(*it);

        // if there is a waiter entry with an index smaller than first entry
        // it means that notification is out of order which is prohibited
        SCYLLA_ASSERT(entry_idx >= first_idx);

        waiters.erase(it);
        if (status.term == entries[(entry_idx - first_idx).value()]->term) {
            status.done.set_value();
        } else {
            // The terms do not match which means that between the
            // times the entry was submitted and committed there
            // was a leadership change and the entry was replaced.
            status.done.set_exception(dropped_entry());
        }
        _stats.waiters_awoken++;
    }
    // Drop all waiters with a smaller term than the last committed one,
    // since there is no way they will be committed any longer (terms in
    // the log only grow).
    term_t last_committed_term = entries.back()->term;
    while (waiters.size() != 0) {
        auto it = waiters.begin();
        if (it->second.term < last_committed_term) {
            it->second.done.set_exception(dropped_entry());
            waiters.erase(it);
            _stats.waiters_awoken++;
        } else {
            break;
        }
    }
}
|
|
|
|
// Fail commit and apply waiters with `commit_status_unknown`. If `idx` is
// given, only waiters up to and including that index are dropped (used when a
// snapshot supersedes the log entries they were waiting for); otherwise all
// waiters are dropped (e.g. when this node left the configuration).
void server_impl::drop_waiters(std::optional<index_t> idx) {
    auto drop = [&] (std::map<index_t, op_status>& waiters) {
        while (waiters.size() != 0) {
            auto it = waiters.begin();
            if (idx && it->first > *idx) {
                break;
            }
            auto [entry_idx, status] = std::move(*it);
            waiters.erase(it);
            status.done.set_exception(commit_status_unknown());
            _stats.waiters_dropped++;
        }
    };
    drop(_awaited_commits);
    drop(_awaited_applies);
}
|
|
|
|
void server_impl::signal_applied() {
|
|
auto it = _awaited_indexes.begin();
|
|
|
|
while (it != _awaited_indexes.end()) {
|
|
if (it->first > _applied_idx) {
|
|
break;
|
|
}
|
|
it->second.promise.set_value();
|
|
it = _awaited_indexes.erase(it);
|
|
}
|
|
}
|
|
|
|
// Dispatch an outgoing FSM message to the matching RPC call based on its
// runtime type. append_request messages are serialized per destination via
// `_append_request_status` and sent in the background; snapshot installs are
// also sent in the background; snapshot replies complete a pending local
// promise instead of going through the RPC layer directly.
template <typename Message>
void server_impl::send_message(server_id id, Message m) {
    std::visit([this, id] (auto&& m) {
        using T = std::decay_t<decltype(m)>;
        if constexpr (std::is_same_v<T, append_reply>) {
            _stats.append_entries_reply_sent++;
            _rpc->send_append_entries_reply(id, m);
        } else if constexpr (std::is_same_v<T, append_request>) {
            _stats.append_entries_sent++;
            // Chain onto the previous send to this destination so that
            // append requests are delivered in order.
            _append_request_status[id].count++;
            _append_request_status[id].f = _append_request_status[id].f.then([this, cm = std::move(m), cid = id] () noexcept -> future<> {
                // We need to copy everything from the capture because it cannot be accessed after co-routine yields.
                server_impl* server = this;
                auto m = std::move(cm);
                auto id = cid;
                try {
                    co_await server->_rpc->send_append_entries(id, m);
                } catch(...) {
                    logger.debug("[{}] io_fiber failed to send a message to {}: {}", server->_id, id, std::current_exception());
                }
                // Drop the per-destination bookkeeping once no sends remain.
                server->_append_request_status[id].count--;
                if (server->_append_request_status[id].count == 0) {
                    server->_append_request_status.erase(id);
                }
            });
        } else if constexpr (std::is_same_v<T, vote_request>) {
            _stats.vote_request_sent++;
            _rpc->send_vote_request(id, m);
        } else if constexpr (std::is_same_v<T, vote_reply>) {
            _stats.vote_request_reply_sent++;
            _rpc->send_vote_reply(id, m);
        } else if constexpr (std::is_same_v<T, timeout_now>) {
            _stats.timeout_now_sent++;
            _rpc->send_timeout_now(id, m);
        } else if constexpr (std::is_same_v<T, struct read_quorum>) {
            _stats.read_quorum_sent++;
            _rpc->send_read_quorum(id, std::move(m));
        } else if constexpr (std::is_same_v<T, struct read_quorum_reply>) {
            _stats.read_quorum_reply_sent++;
            _rpc->send_read_quorum_reply(id, std::move(m));
        } else if constexpr (std::is_same_v<T, install_snapshot>) {
            _stats.install_snapshot_sent++;
            // Send in the background.
            send_snapshot(id, std::move(m));
        } else if constexpr (std::is_same_v<T, snapshot_reply>) {
            _stats.snapshot_reply_sent++;
            SCYLLA_ASSERT(_snapshot_application_done.contains(id));
            // Send a reply to install_snapshot after
            // snapshot application is done.
            _snapshot_application_done[id].set_value(std::move(m));
            _snapshot_application_done.erase(id);
        } else {
            // Compile-time guard: fails if a new message type is added to the
            // variant without a branch here.
            static_assert(!sizeof(T*), "not all message types are handled");
        }
    }, std::move(m));
}
|
|
|
|
// Like `configuration_diff` but with `can_vote` information forgotten.
// The RPC layer only cares about which addresses are joining/leaving,
// not about their voter status.
struct rpc_config_diff {
    server_address_set joining, leaving;
};
|
|
|
|
// Compute the address-level difference between the previous RPC configuration
// and a new member set: addresses present only in `current` are joining,
// addresses present only in `prev` are leaving.
static rpc_config_diff diff_address_sets(const server_address_set& prev, const config_member_set& current) {
    rpc_config_diff diff;
    for (const auto& member : current) {
        if (!prev.contains(member.addr)) {
            diff.joining.insert(member.addr);
        }
    }
    for (const auto& addr : prev) {
        if (!current.contains(addr.id)) {
            diff.leaving.insert(addr);
        }
    }
    return diff;
}
|
|
|
|
// Process one output batch produced by the FSM: persist term/vote, snapshot
// descriptor and log entries (in that order), update RPC address mappings,
// send outgoing messages, queue committed entries for the applier fiber, and
// resolve read barriers and leadership-related promises. The persistence
// steps must happen before messages are sent — Raft safety depends on state
// being durable before it is advertised to peers. `last_stable` tracks the
// last persisted log index across calls and is updated in place.
future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batch) {
    if (batch.term_and_vote) {
        // Current term and vote are always persisted
        // together. A vote may change independently of
        // term, but it's safe to update both in this
        // case.
        co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second);
        _stats.store_term_and_vote++;
    }

    if (batch.snp) {
        const auto& [snp, is_local, preserve_log_entries] = *batch.snp;
        logger.trace("[{}] io_fiber storing snapshot {}", _id, snp.id);
        // Persist the snapshot
        co_await _persistence->store_snapshot_descriptor(snp, preserve_log_entries);
        _snapshot_desc_idx = snp.idx;
        _snapshot_desc_idx_changed.broadcast();
        _stats.store_snapshot++;
        // If this is locally generated snapshot there is no need to
        // load it.
        if (!is_local) {
            co_await _apply_entries.push_eventually(std::move(snp));
        }
    }

    for (const auto& snp_id: batch.snps_to_drop) {
        _state_machine->drop_snapshot(snp_id);
    }

    if (batch.log_entries.size()) {
        auto& entries = batch.log_entries;

        // New entries may overwrite a suffix of the persisted log (e.g.
        // uncommitted entries from a deposed leader); truncate first.
        if (last_stable >= entries[0]->idx) {
            co_await _persistence->truncate_log(entries[0]->idx);
            _stats.truncate_persisted_log++;
        }

        utils::get_local_injector().inject("store_log_entries/test-failure",
                [] { throw std::runtime_error("store_log_entries/test-failure"); });

        // Combine saving and truncating into one call?
        // will require persistence to keep track of last idx
        co_await _persistence->store_log_entries(entries);

        last_stable = (*entries.crbegin())->idx;
        _stats.persisted_log_entries += entries.size();
    }

    // Update RPC server address mappings. Add servers which are joining
    // the cluster according to the new configuration (obtained from the
    // last_conf_idx).
    //
    // It should be done prior to sending the messages since the RPC
    // module needs to know who should it send the messages to (actual
    // network addresses of the joining servers).
    rpc_config_diff rpc_diff;
    if (batch.configuration) {
        rpc_diff = diff_address_sets(get_rpc_config(), *batch.configuration);
        for (const auto& addr: rpc_diff.joining) {
            add_to_rpc_config(addr);
        }
        _rpc->on_configuration_change(rpc_diff.joining, {});
    }

    // After entries are persisted we can send messages.
    for (auto&& m : batch.messages) {
        try {
            send_message(m.first, std::move(m.second));
        } catch(...) {
            // Not being able to send a message is not a critical error
            logger.debug("[{}] io_fiber failed to send a message to {}: {}", _id, m.first, std::current_exception());
        }
    }

    // Leaving servers are removed only after messages were sent, so a final
    // message (if any) can still reach them.
    if (batch.configuration) {
        for (const auto& addr: rpc_diff.leaving) {
            abort_snapshot_transfer(addr.id);
            remove_from_rpc_config(addr);
        }
        _rpc->on_configuration_change({}, rpc_diff.leaving);
    }

    // Process committed entries.
    if (batch.committed.size()) {
        if (_non_joint_conf_commit_promise) {
            // Someone is waiting for a non-joint configuration to commit
            // (end of a joint-consensus membership change).
            for (const auto& e: batch.committed) {
                const auto* cfg = get_if<raft::configuration>(&e->data);
                if (cfg != nullptr && !cfg->is_joint()) {
                    std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_value();
                    break;
                }
            }
        }
        co_await _persistence->store_commit_idx(batch.committed.back()->idx);
        _stats.queue_entries_for_apply += batch.committed.size();
        co_await _apply_entries.push_eventually(std::move(batch.committed));
    }

    if (batch.max_read_id_with_quorum) {
        // Read barriers up to this id have been confirmed by a quorum.
        while (!_reads.empty() && _reads.front().id <= batch.max_read_id_with_quorum) {
            _reads.front().promise.set_value(_reads.front().idx);
            _reads.pop_front();
        }
    }
    if (!_fsm->is_leader()) {
        if (_stepdown_promise) {
            std::exchange(_stepdown_promise, std::nullopt)->set_value();
        }
        if (!_current_rpc_config.contains(_id)) {
            // - It's important we push this after we pushed committed entries above. It
            //   will cause `applier_fiber` to drop waiters, which should be done after we
            //   notify all waiters for entries committed in this batch.
            // - This may happen multiple times if `io_fiber` gets multiple batches when
            //   we're outside the configuration, but it should eventually (and generally
            //   quickly) stop happening (we're outside the config after all).
            co_await _apply_entries.push_eventually(removed_from_config{});
        }
        // request aborts of snapshot transfers
        abort_snapshot_transfers();
        // abort all read barriers
        for (auto& r : _reads) {
            r.promise.set_value(not_a_leader{_fsm->current_leader()});
        }
        _reads.clear();
    } else if (batch.abort_leadership_transfer) {
        if (_stepdown_promise) {
            std::exchange(_stepdown_promise, std::nullopt)->set_exception(timeout_error("Stepdown process timed out"));
        }
    }
    if (_leader_promise && _fsm->current_leader()) {
        std::exchange(_leader_promise, std::nullopt)->set_value();
    }
    if (_state_change_promise && batch.state_changed) {
        std::exchange(_state_change_promise, std::nullopt)->set_value();
    }
}
|
|
|
|
// Handle requests submitted to the server itself (as opposed to FSM output).
// Currently the only request is an explicit snapshot trigger, which is
// forwarded to the applier fiber through the apply queue.
future<> server_impl::process_server_requests(server_requests&& requests) {
    if (requests.snapshot) {
        co_await _apply_entries.push_eventually(trigger_snapshot_msg{});
    }
}
|
|
|
|
// Main I/O loop of the server: waits until the FSM has output or a server
// request is pending, then persists/dispatches FSM output and processes
// server requests. Runs until the server is stopped; background errors are
// reported via handle_background_error. `last_stable` is the last log index
// known to be persisted at startup.
future<> server_impl::io_fiber(index_t last_stable) {
    logger.trace("[{}] io_fiber start", _id);
    try {
        while (true) {
            bool has_fsm_output = false;
            bool has_server_request = false;
            co_await _events.when([this, &has_fsm_output, &has_server_request] {
                has_fsm_output = _fsm->has_output();
                has_server_request = !_new_server_requests.empty();
                return has_fsm_output || has_server_request;
            });

            // Test hook: stall output processing while the injection is active.
            while (utils::get_local_injector().enter("poll_fsm_output/pause")) {
                co_await seastar::sleep(std::chrono::milliseconds(100));
            }

            _stats.polls++;

            if (has_fsm_output) {
                auto batch = _fsm->get_output();
                co_await process_fsm_output(last_stable, std::move(batch));
            }

            if (has_server_request) {
                auto requests = std::exchange(_new_server_requests, server_requests{});
                co_await process_server_requests(std::move(requests));
            }
        }
    } catch (seastar::broken_condition_variable&) {
        // Log fiber is stopped explicitly.
    } catch (stop_apply_fiber&) {
        // Log fiber is stopped explicitly
    } catch (...) {
        handle_background_error("io");
    }
    co_return;
}
|
|
|
|
// Start a background snapshot transfer to `dst` and register it in
// `_snapshot_transfers` so it can be aborted later. When the transfer
// completes (successfully or not) the result is fed back into the FSM as a
// snapshot_reply; a failed transfer is reported as `success = false`.
void server_impl::send_snapshot(server_id dst, install_snapshot&& snp) {
    seastar::abort_source as;
    uint64_t id = _next_snapshot_transfer_id++;
    // Use `yield()` to ensure that `_rpc->send_snapshot` is called after we emplace `f` in `_snapshot_transfers`.
    // This also catches any exceptions from `_rpc->send_snapshot` into `f`.
    future<> f = yield().then([this, &as, dst, id, snp = std::move(snp)] () mutable {
        return _rpc->send_snapshot(dst, std::move(snp), as).then_wrapped([this, dst, id] (future<snapshot_reply> f) {
            if (_aborted_snapshot_transfers.erase(id)) {
                // The transfer was aborted
                f.ignore_ready_future();
                return;
            }
            _snapshot_transfers.erase(dst);
            auto reply = raft::snapshot_reply{.current_term = _fsm->get_current_term(), .success = false};
            if (f.failed()) {
                auto eptr = f.get_exception();
                // A dead destination is expected during topology changes;
                // don't log it as an error.
                const log_level lvl = try_catch<raft::destination_not_alive_error>(eptr) != nullptr
                        ? log_level::debug
                        : log_level::error;
                logger.log(lvl, "[{}] Transferring snapshot to {} failed with: {}", _id, dst, eptr);
            } else {
                logger.trace("[{}] Transferred snapshot to {}", _id, dst);
                reply = f.get();
            }
            _fsm->step(dst, std::move(reply));
        });
    });
    auto res = _snapshot_transfers.emplace(dst, snapshot_transfer{std::move(f), std::move(as), id});
    // Only one transfer per destination may be in flight.
    SCYLLA_ASSERT(res.second);
}
|
|
|
|
// RPC handler for install_snapshot: feed the snapshot into the FSM and wait
// until the applier fiber finishes applying it, then return the reply the
// applier produced. Returns `success = false` without applying if a previous
// snapshot from the same server is still being processed.
future<snapshot_reply> server_impl::apply_snapshot(server_id from, install_snapshot snp) {
    snapshot_reply reply{_fsm->get_current_term(), false};
    // Previous snapshot processing may still be running if a connection from the leader was broken
    // after it sent install_snapshot but before it got a reply. It may cause the snapshot to be resent
    // and it may arrive before the previous one is processed. In this rare case we return error and the leader
    // will try again later (or maybe not, if the snapshot that is being applied is recent enough)
    if (!_snapshot_application_done.contains(from)) {
        _fsm->step(from, std::move(snp));

        try {
            // Resolved by send_message() once the snapshot_reply is produced.
            reply = co_await _snapshot_application_done[from].get_future();
        } catch (...) {
            logger.error("apply_snapshot[{}] failed with {}", _id, std::current_exception());
        }
    }
    co_return reply;
}
|
|
|
|
// Applier loop: consumes the `_apply_entries` queue and, depending on the
// item type, applies committed entries to the state machine, loads a
// received snapshot, drops waiters when this node left the configuration, or
// takes a snapshot on request. Also takes automatic snapshots when the
// configured thresholds are exceeded. Waiter notification and snapshot
// application are deliberately serialized in this single fiber.
future<> server_impl::applier_fiber() {
    logger.trace("applier_fiber start");

    try {
        while (true) {
            auto v = co_await _apply_entries.pop_eventually();

            co_await std::visit(make_visitor(
            [this] (std::vector<log_entry_ptr>& batch) -> future<> {
                if (batch.empty()) {
                    logger.trace("[{}] applier fiber: received empty batch", _id);
                    co_return;
                }

                // Completion notification code assumes that previous snapshot is applied
                // before new entries are committed, otherwise it asserts that some
                // notifications were missing. To prevent a committed entry from
                // being notified before an earlier snapshot is applied, do both
                // notification and snapshot application in the same fiber.
                notify_waiters(_awaited_commits, batch);

                std::vector<command_cref> commands;
                commands.reserve(batch.size());

                const index_t last_idx = batch.back()->idx;
                const term_t last_term = batch.back()->term;
                SCYLLA_ASSERT(last_idx == _applied_idx + index_t{batch.size()});

                // Only `command` entries are applied; configuration and dummy
                // entries are skipped.
                std::ranges::copy(
                       batch |
                       std::views::filter([] (log_entry_ptr& entry) { return std::holds_alternative<command>(entry->data); }) |
                       std::views::transform([] (log_entry_ptr& entry) { return std::cref(std::get<command>(entry->data)); }),
                       std::back_inserter(commands));

                const auto size = commands.size();
                if (size) {
                    try {
                        co_await _state_machine->apply(std::move(commands));
                    } catch (abort_requested_exception& e) {
                        logger.info("[{}] applier fiber stopped because state machine was aborted: {}", _id, e);
                        throw stop_apply_fiber{};
                    } catch (...) {
                        std::throw_with_nested(raft::state_machine_error{});
                    }
                    _stats.applied_entries += size;
                }

                // Use error injection to override the snapshot thresholds.
                // NOTE: we do not want to yield later since a snapshot could be applied in the meantime,
                // outdating the variables _applied_idx and last_snap_idx.
                co_await override_snapshot_thresholds();

                _applied_idx = last_idx;
                _applied_index_changed.broadcast();
                notify_waiters(_awaited_applies, batch);

                // It may happen that _fsm has already applied a later snapshot (from remote) that we didn't yet 'observe'
                // (i.e. didn't yet receive from _apply_entries queue) but will soon. We avoid unnecessary work
                // of taking snapshots ourselves but comparing our last index directly with what's currently in _fsm.
                const auto last_snap_idx = _fsm->log_last_snapshot_idx();

                const bool force_snapshot = utils::get_local_injector().enter("raft_server_force_snapshot");

                if (force_snapshot || (_applied_idx > last_snap_idx &&
                    ((_applied_idx - last_snap_idx).value() >= _config.snapshot_threshold ||
                      _fsm->log_memory_usage() >= _config.snapshot_threshold_log_size)))
                {
                    snapshot_descriptor snp;
                    snp.term = last_term;
                    snp.idx = _applied_idx;
                    snp.config = _fsm->log_last_conf_for(_applied_idx);
                    logger.trace("[{}] applier fiber: taking snapshot term={}, idx={}", _id, snp.term, snp.idx);
                    snp.id = co_await _state_machine->take_snapshot();
                    // Note that at this point (after the `co_await`), _fsm may already have applied a later snapshot.
                    // That's fine, `_fsm->apply_snapshot` will simply ignore our current attempt; we will soon receive
                    // a later snapshot from the queue.
                    auto max_trailing = force_snapshot ? 0 : _config.snapshot_trailing;
                    auto max_trailing_bytes = force_snapshot ? 0 : _config.snapshot_trailing_size;
                    if (!_fsm->apply_snapshot(snp, max_trailing, max_trailing_bytes, true)) {
                        logger.trace("[{}] applier fiber: while taking snapshot term={} idx={} id={},"
                               " fsm received a later snapshot at idx={}", _id, snp.term, snp.idx, snp.id, _fsm->log_last_snapshot_idx());
                    }
                    _stats.snapshots_taken++;
                }
            },
            [this] (snapshot_descriptor& snp) -> future<> {
                SCYLLA_ASSERT(snp.idx >= _applied_idx);
                // Apply the snapshot to the state machine.
                logger.trace("[{}] apply_fiber applying snapshot {}", _id, snp.id);
                co_await _state_machine->load_snapshot(snp.id);
                // Entries covered by the snapshot will never be individually
                // applied; their waiters get commit_status_unknown.
                drop_waiters(snp.idx);
                _applied_idx = snp.idx;
                _applied_index_changed.broadcast();
                _stats.sm_load_snapshot++;
            },
            [this] (const removed_from_config&) -> future<> {
                // If the node is no longer part of a config and no longer the leader
                // it may never know the status of entries it submitted.
                drop_waiters();
                co_return;
            },
            [this] (const trigger_snapshot_msg&) -> future<> {
                auto applied_term = _fsm->log_term_for(_applied_idx);
                // last truncation index <= snapshot index <= applied index
                SCYLLA_ASSERT(applied_term);

                snapshot_descriptor snp;
                snp.term = *applied_term;
                snp.idx = _applied_idx;
                snp.config = _fsm->log_last_conf_for(_applied_idx);
                logger.trace("[{}] taking snapshot at term={}, idx={} due to request", _id, snp.term, snp.idx);
                snp.id = co_await _state_machine->take_snapshot();
                if (!_fsm->apply_snapshot(snp, 0, 0, true)) {
                    logger.trace("[{}] while taking snapshot term={} idx={} id={} due to request,"
                           " fsm received a later snapshot at idx={}", _id, snp.term, snp.idx, snp.id, _fsm->log_last_snapshot_idx());
                }
                _stats.snapshots_taken++;
            }
            ), v);

            signal_applied();
        }
    } catch(stop_apply_fiber& ex) {
        // the fiber is aborted
    } catch (...) {
        handle_background_error("applier");
    }
    co_return;
}
|
|
|
|
// Return the FSM's current Raft term.
term_t server_impl::get_current_term() const {
    return _fsm->get_current_term();
}
|
|
|
|
// Wait until the state machine has applied the entry at `idx`. Returns
// immediately if it has been applied already. Throws `request_aborted` if
// `as` fires before or while waiting.
future<> server_impl::wait_for_apply(index_t idx, abort_source* as) {
    if (as && as->abort_requested()) {
        throw request_aborted(format(
            "Aborted before waiting for applying entry: {}, last committed entry: {}, last applied entry: {}",
            idx, _fsm->commit_idx(), _applied_idx));
    }

    check_not_aborted();

    if (idx > _applied_idx) {
        // The index is not applied yet. Wait for it.
        // This will be signalled when read_idx is applied
        auto it = _awaited_indexes.emplace(idx, awaited_index{{}, {}});
        if (as) {
            // On abort, fail the waiter and deregister it.
            it->second.abort = as->subscribe([this, it] noexcept {
                it->second.promise.set_exception(
                    request_aborted(format(
                        "Aborted while waiting to apply entry: {}, last committed entry: {}, last applied entry: {}",
                        it->first, _fsm->commit_idx(), _applied_idx)));
                _awaited_indexes.erase(it);
            });
            SCYLLA_ASSERT(it->second.abort);
        }
        co_await it->second.promise.get_future();
    }
}
|
|
|
|
// Leader-side implementation of a read barrier on behalf of `from`.
// Starts a read in the FSM and registers a waiter that is resolved with the
// read index once a quorum confirms this node is still the leader.
// Returns `std::monostate` if the barrier cannot be started yet (no entry
// committed in the current term), or `not_a_leader` if this node is not
// the leader.
future<read_barrier_reply> server_impl::execute_read_barrier(server_id from, seastar::abort_source* as) {
    check_not_aborted();

    logger.trace("[{}] execute_read_barrier start", _id);

    std::optional<std::pair<read_id, index_t>> rid;
    try {
        rid = _fsm->start_read_barrier(from);
        if (!rid) {
            // cannot start a barrier yet
            return make_ready_future<read_barrier_reply>(std::monostate{});
        }
    } catch (not_a_leader& err) {
        return make_ready_future<read_barrier_reply>(err);
    }
    logger.trace("[{}] execute_read_barrier read id is {} for commit idx {}",
        _id, rid->first, rid->second);
    if (as && as->abort_requested()) {
        return make_exception_future<read_barrier_reply>(
            request_aborted(format("Abort requested before waiting for read barrier from {}, read id is {} for commit idx {}", from, rid->first, rid->second)));
    }
    // Register the pending read; it is resolved by process_fsm_output when a
    // quorum acknowledges the read id.
    _reads.push_back({rid->first, rid->second, {}, {}});
    auto read = std::prev(_reads.end());
    if (as) {
        // On abort, fail the waiter and deregister it.
        read->abort = as->subscribe([this, read, from] noexcept {
            read->promise.set_exception(
                request_aborted(format("Abort requested while waiting for read barrier from {}, read id is {} for commit idx {}", from, read->id, read->idx)));
            _reads.erase(read);
        });
        SCYLLA_ASSERT(read->abort);
    }
    return read->promise.get_future();
}
|
|
|
|
// Obtain a read barrier index: executed locally when this node is the
// leader, otherwise forwarded to the leader over RPC. Note that the abort
// source is only honoured on the local path.
future<read_barrier_reply> server_impl::get_read_idx(server_id leader, seastar::abort_source* as) {
    return leader == _id
            ? execute_read_barrier(_id, as)
            : _rpc->execute_read_barrier_on_leader(leader);
}
|
|
|
|
// Perform a linearizable read barrier: obtain a read index from the leader
// (retrying across leader changes) and wait until the local state machine
// has applied up to that index.
future<> server_impl::read_barrier(seastar::abort_source* as) {
    logger.trace("[{}] read_barrier start", _id);
    index_t read_idx;

    co_await do_on_leader_with_retries(as, [&](server_id& leader) -> future<stop_iteration> {
        auto applied = _applied_idx;
        read_barrier_reply res;
        try {
            res = co_await get_read_idx(leader, as);
        } catch (const transport_error& e) {
            // Could not reach the believed leader; forget it and rediscover.
            logger.trace("[{}] read_barrier on {} resulted in {}; retrying", _id, leader, e);
            leader = server_id{};
            co_return stop_iteration::no;
        }
        if (std::holds_alternative<std::monostate>(res)) {
            // The leader is not ready to answer because it has not
            // committed any entries yet, so wait for any entry to be
            // committed (if none were since the start of the attempt) and retry.
            logger.trace("[{}] read_barrier leader not ready", _id);
            co_await wait_for_apply(++applied, as);
            co_return stop_iteration::no;
        }
        if (std::holds_alternative<raft::not_a_leader>(res)) {
            leader = std::get<not_a_leader>(res).leader;
            co_return stop_iteration::no;
        }
        read_idx = std::get<index_t>(res);
        // If the local entry at read_idx belongs to the current term, it is
        // safe to advance the local commit_idx to read_idx right away. This
        // speeds up the wait_for_apply below — particularly for non-voters,
        // which don't receive read_quorum messages and would otherwise have
        // to wait for the next append_entries to learn the commit index.
        _fsm->maybe_update_commit_idx_for_read(read_idx);
        co_return stop_iteration::yes;
    });

    logger.trace("[{}] read_barrier read index {}, applied index {}", _id, read_idx, _applied_idx);
    co_return co_await wait_for_apply(read_idx, as);
}
|
|
|
|
// Request abort of the in-flight snapshot transfer to `id` (if any) and move
// its future to `_aborted_snapshot_transfers` so the completion callback can
// detect the abort and discard the result.
void server_impl::abort_snapshot_transfer(server_id id) {
    auto it = _snapshot_transfers.find(id);
    if (it != _snapshot_transfers.end()) {
        auto& [f, as, tid] = it->second;
        logger.trace("[{}] Request abort of snapshot transfer to {}", _id, id);
        as.request_abort();
        _aborted_snapshot_transfers.emplace(tid, std::move(f));
        _snapshot_transfers.erase(it);
    }
}
|
|
|
|
void server_impl::abort_snapshot_transfers() {
|
|
for (auto&& [id, t] : _snapshot_transfers) {
|
|
logger.trace("[{}] Request abort of snapshot transfer to {}", _id, id);
|
|
t.as.request_abort();
|
|
_aborted_snapshot_transfers.emplace(t.id, std::move(t.f));
|
|
}
|
|
_snapshot_transfers.clear();
|
|
}
|
|
|
|
// Guard used at the start of public operations: throws stopped_error
// (carrying the abort reason) if abort() has already been called.
void server_impl::check_not_aborted() {
    if (_aborted) {
        throw stopped_error(*_aborted);
    }
}
|
|
|
|
void server_impl::handle_background_error(const char* fiber_name) {
|
|
_is_alive = false;
|
|
const auto e = std::current_exception();
|
|
logger.error("[{}] {} fiber stopped because of the error: {}", _id, fiber_name, e);
|
|
if (_config.on_background_error) {
|
|
_config.on_background_error(e);
|
|
}
|
|
}
|
|
|
|
// Shuts the server down: stops the FSM and all fibers, resolves or fails
// every outstanding waiter, and waits for RPC, state machine, persistence
// and snapshot-transfer activity to finish. The sequence below is
// order-sensitive; see the inline comments.
future<> server_impl::abort(sstring reason) {
    _is_alive = false;
    _aborted = std::move(reason);
    logger.trace("[{}]: abort() called", _id);
    _fsm->stop();
    _events.broken();
    _snapshot_desc_idx_changed.broken();

    // IO and applier fibers may update waiters and start new snapshot
    // transfers, so abort them first
    _apply_entries.abort(std::make_exception_ptr(stop_apply_fiber()));
    co_await seastar::when_all_succeed(std::move(_io_status), std::move(_applier_status)).discard_result();

    // Start RPC abort before aborting snapshot applications or destroying entry waiters.
    // After calling `_rpc->abort()` no new snapshot applications should be started or new waiters created
    // (see `rpc::abort()` comment and `_aborted` flag).
    auto abort_rpc = _rpc->abort();
    auto abort_sm = _state_machine->abort();
    auto abort_persistence = _persistence->abort();

    // Abort snapshot applications before waiting for `abort_rpc`,
    // since the RPC implementation may wait for snapshot applications to finish.
    for (auto&& [_, f] : _snapshot_application_done) {
        f.set_exception(std::runtime_error("Snapshot application aborted"));
    }

    // Destroy entry waiters before waiting for `abort_rpc`,
    // since the RPC implementation may wait for forwarded `modify_config` calls to finish
    // (and `modify_config` does not finish until the configuration entry is committed or an error occurs).
    for (auto& ac: _awaited_commits) {
        ac.second.done.set_exception(stopped_error(*_aborted));
    }
    for (auto& aa: _awaited_applies) {
        aa.second.done.set_exception(stopped_error(*_aborted));
    }
    _awaited_commits.clear();
    _awaited_applies.clear();
    // A pending set_configuration() call is failed with stopped_error as well.
    if (_non_joint_conf_commit_promise) {
        std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_exception(stopped_error(*_aborted));
    }

    // Complete all read attempts with not_a_leader
    for (auto& r: _reads) {
        r.promise.set_value(raft::not_a_leader{server_id{}});
    }
    _reads.clear();

    // Abort all read_barriers with an exception
    for (auto& i : _awaited_indexes) {
        i.second.promise.set_exception(stopped_error(*_aborted));
    }
    _awaited_indexes.clear();

    co_await seastar::when_all_succeed(std::move(abort_rpc), std::move(abort_sm), std::move(abort_persistence)).discard_result();

    // Fail anyone waiting on leader discovery, ticks or state changes.
    if (_leader_promise) {
        _leader_promise->set_exception(stopped_error(*_aborted));
    }
    if (_tick_promise) {
        _tick_promise->set_exception(stopped_error(*_aborted));
    }

    if (_state_change_promise) {
        _state_change_promise->set_exception(stopped_error(*_aborted));
    }

    abort_snapshot_transfers();

    // Finally, wait for aborted snapshot transfers, in-flight append
    // request handling, and the do_on_leader gate to drain.
    auto snp_futures = _aborted_snapshot_transfers | boost::adaptors::map_values;

    auto append_futures = _append_request_status | boost::adaptors::map_values | boost::adaptors::transformed([] (append_request_queue& a) -> future<>& { return a.f; });

    auto all_futures = boost::range::join(snp_futures, append_futures);

    std::array<future<>, 1> gate{_do_on_leader_gate.close()};

    auto all_with_gate = boost::range::join(all_futures, gate);

    co_await seastar::when_all_succeed(all_with_gate.begin(), all_with_gate.end()).discard_result();
}
|
|
|
|
// Returns whether the server is operational: false after abort() was
// called or after a background fiber terminated with an error.
bool server_impl::is_alive() const {
    return _is_alive;
}
|
|
|
|
// Replaces the cluster membership with `c_new` and waits until the
// resulting non-joint configuration is committed. Returns immediately if
// `c_new` matches the current configuration. Throws conf_change_in_progress
// if a previous configuration change has not completed yet; throws
// request_aborted if `as` fires while waiting.
future<> server_impl::set_configuration(config_member_set c_new, seastar::abort_source* as) {
    check_not_aborted();
    const auto& cfg = _fsm->get_configuration();
    // 4.1 Cluster membership changes. Safety.
    // When the leader receives a request to add or remove a server
    // from its current configuration (C old ), it appends the new
    // configuration (C new ) as an entry in its log and replicates
    // that entry using the normal Raft mechanism.
    auto [joining, leaving] = cfg.diff(c_new);
    if (joining.size() == 0 && leaving.size() == 0) {
        // Nothing to change.
        co_return;
    }

    _stats.add_config++;

    // Only one configuration change may be in flight at a time.
    if (_non_joint_conf_commit_promise) {
        logger.warn("[{}] set_configuration: a configuration change is still in progress (at index: {}, config: {})",
            _id, _fsm->log_last_conf_idx(), cfg);
        throw conf_change_in_progress{};
    }

    const auto& e = _fsm->add_entry(raft::configuration{std::move(c_new)});

    // We've just submitted a joint configuration to be committed.
    // Once the FSM discovers a committed joint configuration,
    // it appends a corresponding non-joint entry.
    // By waiting for the joint configuration first we ensure
    // that the next non-joint configuration we get from fsm in io_fiber
    // would be the one corresponding to our joint configuration,
    // no matter if the leader changed in the meantime.

    auto f = _non_joint_conf_commit_promise.emplace().promise.get_future();
    if (as) {
        _non_joint_conf_commit_promise->abort = as->subscribe([this, idx = e.idx, term = e.term] noexcept {
            // If we're inside this callback, the subscription wasn't destroyed yet.
            // The subscription is destroyed when the field is reset, so if we're here, the field must be engaged.
            SCYLLA_ASSERT(_non_joint_conf_commit_promise);
            // Whoever resolves the promise must reset the field. Thus, if we're here, the promise is not resolved.
            std::exchange(_non_joint_conf_commit_promise, std::nullopt)
                ->promise.set_exception(request_aborted(
                    format("Aborted while setting configuration (at index: {}, term: {}, current config: {})", idx, term, _fsm->get_configuration())));
        });
    }

    try {
        // First wait for the joint configuration entry to be committed.
        co_await wait_for_entry({.term = e.term, .idx = e.idx}, wait_type::committed, as);
    } catch (...) {
        _non_joint_conf_commit_promise.reset();
        // We need to 'observe' possible exceptions in f, otherwise they will be
        // considered unhandled and cause a warning.
        (void)f.handle_exception([id = _id] (auto e) {
            logger.trace("[{}] error while waiting for non-joint configuration to be committed: {}", id, e);
        });
        throw;
    }
    // Then wait for the corresponding non-joint configuration to be committed.
    co_await std::move(f);
}
|
|
|
|
// Returns the current Raft configuration as tracked by the FSM.
raft::configuration
server_impl::get_configuration() const {
    return _fsm->get_configuration();
}
|
|
|
|
void server_impl::register_metrics() {
|
|
namespace sm = seastar::metrics;
|
|
_metrics.add_group("raft", {
|
|
sm::make_total_operations("add_entries", _stats.add_command,
|
|
sm::description("Number of entries added on this node, the log_entry_type label can be command, dummy or config"), {server_id_label(_id), log_entry_type("command")}),
|
|
sm::make_total_operations("add_entries", _stats.add_dummy,
|
|
sm::description("Number of entries added on this node, the log_entry_type label can be command, dummy or config"), {server_id_label(_id), log_entry_type("dummy")}),
|
|
sm::make_total_operations("add_entries", _stats.add_config,
|
|
sm::description("Number of entries added on this node, the log_entry_type label can be command, dummy or config"), {server_id_label(_id), log_entry_type("config")}),
|
|
|
|
sm::make_total_operations("messages_received", _stats.append_entries_received,
|
|
sm::description("Number of messages received, the message_type determines the type of message"), {server_id_label(_id), message_type("append_entries")}),
|
|
sm::make_total_operations("messages_received", _stats.append_entries_reply_received,
|
|
sm::description("Number of messages received, the message_type determines the type of message"), {server_id_label(_id), message_type("append_entries_reply")}),
|
|
sm::make_total_operations("messages_received", _stats.request_vote_received,
|
|
sm::description("Number of messages received, the message_type determines the type of message"), {server_id_label(_id), message_type("request_vote")}),
|
|
sm::make_total_operations("messages_received", _stats.request_vote_reply_received,
|
|
sm::description("Number of messages received, the message_type determines the type of message"), {server_id_label(_id), message_type("request_vote_reply")}),
|
|
sm::make_total_operations("messages_received", _stats.timeout_now_received,
|
|
sm::description("Number of messages received, the message_type determines the type of message"), {server_id_label(_id), message_type("timeout_now")}),
|
|
sm::make_total_operations("messages_received", _stats.read_quorum_received,
|
|
sm::description("Number of messages received, the message_type determines the type of message"), {server_id_label(_id), message_type("read_quorum")}),
|
|
sm::make_total_operations("messages_received", _stats.read_quorum_reply_received,
|
|
sm::description("Number of messages received, the message_type determines the type of message"), {server_id_label(_id), message_type("read_quorum_reply")}),
|
|
|
|
sm::make_total_operations("messages_sent", _stats.append_entries_sent,
|
|
sm::description("Number of messages sent, the message_type determines the type of message"), {server_id_label(_id), message_type("append_entries")}),
|
|
sm::make_total_operations("messages_sent", _stats.append_entries_reply_sent,
|
|
sm::description("Number of messages sent, the message_type determines the type of message"), {server_id_label(_id), message_type("append_entries_reply")}),
|
|
sm::make_total_operations("messages_sent", _stats.vote_request_sent,
|
|
sm::description("Number of messages sent, the message_type determines the type of message"), {server_id_label(_id), message_type("request_vote")}),
|
|
sm::make_total_operations("messages_sent", _stats.vote_request_reply_sent,
|
|
sm::description("Number of messages sent, the message_type determines the type of message"), {server_id_label(_id), message_type("request_vote_reply")}),
|
|
sm::make_total_operations("messages_sent", _stats.install_snapshot_sent,
|
|
sm::description("Number of messages sent, the message_type determines the type of message"), {server_id_label(_id), message_type("install_snapshot")}),
|
|
sm::make_total_operations("messages_sent", _stats.snapshot_reply_sent,
|
|
sm::description("Number of messages sent, the message_type determines the type of message"), {server_id_label(_id), message_type("snapshot_reply")}),
|
|
sm::make_total_operations("messages_sent", _stats.timeout_now_sent,
|
|
sm::description("Number of messages sent, the message_type determines the type of message"), {server_id_label(_id), message_type("timeout_now")}),
|
|
sm::make_total_operations("messages_sent", _stats.read_quorum_sent,
|
|
sm::description("Number of messages sent, the message_type determines the type of message"), {server_id_label(_id), message_type("read_quorum")}),
|
|
sm::make_total_operations("messages_sent", _stats.read_quorum_reply_sent,
|
|
sm::description("Number of messages sent, the message_type determines the type of message"), {server_id_label(_id), message_type("read_quorum_reply")}),
|
|
|
|
sm::make_total_operations("waiter_awoken", _stats.waiters_awoken,
|
|
sm::description("Number of waiters that got result back"), {server_id_label(_id)}),
|
|
sm::make_total_operations("waiter_dropped", _stats.waiters_dropped,
|
|
sm::description("Number of waiters that did not get result back"), {server_id_label(_id)}),
|
|
sm::make_total_operations("polls", _stats.polls,
|
|
sm::description("Number of times raft state machine polled"), {server_id_label(_id)}),
|
|
sm::make_total_operations("store_term_and_vote", _stats.store_term_and_vote,
|
|
sm::description("Number of times term and vote persisted"), {server_id_label(_id)}),
|
|
sm::make_total_operations("store_snapshot", _stats.store_snapshot,
|
|
sm::description("Number of snapshots persisted"), {server_id_label(_id)}),
|
|
sm::make_total_operations("sm_load_snapshot", _stats.sm_load_snapshot,
|
|
sm::description("Number of times user state machine reloaded with a snapshot"), {server_id_label(_id)}),
|
|
sm::make_total_operations("truncate_persisted_log", _stats.truncate_persisted_log,
|
|
sm::description("Number of times log truncated on storage"), {server_id_label(_id)}),
|
|
sm::make_total_operations("persisted_log_entries", _stats.persisted_log_entries,
|
|
sm::description("Number of log entries persisted"), {server_id_label(_id)}),
|
|
sm::make_total_operations("queue_entries_for_apply", _stats.queue_entries_for_apply,
|
|
sm::description("Number of log entries queued to be applied"), {server_id_label(_id)}),
|
|
sm::make_total_operations("applied_entries", _stats.applied_entries,
|
|
sm::description("Number of log entries applied"), {server_id_label(_id)}),
|
|
sm::make_total_operations("snapshots_taken", _stats.snapshots_taken,
|
|
sm::description("Number of times user's state machine snapshotted"), {server_id_label(_id)}),
|
|
|
|
sm::make_gauge("in_memory_log_size", [this] { return _fsm->in_memory_log_size(); },
|
|
sm::description("size of in-memory part of the log"), {server_id_label(_id)}),
|
|
sm::make_gauge("log_memory_usage", [this] { return _fsm->log_memory_usage(); },
|
|
sm::description("memory usage of in-memory part of the log in bytes"), {server_id_label(_id)}),
|
|
sm::make_gauge("log_last_index", [this] { return _fsm->log_last_idx().value(); },
|
|
sm::description("term of the last log entry"), {server_id_label(_id)}),
|
|
sm::make_gauge("log_last_term", [this] { return _fsm->log_last_term().value(); },
|
|
sm::description("index of the last log entry"), {server_id_label(_id)}),
|
|
sm::make_gauge("snapshot_last_index", [this] { return _fsm->log_last_snapshot_idx().value(); },
|
|
sm::description("term of the snapshot"), {server_id_label(_id)}),
|
|
sm::make_gauge("snapshot_last_term", [this] { return _fsm->log_term_for(_fsm->log_last_snapshot_idx()).value().value(); },
|
|
sm::description("index of the snapshot"), {server_id_label(_id)}),
|
|
sm::make_gauge("state", [this] { return _fsm->state_to_metric(); },
|
|
sm::description("current state: 0 - follower, 1 - candidate, 2 - leader"), {server_id_label(_id)}),
|
|
sm::make_gauge("commit_index", [this] { return _fsm->commit_idx().value(); },
|
|
sm::description("commit index"), {server_id_label(_id)}),
|
|
sm::make_gauge("apply_index", [this] { return _applied_idx.value(); },
|
|
sm::description("applied index"), {server_id_label(_id)}),
|
|
});
|
|
}
|
|
|
|
void server_impl::wait_until_candidate() {
|
|
while (_fsm->is_follower()) {
|
|
_fsm->tick();
|
|
}
|
|
}
|
|
|
|
// Wait until candidate is either leader or reverts to follower
|
|
future<> server_impl::wait_election_done() {
|
|
while (_fsm->is_candidate()) {
|
|
co_await yield();
|
|
};
|
|
}
|
|
|
|
// Polls until the local log has reached at least the given term and
// index (test/debug helper).
future<> server_impl::wait_log_idx_term(std::pair<index_t, term_t> idx_log) {
    const auto& [idx, term] = idx_log;
    while (_fsm->log_last_term() < term || _fsm->log_last_idx() < idx) {
        co_await seastar::sleep(5us);
    }
}
|
|
|
|
// Returns the index and term of the last entry in the local log.
std::pair<index_t, term_t> server_impl::log_last_idx_term() {
    return {_fsm->log_last_idx(), _fsm->log_last_term()};
}
|
|
|
|
// Returns true if the FSM currently considers this server the leader.
bool server_impl::is_leader() {
    return _fsm->is_leader();
}
|
|
|
|
// Returns the leader id as tracked by the FSM.
raft::server_id server_impl::current_leader() const {
    return _fsm->current_leader();
}
|
|
|
|
void server_impl::elapse_election() {
|
|
while (_fsm->election_elapsed() < ELECTION_TIMEOUT) {
|
|
_fsm->tick();
|
|
}
|
|
}
|
|
|
|
void server_impl::tick() {
|
|
_fsm->tick();
|
|
|
|
if (_tick_promise && !_aborted) {
|
|
std::exchange(_tick_promise, std::nullopt)->set_value();
|
|
}
|
|
}
|
|
|
|
// Returns this server's id.
raft::server_id server_impl::id() const {
    return _id;
}
|
|
|
|
// Adjusts the capacity of the queue feeding entries to the applier fiber.
void server_impl::set_applier_queue_max_size(size_t queue_max_size) {
    _apply_entries.set_max_size(queue_max_size);
}
|
|
|
|
// Returns the set of server addresses currently tracked for RPC.
const server_address_set& server_impl::get_rpc_config() const {
    return _current_rpc_config;
}
|
|
|
|
// Adds `srv` to the locally tracked RPC address set.
void server_impl::add_to_rpc_config(server_address srv) {
    _current_rpc_config.emplace(std::move(srv));
}
|
|
|
|
// Removes `srv` from the locally tracked RPC address set.
void server_impl::remove_from_rpc_config(const server_address& srv) {
    _current_rpc_config.erase(srv);
}
|
|
|
|
// Initiates a leadership transfer with the given timeout. The returned
// future resolves once the stepdown completes. Fails with logic_error
// if another stepdown is already in flight, or with whatever
// transfer_leadership() throws.
future<> server_impl::stepdown(logical_clock::duration timeout) {
    // Only one stepdown may be in progress at a time.
    if (_stepdown_promise) {
        return make_exception_future<>(std::logic_error("Stepdown is already in progress"));
    }
    try {
        _fsm->transfer_leadership(timeout);
    } catch (...) {
        return make_exception_future<>(std::current_exception());
    }
    _stepdown_promise.emplace();
    return _stepdown_promise->get_future();
}
|
|
|
|
// Returns the configured upper bound on the size of a single command.
size_t server_impl::max_command_size() const {
    return _config.max_command_size;
}
|
|
|
|
// Factory for a raft server instance. The id must not be the null UUID.
std::unique_ptr<server> create_server(server_id uuid, std::unique_ptr<rpc> rpc,
    std::unique_ptr<state_machine> state_machine, std::unique_ptr<persistence> persistence,
    seastar::shared_ptr<failure_detector> failure_detector, server::configuration config) {
    SCYLLA_ASSERT(uuid != raft::server_id{utils::UUID(0, 0)});
    auto srv = std::make_unique<raft::server_impl>(uuid, std::move(rpc), std::move(state_machine),
        std::move(persistence), failure_detector, config);
    return srv;
}
|
|
|
|
// Prints a short human-readable description of the server.
// Fix: the format string was missing the second placeholder
// ("fsm ()" instead of "fsm {}"), so the fsm argument was passed
// but never rendered.
std::ostream& operator<<(std::ostream& os, const server_impl& s) {
    fmt::print(os, "[id: {}, fsm {}]\n", s._id, *s._fsm);
    return os;
}
|
|
|
|
// Error-injection hook: when "raft_server_set_snapshot_thresholds" is
// active, overrides the snapshot threshold/trailing parameters of
// _config with the values supplied by the injection handler.
future<> server_impl::override_snapshot_thresholds() {
    return utils::get_local_injector().inject("raft_server_set_snapshot_thresholds", [this](auto& handler) -> future<> {
        // Copy one parameter from the handler into `target`, if present.
        // A malformed value is a test bug, so it is an internal error.
        auto set_parameter = [&handler](auto& target, const std::string_view name) {
            const auto value = handler.get(name);
            if (!value) {
                return;
            }
            try {
                target = boost::lexical_cast<std::remove_reference_t<decltype(target)>>(*value);
                logger.info("Applied _config.{}={}", name, *value);
            } catch (const boost::bad_lexical_cast& e) {
                on_internal_error(
                    logger, fmt::format("Could not apply a snapshot threshold param: {}, value: {}, error: {}",
                                        name, *value, e.what()));
            }
        };

        set_parameter(_config.snapshot_threshold, "snapshot_threshold");
        set_parameter(_config.snapshot_threshold_log_size, "snapshot_threshold_log_size");
        set_parameter(_config.snapshot_trailing, "snapshot_trailing");
        set_parameter(_config.snapshot_trailing_size, "snapshot_trailing_size");
        return make_ready_future<>();
    });
}
|
|
} // end of namespace raft
|