diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 4503a34130..cef7326c8f 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -758,8 +758,9 @@ future<> manager::end_point_hints_manager::sender::send_one_mutation(frozen_muta } future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr ctx_ptr, fragmented_temporary_buffer buf, db::replay_position rp, gc_clock::duration secs_since_file_mod, const sstring& fname) { - ctx_ptr->last_attempted_rp = rp; return _resource_manager.get_send_units_for(buf.size_bytes()).then([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] (auto units) mutable { + ctx_ptr->mark_hint_as_in_progress(rp); + // Future is waited on indirectly in `send_one_file()` (via `ctx_ptr->file_send_gate`). (void)with_gate(ctx_ptr->file_send_gate, [this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable { try { @@ -778,7 +779,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptrshard_stats().sent; }).handle_exception([this, ctx_ptr, rp] (auto eptr) { manager_logger.trace("send_one_hint(): failed to send to {}: {}", end_point_key(), eptr); - ctx_ptr->on_hint_send_failure(rp); + return make_exception_future<>(std::move(eptr)); }); // ignore these errors and move on - probably this hint is too old and the KS/CF has been deleted... @@ -792,17 +793,34 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptrshard_stats().discarded; } catch (...) { - manager_logger.debug("send_hints(): unexpected error in file {} at {}: {}", fname, rp, std::current_exception()); - ctx_ptr->on_hint_send_failure(rp); + auto eptr = std::current_exception(); + manager_logger.debug("send_hints(): unexpected error in file {} at {}: {}", fname, rp, eptr); + return make_exception_future<>(std::move(eptr)); } return make_ready_future<>(); - }).finally([units = std::move(units), ctx_ptr] {}); + }).then_wrapped([units = std::move(units), ctx_ptr, rp] (future<>&& f) { + // Information about the error was already printed somewhere higher. + // We just need to account in the ctx that sending of this hint has failed. + if (!f.failed()) { + ctx_ptr->on_hint_send_success(rp); + } else { + ctx_ptr->on_hint_send_failure(rp); + } + f.ignore_ready_future(); + }); }).handle_exception([this, ctx_ptr, rp] (auto eptr) { manager_logger.trace("send_one_file(): Hmmm. Something bad had happend: {}", eptr); ctx_ptr->on_hint_send_failure(rp); }); } +void manager::end_point_hints_manager::sender::send_one_file_ctx::mark_hint_as_in_progress(db::replay_position rp) { + last_attempted_rp = rp; +} + +void manager::end_point_hints_manager::sender::send_one_file_ctx::on_hint_send_success(db::replay_position rp) noexcept { +} + void manager::end_point_hints_manager::sender::send_one_file_ctx::on_hint_send_failure(db::replay_position rp) noexcept { segment_replay_failed = true; if (!first_failed_rp || rp < *first_failed_rp) { diff --git a/db/hints/manager.hh b/db/hints/manager.hh index ef6004a770..2bae9cbb32 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -126,6 +126,8 @@ public: std::optional last_attempted_rp; bool segment_replay_failed = false; + void mark_hint_as_in_progress(db::replay_position rp); + void on_hint_send_success(db::replay_position rp) noexcept; void on_hint_send_failure(db::replay_position rp) noexcept; };