hints: rearrange error handling logic for hint sending

Instead of calling the `on_hint_send_failure` method inside the hint
sending task in places where an error occurs, we now let the exceptions
be returned and handle them inside a single `then_wrapped` attached to
the hint sending task.

Apart from the `then_wrapped`, there is one more place which calls
`on_hint_send_failure` - in the exception handler for the future which
spawns the asynchronous hint sending task. It needs to be kept separate
because it is a part of a separate task.
This commit is contained in:
Piotr Dulikowski
2021-06-22 16:57:48 +02:00
parent 45b04c94e0
commit 08a7d79ffc
2 changed files with 25 additions and 5 deletions

View File

@@ -758,8 +758,9 @@ future<> manager::end_point_hints_manager::sender::send_one_mutation(frozen_muta
}
future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<send_one_file_ctx> ctx_ptr, fragmented_temporary_buffer buf, db::replay_position rp, gc_clock::duration secs_since_file_mod, const sstring& fname) {
ctx_ptr->last_attempted_rp = rp;
return _resource_manager.get_send_units_for(buf.size_bytes()).then([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] (auto units) mutable {
ctx_ptr->mark_hint_as_in_progress(rp);
// Future is waited on indirectly in `send_one_file()` (via `ctx_ptr->file_send_gate`).
(void)with_gate(ctx_ptr->file_send_gate, [this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable {
try {
@@ -778,7 +779,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
++this->shard_stats().sent;
}).handle_exception([this, ctx_ptr, rp] (auto eptr) {
manager_logger.trace("send_one_hint(): failed to send to {}: {}", end_point_key(), eptr);
ctx_ptr->on_hint_send_failure(rp);
return make_exception_future<>(std::move(eptr));
});
// ignore these errors and move on - probably this hint is too old and the KS/CF has been deleted...
@@ -792,17 +793,34 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
manager_logger.debug("send_hints(): {} at {}: {}", fname, rp, e.what());
++this->shard_stats().discarded;
} catch (...) {
manager_logger.debug("send_hints(): unexpected error in file {} at {}: {}", fname, rp, std::current_exception());
ctx_ptr->on_hint_send_failure(rp);
auto eptr = std::current_exception();
manager_logger.debug("send_hints(): unexpected error in file {} at {}: {}", fname, rp, eptr);
return make_exception_future<>(std::move(eptr));
}
return make_ready_future<>();
}).finally([units = std::move(units), ctx_ptr] {});
}).then_wrapped([units = std::move(units), ctx_ptr, rp] (future<>&& f) {
// Information about the error was already printed somewhere higher.
// We just need to account in the ctx that sending of this hint has failed.
if (!f.failed()) {
ctx_ptr->on_hint_send_success(rp);
} else {
ctx_ptr->on_hint_send_failure(rp);
}
f.ignore_ready_future();
});
}).handle_exception([this, ctx_ptr, rp] (auto eptr) {
manager_logger.trace("send_one_file(): Hmmm. Something bad had happend: {}", eptr);
ctx_ptr->on_hint_send_failure(rp);
});
}
void manager::end_point_hints_manager::sender::send_one_file_ctx::mark_hint_as_in_progress(db::replay_position rp) {
last_attempted_rp = rp;
}
void manager::end_point_hints_manager::sender::send_one_file_ctx::on_hint_send_success(db::replay_position rp) noexcept {
}
void manager::end_point_hints_manager::sender::send_one_file_ctx::on_hint_send_failure(db::replay_position rp) noexcept {
segment_replay_failed = true;
if (!first_failed_rp || rp < *first_failed_rp) {

View File

@@ -126,6 +126,8 @@ public:
std::optional<db::replay_position> last_attempted_rp;
bool segment_replay_failed = false;
void mark_hint_as_in_progress(db::replay_position rp);
void on_hint_send_success(db::replay_position rp) noexcept;
void on_hint_send_failure(db::replay_position rp) noexcept;
};