diff --git a/raft/fsm.cc b/raft/fsm.cc index 03c3d99320..545d543a3d 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -688,7 +688,7 @@ void fsm::replicate_to(follower_progress& progress, bool allow_empty) { const snapshot& snapshot = _log.get_snapshot(); // We need to transfer the snapshot before we can // continue syncing the log. - progress.become_snapshot(); + progress.become_snapshot(snapshot.idx); send_to(progress.id, install_snapshot{_current_term, snapshot}); logger.trace("replicate_to[{}->{}]: send snapshot next={} snapshot={}", _my_id, progress.id, progress.next_idx, snapshot.idx); @@ -765,7 +765,7 @@ bool fsm::can_read() { return false; } -void fsm::snapshot_status(server_id id, std::optional idx) { +void fsm::snapshot_status(server_id id, bool success) { follower_progress& progress = *leader_state().tracker.find(id); if (progress.state != follower_progress::state::SNAPSHOT) { @@ -776,8 +776,7 @@ void fsm::snapshot_status(server_id id, std::optional idx) { // No matter if snapshot transfer failed or not move back to probe state progress.become_probe(); - if (idx) { - progress.next_idx = *idx + index_t(1); + if (success) { // If snapshot was successfully transferred start replication immediately replicate_to(progress, false); } diff --git a/raft/fsm.hh b/raft/fsm.hh index 191f2d9f74..fe537ecc58 100644 --- a/raft/fsm.hh +++ b/raft/fsm.hh @@ -350,7 +350,7 @@ public: // tick period. bool can_read(); - void snapshot_status(server_id id, std::optional idx); + void snapshot_status(server_id id, bool success); // This call will update the log to point to the new snapshot // and will truncate the log prefix up to (snp.idx - trailing) diff --git a/raft/server.cc b/raft/server.cc index 4383cdf202..b40334e361 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -413,15 +413,14 @@ future<> server_impl::io_fiber(index_t last_stable) { } void server_impl::send_snapshot(server_id dst, install_snapshot&& snp) { - index_t snp_idx = snp.snp.idx; - future<> f = _rpc->send_snapshot(dst, std::move(snp)).then_wrapped([this, dst, snp_idx] (future<> f) { + future<> f = _rpc->send_snapshot(dst, std::move(snp)).then_wrapped([this, dst] (future<> f) { _snapshot_transfers.erase(dst); if (f.failed()) { logger.error("[{}] Transferring snapshot to {} failed with: {}", _id, dst, f.get_exception()); - _fsm->snapshot_status(dst, std::nullopt); + _fsm->snapshot_status(dst, false); } else { logger.trace("[{}] Transferred snapshot to {}", _id, dst); - _fsm->snapshot_status(dst, snp_idx); + _fsm->snapshot_status(dst, true); } }); diff --git a/raft/tracker.cc b/raft/tracker.cc index 39a9f75b14..2df3d0e8ad 100644 --- a/raft/tracker.cc +++ b/raft/tracker.cc @@ -62,8 +62,12 @@ void follower_progress::become_pipeline() { } } -void follower_progress::become_snapshot() { +void follower_progress::become_snapshot(index_t snp_idx) { state = state::SNAPSHOT; + // If snapshot transfer succeeds, start replicating from the + // next index, otherwise we will learn the follower's index + // again by sending a probe request. + next_idx = snp_idx + index_t{1}; } bool follower_progress::can_send_to() { diff --git a/raft/tracker.hh b/raft/tracker.hh index 0ddecef215..9793853aff 100644 --- a/raft/tracker.hh +++ b/raft/tracker.hh @@ -60,7 +60,7 @@ public: void become_probe(); void become_pipeline(); - void become_snapshot(); + void become_snapshot(index_t snp_idx); void accepted(index_t idx) { // AppendEntries replies can arrive out of order.