mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-22 15:52:13 +00:00
raft: set follower's next_idx when switching to SNAPSHOT mode
Set the follower's next_idx to snapshot index + 1 when switching it to snapshot mode. If the snapshot transfer succeeds, that is the best match for the follower's next replication index. If it fails, the leader will send a new probe to find out the follower's position again and retry sending a possibly newer snapshot. The change helps reduce the protocol state managed outside the FSM.
This commit is contained in:
@@ -688,7 +688,7 @@ void fsm::replicate_to(follower_progress& progress, bool allow_empty) {
|
||||
const snapshot& snapshot = _log.get_snapshot();
|
||||
// We need to transfer the snapshot before we can
|
||||
// continue syncing the log.
|
||||
progress.become_snapshot();
|
||||
progress.become_snapshot(snapshot.idx);
|
||||
send_to(progress.id, install_snapshot{_current_term, snapshot});
|
||||
logger.trace("replicate_to[{}->{}]: send snapshot next={} snapshot={}",
|
||||
_my_id, progress.id, progress.next_idx, snapshot.idx);
|
||||
@@ -765,7 +765,7 @@ bool fsm::can_read() {
|
||||
return false;
|
||||
}
|
||||
|
||||
void fsm::snapshot_status(server_id id, std::optional<index_t> idx) {
|
||||
void fsm::snapshot_status(server_id id, bool success) {
|
||||
follower_progress& progress = *leader_state().tracker.find(id);
|
||||
|
||||
if (progress.state != follower_progress::state::SNAPSHOT) {
|
||||
@@ -776,8 +776,7 @@ void fsm::snapshot_status(server_id id, std::optional<index_t> idx) {
|
||||
// No matter if snapshot transfer failed or not move back to probe state
|
||||
progress.become_probe();
|
||||
|
||||
if (idx) {
|
||||
progress.next_idx = *idx + index_t(1);
|
||||
if (success) {
|
||||
// If snapshot was successfully transferred start replication immediately
|
||||
replicate_to(progress, false);
|
||||
}
|
||||
|
||||
@@ -350,7 +350,7 @@ public:
|
||||
// tick period.
|
||||
bool can_read();
|
||||
|
||||
void snapshot_status(server_id id, std::optional<index_t> idx);
|
||||
void snapshot_status(server_id id, bool success);
|
||||
|
||||
// This call will update the log to point to the new snapshot
|
||||
// and will truncate the log prefix up to (snp.idx - trailing)
|
||||
|
||||
@@ -413,15 +413,14 @@ future<> server_impl::io_fiber(index_t last_stable) {
|
||||
}
|
||||
|
||||
void server_impl::send_snapshot(server_id dst, install_snapshot&& snp) {
|
||||
index_t snp_idx = snp.snp.idx;
|
||||
future<> f = _rpc->send_snapshot(dst, std::move(snp)).then_wrapped([this, dst, snp_idx] (future<> f) {
|
||||
future<> f = _rpc->send_snapshot(dst, std::move(snp)).then_wrapped([this, dst] (future<> f) {
|
||||
_snapshot_transfers.erase(dst);
|
||||
if (f.failed()) {
|
||||
logger.error("[{}] Transferring snapshot to {} failed with: {}", _id, dst, f.get_exception());
|
||||
_fsm->snapshot_status(dst, std::nullopt);
|
||||
_fsm->snapshot_status(dst, false);
|
||||
} else {
|
||||
logger.trace("[{}] Transferred snapshot to {}", _id, dst);
|
||||
_fsm->snapshot_status(dst, snp_idx);
|
||||
_fsm->snapshot_status(dst, true);
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
@@ -62,8 +62,12 @@ void follower_progress::become_pipeline() {
|
||||
}
|
||||
}
|
||||
|
||||
void follower_progress::become_snapshot() {
|
||||
void follower_progress::become_snapshot(index_t snp_idx) {
|
||||
state = state::SNAPSHOT;
|
||||
// If snapshot transfer succeeds, start replicating from the
|
||||
// next index, otherwise we will learn the follower's index
|
||||
// again by sending a probe request.
|
||||
next_idx = snp_idx + index_t{1};
|
||||
}
|
||||
|
||||
bool follower_progress::can_send_to() {
|
||||
|
||||
@@ -60,7 +60,7 @@ public:
|
||||
|
||||
void become_probe();
|
||||
void become_pipeline();
|
||||
void become_snapshot();
|
||||
void become_snapshot(index_t snp_idx);
|
||||
|
||||
void accepted(index_t idx) {
|
||||
// AppendEntries replies can arrive out of order.
|
||||
|
||||
Reference in New Issue
Block a user