raft: Add descriptions for requested abort errors

Fixes: scylladb/scylladb#18902

This PR only improves error messages, so there is no need to backport it.
This commit is contained in:
Abhi
2024-08-26 14:21:11 +05:30
parent 59aedb38d0
commit a616f1063a
4 changed files with 53 additions and 20 deletions

View File

@@ -355,6 +355,15 @@ public:
// Reports whether `_state` currently holds the `candidate` alternative.
bool is_candidate() const {
    // get_if yields a non-null pointer exactly when the variant holds `candidate`.
    return std::get_if<candidate>(&_state) != nullptr;
}
// Returns a printable name for the current state: "Leader", "Follower" or "Candidate".
// The returned view refers to a function-local static constant, so it remains valid
// after the call returns.
std::string_view current_state() const {
    static constexpr std::string_view leader_name = "Leader";
    static constexpr std::string_view follower_name = "Follower";
    static constexpr std::string_view candidate_name = "Candidate";

    if (is_leader()) {
        return leader_name;
    }
    if (is_follower()) {
        return follower_name;
    }
    // Neither leader nor follower: reported as candidate.
    return candidate_name;
}
// True only when the state is `candidate` and that candidate has `is_prevote` set.
bool is_prevote_candidate() const {
    const bool in_candidate_state = is_candidate();
    // Short-circuit keeps std::get from being evaluated unless the variant holds `candidate`.
    return in_candidate_state && std::get<candidate>(_state).is_prevote;
}

View File

