diff --git a/raft/server.cc b/raft/server.cc index b2d51c8eba..d03c65a849 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -200,19 +201,17 @@ private: // the respective entry is applied. std::map _awaited_applies; - uint64_t _next_snapshot_transfer_id = 0; + // Maps each destination to the abort_source of the currently active + // snapshot transfer. The abort_source itself lives on the coroutine + // frame of the background transfer (see send_snapshot), so its + // lifetime is guaranteed. abort_snapshot_transfer(s) can signal it + // and erase the entry synchronously, freeing the slot for immediate + // reuse in the same process_fsm_output call. + std::unordered_map _snapshot_abort_sources; - struct snapshot_transfer { - future<> f; - seastar::abort_source as; - uint64_t id; - }; - - // Contains active snapshot transfers, to be waited on exit. - std::unordered_map _snapshot_transfers; - - // Contains aborted snapshot transfers with still unresolved futures - std::unordered_map> _aborted_snapshot_transfers; + // Gate that tracks all in-flight snapshot transfer coroutines. + // Closed in abort() to wait for them to finish. + seastar::gate _snapshot_gate; // The optional is engaged when incoming snapshot is received // And the promise signalled when it is successfully applied or there was an error @@ -273,7 +272,7 @@ private: void abort_snapshot_transfers(); // Send snapshot in the background and notify FSM about the result. - void send_snapshot(server_id id, install_snapshot&& snp); + void send_snapshot(server_id dst, install_snapshot&& snp); future<> _applier_status = make_ready_future<>(); future<> _io_status = make_ready_future<>(); @@ -1289,34 +1288,53 @@ future<> server_impl::io_fiber(index_t last_stable) { } void server_impl::send_snapshot(server_id dst, install_snapshot&& snp) { - seastar::abort_source as; - uint64_t id = _next_snapshot_transfer_id++; - // Use `yield()` to ensure that `_rpc->send_snapshot` is called after we emplace `f` in `_snapshot_transfers`. - // This also catches any exceptions from `_rpc->send_snapshot` into `f`. - future<> f = yield().then([this, &as, dst, id, snp = std::move(snp)] () mutable { - return _rpc->send_snapshot(dst, std::move(snp), as).then_wrapped([this, dst, id] (future f) { - if (_aborted_snapshot_transfers.erase(id)) { - // The transfer was aborted - f.ignore_ready_future(); - return; + (void)seastar::with_gate(_snapshot_gate, [this, dst, snp = std::move(snp)] (this auto) -> future<> { + // The transfer runs as a background coroutine tracked by _snapshot_gate. + // The abort_source lives on the coroutine frame, and _snapshot_abort_sources + // holds a raw pointer to it. abort_snapshot_transfer(s) can signal it + // and erase the map entry synchronously. + seastar::abort_source as; + auto [_, inserted] = _snapshot_abort_sources.emplace(dst, &as); + + // Cross-term stale transfers are aborted in process_fsm_output + // when term_and_vote changes (before messages are dispatched). + // Within a term the FSM's progress tracker guarantees at most + // one in-flight snapshot transfer per destination. + SCYLLA_ASSERT(inserted); + + auto cleanup = seastar::defer([this, dst, &as] { + // Only erase the map entry if it still points to our abort_source. + // A new transfer for the same dst may have already replaced it. + auto it = _snapshot_abort_sources.find(dst); + if (it != _snapshot_abort_sources.end() && it->second == &as) { + _snapshot_abort_sources.erase(it); } - _snapshot_transfers.erase(dst); - auto reply = raft::snapshot_reply{.current_term = _fsm->get_current_term(), .success = false}; - if (f.failed()) { - auto eptr = f.get_exception(); - const log_level lvl = try_catch(eptr) != nullptr - ? log_level::debug - : log_level::error; - logger.log(lvl, "[{}] Transferring snapshot to {} failed with: {}", _id, dst, eptr); - } else { - logger.trace("[{}] Transferred snapshot to {}", _id, dst); - reply = f.get(); - } - _fsm->step(dst, std::move(reply)); }); + + auto f = co_await coroutine::as_future(futurize_invoke([&] { + return _rpc->send_snapshot(dst, std::move(snp), as); + })); + + if (as.abort_requested()) { + f.ignore_ready_future(); + co_return; + } + + auto reply = raft::snapshot_reply{.current_term = _fsm->get_current_term(), .success = false}; + if (f.failed()) { + auto eptr = f.get_exception(); + const log_level lvl = try_catch(eptr) != nullptr + ? log_level::debug + : log_level::error; + logger.log(lvl, "[{}] Transferring snapshot to {} failed with: {}", _id, dst, eptr); + } else { + logger.trace("[{}] Transferred snapshot to {}", _id, dst); + reply = f.get(); + } + _fsm->step(dst, std::move(reply)); + }).handle_exception([this, dst] (std::exception_ptr ep) { + logger.error("[{}] Snapshot transfer to {} failed with: {}", _id, dst, ep); }); - auto res = _snapshot_transfers.emplace(dst, snapshot_transfer{std::move(f), std::move(as), id}); - SCYLLA_ASSERT(res.second); } future server_impl::apply_snapshot(server_id from, install_snapshot snp) { @@ -1575,23 +1593,20 @@ future<> server_impl::read_barrier(seastar::abort_source* as) { } void server_impl::abort_snapshot_transfer(server_id id) { - auto it = _snapshot_transfers.find(id); - if (it != _snapshot_transfers.end()) { - auto& [f, as, tid] = it->second; + auto it = _snapshot_abort_sources.find(id); + if (it != _snapshot_abort_sources.end()) { logger.trace("[{}] Request abort of snapshot transfer to {}", _id, id); - as.request_abort(); - _aborted_snapshot_transfers.emplace(tid, std::move(f)); - _snapshot_transfers.erase(it); + it->second->request_abort(); + _snapshot_abort_sources.erase(it); } } void server_impl::abort_snapshot_transfers() { - for (auto&& [id, t] : _snapshot_transfers) { + for (auto&& [id, as] : _snapshot_abort_sources) { logger.trace("[{}] Request abort of snapshot transfer to {}", _id, id); - t.as.request_abort(); - _aborted_snapshot_transfers.emplace(t.id, std::move(t.f)); + as->request_abort(); } - _snapshot_transfers.clear(); + _snapshot_abort_sources.clear(); } void server_impl::check_not_aborted() { @@ -1677,17 +1692,13 @@ future<> server_impl::abort(sstring reason) { abort_snapshot_transfers(); - auto snp_futures = _aborted_snapshot_transfers | boost::adaptors::map_values; - auto append_futures = _append_request_status | boost::adaptors::map_values | boost::adaptors::transformed([] (append_request_queue& a) -> future<>& { return a.f; }); - auto all_futures = boost::range::join(snp_futures, append_futures); + std::array, 2> gates{_snapshot_gate.close(), _do_on_leader_gate.close()}; - std::array, 1> gate{_do_on_leader_gate.close()}; + auto all_futures = boost::range::join(append_futures, gates); - auto all_with_gate = boost::range::join(all_futures, gate); - - co_await seastar::when_all_succeed(all_with_gate.begin(), all_with_gate.end()).discard_result(); + co_await seastar::when_all_succeed(all_futures.begin(), all_futures.end()).discard_result(); } bool server_impl::is_alive() const {