raft: allow an option to persist commit index

Raft does not need to persist the commit index since a restarted node will
either learn it from an append message from a leader or (if entire cluster
is restarted and hence there is no leader) new leader will figure it out
after contacting a quorum. But some users may want to be able to bring
their local state machine to a state as up-to-date as it was before restart
as soon as possible without any external communication.

For them this patch introduces new persistence API that allows saving
and restoring last seen committed index.

Message-Id: <YfFD53oS2j1My0p/@scylladb.com>
This commit is contained in:
Gleb Natapov
2022-01-26 14:51:51 +02:00
committed by Tomasz Grabiec
parent 43f51e9639
commit 579dcf187a
9 changed files with 78 additions and 5 deletions

View File

@@ -204,6 +204,7 @@ schema_ptr system_keyspace::raft() {
.with_column("vote", uuid_type, column_kind::static_column)
// id of the most recent persisted snapshot
.with_column("snapshot_id", uuid_type, column_kind::static_column)
.with_column("commit_idx", long_type, column_kind::static_column)
.set_comment("Persisted RAFT log, votes and snapshot info")
.with_version(generate_schema_version(id))

View File

@@ -18,7 +18,7 @@ leader::~leader() {
}
fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
failure_detector& failure_detector, fsm_config config) :
index_t commit_idx, failure_detector& failure_detector, fsm_config config) :
_my_id(id), _current_term(current_term), _voted_for(voted_for),
_log(std::move(log)), _failure_detector(failure_detector), _config(config) {
if (id == raft::server_id{}) {
@@ -27,12 +27,19 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
// The snapshot can not contain uncommitted entries
_commit_idx = _log.get_snapshot().idx;
_observed.advance(*this);
logger.trace("fsm[{}]: starting, current term {}, log length {}", _my_id, _current_term, _log.last_idx());
// After we observed the state advance commit_idx to persisted one (if provided)
// so that the log can be replayed
_commit_idx = std::max(_commit_idx, commit_idx);
logger.trace("fsm[{}]: starting, current term {}, log length {}, commit index {}", _my_id, _current_term, _log.last_idx(), _commit_idx);
// Init timeout settings
reset_election_timeout();
}
fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
failure_detector& failure_detector, fsm_config config) :
fsm(id, current_term, voted_for, std::move(log), index_t{0}, failure_detector, config) {}
future<> fsm::wait_max_log_size() {
check_is_leader();

View File

@@ -154,9 +154,8 @@ class fsm {
// nil if none).
server_id _voted_for;
// Index of the highest log entry known to be committed.
// Currently not persisted.
// Invariant: _commit_idx >= _log.get_snapshot().idx
index_t _commit_idx = index_t(0);
index_t _commit_idx;
// Log entries; each entry contains a command for state machine,
// and the term when the entry was received by the leader.
log _log;
@@ -334,6 +333,9 @@ protected: // For testing
}
public:
explicit fsm(server_id id, term_t current_term, server_id voted_for, log log,
index_t commit_idx, failure_detector& failure_detector, fsm_config conf);
explicit fsm(server_id id, term_t current_term, server_id voted_for, log log,
failure_detector& failure_detector, fsm_config conf);

View File

@@ -606,6 +606,17 @@ public:
// in parallel with store.
virtual future<std::pair<term_t, server_id>> load_term_and_vote() = 0;
// Persist given commit index.
// Cannot be called conccurrently with itself.
// Persisting a commit index is optional.
virtual future<> store_commit_idx(index_t idx) = 0;
// Load persisted commit index.
// Called during Raft server initialization only, is not run
// in parallel with store. If no commit index was storred zero
// will be returned.
virtual future<index_t> load_commit_idx() = 0;
// Persist given snapshot and drop all but 'preserve_log_entries'
// entries from the Raft log starting from the beginning.
// This can overwrite a previously persisted snapshot.

View File

@@ -103,6 +103,8 @@ private:
// Set to true when abort() is called
bool _aborted = false;
// Signaled when apply index is changed
condition_variable _applied_index_changed;
struct stop_apply_fiber{}; // exception to send when apply fiber is needs to be stopepd
queue<std::variant<std::vector<log_entry_ptr>, snapshot_descriptor>> _apply_entries = queue<std::variant<std::vector<log_entry_ptr>, snapshot_descriptor>>(10);
@@ -281,9 +283,13 @@ future<> server_impl::start() {
auto snapshot = co_await _persistence->load_snapshot_descriptor();
auto log_entries = co_await _persistence->load_log();
auto log = raft::log(snapshot, std::move(log_entries));
auto commit_idx = co_await _persistence->load_commit_idx();
raft::configuration rpc_config = log.get_configuration();
index_t stable_idx = log.stable_idx();
_fsm = std::make_unique<fsm>(_id, term, vote, std::move(log), *_failure_detector,
if (commit_idx > stable_idx) {
on_internal_error(logger, "Raft init failed: commited index cannot be larger then persisted one");
}
_fsm = std::make_unique<fsm>(_id, term, vote, std::move(log), commit_idx, *_failure_detector,
fsm_config {
.append_request_threshold = _config.append_request_threshold,
.max_log_size = _config.max_log_size,
@@ -314,6 +320,12 @@ future<> server_impl::start() {
// start fiber to apply committed entries
_applier_status = applier_fiber();
// Wait for all committed entries to be applied before returning
// to make sure that the user's state machine is up-to-date.
while (_applied_idx < commit_idx) {
co_await _applied_index_changed.wait();
}
co_return;
}
@@ -805,6 +817,7 @@ future<> server_impl::io_fiber(index_t last_stable) {
// Process committed entries.
if (batch.committed.size()) {
co_await _persistence->store_commit_idx(batch.committed.back()->idx);
_stats.queue_entries_for_apply += batch.committed.size();
co_await _apply_entries.push_eventually(std::move(batch.committed));
}
@@ -940,6 +953,7 @@ future<> server_impl::applier_fiber() {
}
_applied_idx = last_idx;
_applied_index_changed.broadcast();
notify_waiters(_awaited_applies, batch);
// It may happen that _fsm has already applied a later snapshot (from remote) that we didn't yet 'observe'
@@ -971,6 +985,7 @@ future<> server_impl::applier_fiber() {
co_await _state_machine->load_snapshot(snp.id);
drop_waiters(snp.idx);
_applied_idx = snp.idx;
_applied_index_changed.broadcast();
_stats.sm_load_snapshot++;
}
signal_applied();

View File

@@ -64,6 +64,27 @@ future<std::pair<raft::term_t, raft::server_id>> raft_sys_table_storage::load_te
co_return std::pair(vote_term, vote);
}
future<> raft_sys_table_storage::store_commit_idx(raft::index_t idx) {
return execute_with_linearization_point([this, idx] {
static const auto store_cql = format("INSERT INTO system.{} (group_id, commit_idx) VALUES (?, ?)",
db::system_keyspace::RAFT);
return _qp.execute_internal(
store_cql,
{_group_id.id, int64_t(idx)}).discard_result();
});
}
future<raft::index_t> raft_sys_table_storage::load_commit_idx() {
static const auto load_cql = format("SELECT commit_idx FROM system.{} WHERE group_id = ? LIMIT 1", db::system_keyspace::RAFT);
::shared_ptr<cql3::untyped_result_set> rs = co_await _qp.execute_internal(load_cql, {_group_id.id});
if (rs->empty()) {
co_return raft::index_t(0);
}
const auto& static_row = rs->one();
co_return raft::index_t(static_row.get_or<int64_t>("commit_idx", raft::index_t{}));
}
future<raft::log_entries> raft_sys_table_storage::load_log() {
static const auto load_cql = format("SELECT term, \"index\", data FROM system.{} WHERE group_id = ?", db::system_keyspace::RAFT);
::shared_ptr<cql3::untyped_result_set> rs = co_await _qp.execute_internal(load_cql, {_group_id.id});

View File

@@ -56,6 +56,8 @@ public:
future<> store_term_and_vote(raft::term_t term, raft::server_id vote) override;
future<std::pair<raft::term_t, raft::server_id>> load_term_and_vote() override;
future<> store_commit_idx(raft::index_t) override;
future<raft::index_t> load_commit_idx() override;
future<raft::log_entries> load_log() override;
future<raft::snapshot_descriptor> load_snapshot_descriptor() override;

View File

@@ -805,6 +805,14 @@ public:
co_return _persistence->load_term_and_vote();
}
virtual future<> store_commit_idx(raft::index_t) override {
co_return;
}
virtual future<raft::index_t> load_commit_idx() override {
co_return raft::index_t{0};
}
// Stores not only the snapshot descriptor but also the corresponding snapshot.
virtual future<> store_snapshot_descriptor(const raft::snapshot_descriptor& snap, size_t preserve_log_entries) override {
auto it = _snapshots.find(snap.id);

View File

@@ -424,6 +424,12 @@ public:
auto term_and_vote = std::make_pair(_conf.term, _conf.vote);
return make_ready_future<std::pair<raft::term_t, raft::server_id>>(term_and_vote);
}
future<> store_commit_idx(raft::index_t) override {
co_return;
}
future<raft::index_t> load_commit_idx() override {
co_return raft::index_t{0};
}
future<> store_snapshot_descriptor(const raft::snapshot_descriptor& snap, size_t preserve_log_entries) override {
(*_persisted_snapshots)[_id] = std::make_pair(snap, (*_snapshots)[_id][snap.id]);
tlogger.debug("sm[{}] persists snapshot {}", _id, (*_snapshots)[_id][snap.id].hasher.finalize_uint64());