raft: allow an option to persist commit index
Raft does not need to persist the commit index, since a restarted node will either learn it from an append message from a leader or (if the entire cluster is restarted and hence there is no leader) a new leader will figure it out after contacting a quorum. But some users may want to be able to bring their local state machine to a state as up-to-date as it was before the restart, as soon as possible and without any external communication. For them this patch introduces a new persistence API that allows saving and restoring the last seen committed index. Message-Id: <YfFD53oS2j1My0p/@scylladb.com>
This commit is contained in:
committed by
Tomasz Grabiec
parent
43f51e9639
commit
579dcf187a
@@ -204,6 +204,7 @@ schema_ptr system_keyspace::raft() {
|
||||
.with_column("vote", uuid_type, column_kind::static_column)
|
||||
// id of the most recent persisted snapshot
|
||||
.with_column("snapshot_id", uuid_type, column_kind::static_column)
|
||||
.with_column("commit_idx", long_type, column_kind::static_column)
|
||||
|
||||
.set_comment("Persisted RAFT log, votes and snapshot info")
|
||||
.with_version(generate_schema_version(id))
|
||||
|
||||
11
raft/fsm.cc
11
raft/fsm.cc
@@ -18,7 +18,7 @@ leader::~leader() {
|
||||
}
|
||||
|
||||
fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
|
||||
failure_detector& failure_detector, fsm_config config) :
|
||||
index_t commit_idx, failure_detector& failure_detector, fsm_config config) :
|
||||
_my_id(id), _current_term(current_term), _voted_for(voted_for),
|
||||
_log(std::move(log)), _failure_detector(failure_detector), _config(config) {
|
||||
if (id == raft::server_id{}) {
|
||||
@@ -27,12 +27,19 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
|
||||
// The snapshot can not contain uncommitted entries
|
||||
_commit_idx = _log.get_snapshot().idx;
|
||||
_observed.advance(*this);
|
||||
logger.trace("fsm[{}]: starting, current term {}, log length {}", _my_id, _current_term, _log.last_idx());
|
||||
// After we have observed the state, advance commit_idx to the persisted one (if provided)
|
||||
// so that the log can be replayed
|
||||
_commit_idx = std::max(_commit_idx, commit_idx);
|
||||
logger.trace("fsm[{}]: starting, current term {}, log length {}, commit index {}", _my_id, _current_term, _log.last_idx(), _commit_idx);
|
||||
|
||||
// Init timeout settings
|
||||
reset_election_timeout();
|
||||
}
|
||||
|
||||
fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
|
||||
failure_detector& failure_detector, fsm_config config) :
|
||||
fsm(id, current_term, voted_for, std::move(log), index_t{0}, failure_detector, config) {}
|
||||
|
||||
future<> fsm::wait_max_log_size() {
|
||||
check_is_leader();
|
||||
|
||||
|
||||
@@ -154,9 +154,8 @@ class fsm {
|
||||
// nil if none).
|
||||
server_id _voted_for;
|
||||
// Index of the highest log entry known to be committed.
|
||||
// Currently not persisted.
|
||||
// Invariant: _commit_idx >= _log.get_snapshot().idx
|
||||
index_t _commit_idx = index_t(0);
|
||||
index_t _commit_idx;
|
||||
// Log entries; each entry contains a command for state machine,
|
||||
// and the term when the entry was received by the leader.
|
||||
log _log;
|
||||
@@ -334,6 +333,9 @@ protected: // For testing
|
||||
}
|
||||
|
||||
public:
|
||||
explicit fsm(server_id id, term_t current_term, server_id voted_for, log log,
|
||||
index_t commit_idx, failure_detector& failure_detector, fsm_config conf);
|
||||
|
||||
explicit fsm(server_id id, term_t current_term, server_id voted_for, log log,
|
||||
failure_detector& failure_detector, fsm_config conf);
|
||||
|
||||
|
||||
11
raft/raft.hh
11
raft/raft.hh
@@ -606,6 +606,17 @@ public:
|
||||
// in parallel with store.
|
||||
virtual future<std::pair<term_t, server_id>> load_term_and_vote() = 0;
|
||||
|
||||
// Persist given commit index.
|
||||
// Cannot be called concurrently with itself.
|
||||
// Persisting a commit index is optional.
|
||||
virtual future<> store_commit_idx(index_t idx) = 0;
|
||||
|
||||
// Load persisted commit index.
|
||||
// Called during Raft server initialization only, is not run
|
||||
// in parallel with store. If no commit index was stored zero
|
||||
// will be returned.
|
||||
virtual future<index_t> load_commit_idx() = 0;
|
||||
|
||||
// Persist given snapshot and drop all but 'preserve_log_entries'
|
||||
// entries from the Raft log starting from the beginning.
|
||||
// This can overwrite a previously persisted snapshot.
|
||||
|
||||
@@ -103,6 +103,8 @@ private:
|
||||
// Set to true when abort() is called
|
||||
bool _aborted = false;
|
||||
|
||||
// Signaled when apply index is changed
|
||||
condition_variable _applied_index_changed;
|
||||
|
||||
struct stop_apply_fiber{}; // exception to send when the apply fiber needs to be stopped
|
||||
queue<std::variant<std::vector<log_entry_ptr>, snapshot_descriptor>> _apply_entries = queue<std::variant<std::vector<log_entry_ptr>, snapshot_descriptor>>(10);
|
||||
@@ -281,9 +283,13 @@ future<> server_impl::start() {
|
||||
auto snapshot = co_await _persistence->load_snapshot_descriptor();
|
||||
auto log_entries = co_await _persistence->load_log();
|
||||
auto log = raft::log(snapshot, std::move(log_entries));
|
||||
auto commit_idx = co_await _persistence->load_commit_idx();
|
||||
raft::configuration rpc_config = log.get_configuration();
|
||||
index_t stable_idx = log.stable_idx();
|
||||
_fsm = std::make_unique<fsm>(_id, term, vote, std::move(log), *_failure_detector,
|
||||
if (commit_idx > stable_idx) {
|
||||
on_internal_error(logger, "Raft init failed: commited index cannot be larger then persisted one");
|
||||
}
|
||||
_fsm = std::make_unique<fsm>(_id, term, vote, std::move(log), commit_idx, *_failure_detector,
|
||||
fsm_config {
|
||||
.append_request_threshold = _config.append_request_threshold,
|
||||
.max_log_size = _config.max_log_size,
|
||||
@@ -314,6 +320,12 @@ future<> server_impl::start() {
|
||||
// start fiber to apply committed entries
|
||||
_applier_status = applier_fiber();
|
||||
|
||||
// Wait for all committed entries to be applied before returning
|
||||
// to make sure that the user's state machine is up-to-date.
|
||||
while (_applied_idx < commit_idx) {
|
||||
co_await _applied_index_changed.wait();
|
||||
}
|
||||
|
||||
co_return;
|
||||
}
|
||||
|
||||
@@ -805,6 +817,7 @@ future<> server_impl::io_fiber(index_t last_stable) {
|
||||
|
||||
// Process committed entries.
|
||||
if (batch.committed.size()) {
|
||||
co_await _persistence->store_commit_idx(batch.committed.back()->idx);
|
||||
_stats.queue_entries_for_apply += batch.committed.size();
|
||||
co_await _apply_entries.push_eventually(std::move(batch.committed));
|
||||
}
|
||||
@@ -940,6 +953,7 @@ future<> server_impl::applier_fiber() {
|
||||
}
|
||||
|
||||
_applied_idx = last_idx;
|
||||
_applied_index_changed.broadcast();
|
||||
notify_waiters(_awaited_applies, batch);
|
||||
|
||||
// It may happen that _fsm has already applied a later snapshot (from remote) that we didn't yet 'observe'
|
||||
@@ -971,6 +985,7 @@ future<> server_impl::applier_fiber() {
|
||||
co_await _state_machine->load_snapshot(snp.id);
|
||||
drop_waiters(snp.idx);
|
||||
_applied_idx = snp.idx;
|
||||
_applied_index_changed.broadcast();
|
||||
_stats.sm_load_snapshot++;
|
||||
}
|
||||
signal_applied();
|
||||
|
||||
@@ -64,6 +64,27 @@ future<std::pair<raft::term_t, raft::server_id>> raft_sys_table_storage::load_te
|
||||
co_return std::pair(vote_term, vote);
|
||||
}
|
||||
|
||||
future<> raft_sys_table_storage::store_commit_idx(raft::index_t idx) {
|
||||
return execute_with_linearization_point([this, idx] {
|
||||
static const auto store_cql = format("INSERT INTO system.{} (group_id, commit_idx) VALUES (?, ?)",
|
||||
db::system_keyspace::RAFT);
|
||||
return _qp.execute_internal(
|
||||
store_cql,
|
||||
{_group_id.id, int64_t(idx)}).discard_result();
|
||||
});
|
||||
}
|
||||
|
||||
future<raft::index_t> raft_sys_table_storage::load_commit_idx() {
|
||||
static const auto load_cql = format("SELECT commit_idx FROM system.{} WHERE group_id = ? LIMIT 1", db::system_keyspace::RAFT);
|
||||
::shared_ptr<cql3::untyped_result_set> rs = co_await _qp.execute_internal(load_cql, {_group_id.id});
|
||||
if (rs->empty()) {
|
||||
co_return raft::index_t(0);
|
||||
}
|
||||
const auto& static_row = rs->one();
|
||||
co_return raft::index_t(static_row.get_or<int64_t>("commit_idx", raft::index_t{}));
|
||||
}
|
||||
|
||||
|
||||
future<raft::log_entries> raft_sys_table_storage::load_log() {
|
||||
static const auto load_cql = format("SELECT term, \"index\", data FROM system.{} WHERE group_id = ?", db::system_keyspace::RAFT);
|
||||
::shared_ptr<cql3::untyped_result_set> rs = co_await _qp.execute_internal(load_cql, {_group_id.id});
|
||||
|
||||
@@ -56,6 +56,8 @@ public:
|
||||
|
||||
future<> store_term_and_vote(raft::term_t term, raft::server_id vote) override;
|
||||
future<std::pair<raft::term_t, raft::server_id>> load_term_and_vote() override;
|
||||
future<> store_commit_idx(raft::index_t) override;
|
||||
future<raft::index_t> load_commit_idx() override;
|
||||
future<raft::log_entries> load_log() override;
|
||||
future<raft::snapshot_descriptor> load_snapshot_descriptor() override;
|
||||
|
||||
|
||||
@@ -805,6 +805,14 @@ public:
|
||||
co_return _persistence->load_term_and_vote();
|
||||
}
|
||||
|
||||
virtual future<> store_commit_idx(raft::index_t) override {
|
||||
co_return;
|
||||
}
|
||||
|
||||
virtual future<raft::index_t> load_commit_idx() override {
|
||||
co_return raft::index_t{0};
|
||||
}
|
||||
|
||||
// Stores not only the snapshot descriptor but also the corresponding snapshot.
|
||||
virtual future<> store_snapshot_descriptor(const raft::snapshot_descriptor& snap, size_t preserve_log_entries) override {
|
||||
auto it = _snapshots.find(snap.id);
|
||||
|
||||
@@ -424,6 +424,12 @@ public:
|
||||
auto term_and_vote = std::make_pair(_conf.term, _conf.vote);
|
||||
return make_ready_future<std::pair<raft::term_t, raft::server_id>>(term_and_vote);
|
||||
}
|
||||
future<> store_commit_idx(raft::index_t) override {
|
||||
co_return;
|
||||
}
|
||||
future<raft::index_t> load_commit_idx() override {
|
||||
co_return raft::index_t{0};
|
||||
}
|
||||
future<> store_snapshot_descriptor(const raft::snapshot_descriptor& snap, size_t preserve_log_entries) override {
|
||||
(*_persisted_snapshots)[_id] = std::make_pair(snap, (*_snapshots)[_id][snap.id]);
|
||||
tlogger.debug("sm[{}] persists snapshot {}", _id, (*_snapshots)[_id][snap.id].hasher.finalize_uint64());
|
||||
|
||||
Reference in New Issue
Block a user