@@ -324,7 +324,7 @@ struct no_other_voting_member : public error {
};
// Error type derived from `error`, used to report that a request was aborted.
struct request_aborted : public error {
// Constructs the error with the generic message "Request is aborted by a caller".
request_aborted() : error("Request is aborted by a caller") {}
// Constructs the error with the caller-supplied `error_msg` as its message.
request_aborted(const std::string& error_msg) : error(error_msg) {}
};
inline bool is_uncertainty(const std::exception& e) {

View File

@@ -431,7 +431,7 @@ future<> server_impl::wait_for_next_tick(seastar::abort_source* as) {
try {
co_await (as ? _tick_promise->get_shared_future(*as) : _tick_promise->get_shared_future());
} catch (abort_requested_exception&) {
throw request_aborted();
throw request_aborted(format("Aborted while waiting for next tick on server: {}, latest applied entry: {}", _id, _applied_idx));
}
}
@@ -449,7 +449,7 @@ future<> server_impl::wait_for_leader(seastar::abort_source* as) {
try {
co_await (as ? _leader_promise->get_shared_future(*as) : _leader_promise->get_shared_future());
} catch (abort_requested_exception&) {
throw request_aborted();
throw request_aborted(format("Aborted while waiting for leader on server: {}, latest applied entry: {}", _id, _applied_idx));
}
}
@@ -461,7 +461,8 @@ future<> server_impl::wait_for_state_change(seastar::abort_source* as) {
try {
return as ? _state_change_promise->get_shared_future(*as) : _state_change_promise->get_shared_future();
} catch (abort_requested_exception&) {
throw request_aborted();
throw request_aborted(format(
"Aborted while waiting for state change on server: {}, latest applied entry: {}, current state: {}", _id, _applied_idx, _fsm->current_state()));
}
}
@@ -499,9 +500,19 @@ future<bool> server_impl::trigger_snapshot(seastar::abort_source* as) {
as->check();
}
} catch (abort_requested_exception&) {
throw request_aborted();
throw request_aborted(
format("Aborted in snapshot trigger waiting for index: {}, last persisted snapshot descriptor idx: {}, on server: {}, latest applied entry: {}",
awaited_idx,
_snapshot_desc_idx,
_id,
_applied_idx));
} catch (seastar::broken_condition_variable&) {
throw request_aborted();
throw request_aborted(format("Condition variable is broken in snapshot trigger waiting for index: {}, last persisted snapshot descriptor idx: {}, on "
"server: {}, latest applied entry: {}",
awaited_idx,
_snapshot_desc_idx,
_id,
_applied_idx));
}
logger.debug(
@@ -578,7 +589,7 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abor
check_not_aborted();
if (as && as->abort_requested()) {
throw request_aborted();
throw request_aborted(format("Abort requested before waiting for entry with idx: {}, term: {}", eid.idx, eid.term));
}
auto& container = type == wait_type::committed ? _awaited_commits : _awaited_applies;
@@ -626,8 +637,9 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abor
}
SCYLLA_ASSERT(inserted);
if (as) {
it->second.abort = as->subscribe([it = it, &container] () noexcept {
it->second.done.set_exception(request_aborted());
it->second.abort = as->subscribe([it = it, &container] noexcept {
it->second.done.set_exception(
request_aborted(format("Abort requested while waiting for entry with idx: {}, term: {}", it->first, it->second.term)));
container.erase(it);
});
SCYLLA_ASSERT(it->second.abort);
@@ -645,7 +657,11 @@ future<entry_id> server_impl::add_entry_on_leader(command cmd, seastar::abort_so
try {
memory_permit = co_await _fsm->wait_for_memory_permit(as, log::memory_usage_of(cmd, _config.max_command_size));
} catch (semaphore_aborted&) {
throw request_aborted();
throw request_aborted(
format("Semaphore aborted while waiting for memory availability for adding entry on leader in term: {}, on server: {}, current term: {}",
t,
_id,
_fsm->get_current_term()));
}
if (t == _fsm->get_current_term()) {
break;
@@ -692,7 +708,9 @@ future<> server_impl::do_on_leader_with_retries(seastar::abort_source* as, Async
while (true) {
if (as && as->abort_requested()) {
throw request_aborted();
throw request_aborted(format("Request aborted while performing action on leader, current leader: {}, previous leader: {}",
leader ? leader.to_sstring() : "unknown",
prev_leader ? prev_leader.to_sstring() : "unknown"));
}
check_not_aborted();
if (leader == server_id{}) {
@@ -1431,7 +1449,7 @@ term_t server_impl::get_current_term() const {
future<> server_impl::wait_for_apply(index_t idx, abort_source* as) {
if (as && as->abort_requested()) {
throw request_aborted();
throw request_aborted(format("Aborted before waiting for applying entry: {}, last applied entry: {}", idx, _applied_idx));
}
check_not_aborted();
@@ -1441,8 +1459,9 @@ future<> server_impl::wait_for_apply(index_t idx, abort_source* as) {
// This will be signalled when read_idx is applied
auto it = _awaited_indexes.emplace(idx, awaited_index{{}, {}});
if (as) {
it->second.abort = as->subscribe([this, it] () noexcept {
it->second.promise.set_exception(request_aborted());
it->second.abort = as->subscribe([this, it] noexcept {
it->second.promise.set_exception(
request_aborted(format("Aborted while waiting to apply entry: {}, last applied entry: {}", it->first, _applied_idx)));
_awaited_indexes.erase(it);
});
SCYLLA_ASSERT(it->second.abort);
@@ -1469,13 +1488,15 @@ future<read_barrier_reply> server_impl::execute_read_barrier(server_id from, sea
logger.trace("[{}] execute_read_barrier read id is {} for commit idx {}",
_id, rid->first, rid->second);
if (as && as->abort_requested()) {
return make_exception_future<read_barrier_reply>(request_aborted());
return make_exception_future<read_barrier_reply>(
request_aborted(format("Abort requested before waiting for read barrier from {}, read id is {} for commit idx {}", from, rid->first, rid->second)));
}
_reads.push_back({rid->first, rid->second, {}, {}});
auto read = std::prev(_reads.end());
if (as) {
read->abort = as->subscribe([this, read] () noexcept {
read->promise.set_exception(request_aborted());
read->abort = as->subscribe([this, read, from] noexcept {
read->promise.set_exception(
request_aborted(format("Abort requested while waiting for read barrier from {}, read id is {} for commit idx {}", from, read->id, read->idx)));
_reads.erase(read);
});
SCYLLA_ASSERT(read->abort);
@@ -1678,12 +1699,14 @@ future<> server_impl::set_configuration(config_member_set c_new, seastar::abort_
auto f = _non_joint_conf_commit_promise.emplace().promise.get_future();
if (as) {
_non_joint_conf_commit_promise->abort = as->subscribe([this] () noexcept {
_non_joint_conf_commit_promise->abort = as->subscribe([this, idx = e.idx, term = e.term] noexcept {
// If we're inside this callback, the subscription wasn't destroyed yet.
// The subscription is destroyed when the field is reset, so if we're here, the field must be engaged.
SCYLLA_ASSERT(_non_joint_conf_commit_promise);
// Whoever resolves the promise must reset the field. Thus, if we're here, the promise is not resolved.
std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_exception(request_aborted{});
std::exchange(_non_joint_conf_commit_promise, std::nullopt)
->promise.set_exception(request_aborted(
format("Aborted while setting configuration (at index: {}, term: {}, current config: {})", idx, term, _fsm->get_configuration())));
});
}

View File

@@ -348,7 +348,8 @@ future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::
co_await _sp.mutate_locally({std::move(history_mut)}, nullptr);
} catch (const abort_requested_exception&) {
throw raft::request_aborted();
throw raft::request_aborted(format(
"Abort requested while transferring snapshot from ID/IP: {}/{}, snapshot descriptor id: {}, snapshot index: {}", from_id, from_ip, snp.id, snp.idx));
}
}