mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
raft: Add descriptions for requested abort errors
Fixes: scylladb/scylladb#18902 This PR only improves error messages, no need to backport it.
This commit is contained in:
@@ -355,6 +355,15 @@ public:
|
||||
bool is_candidate() const {
|
||||
return std::holds_alternative<candidate>(_state);
|
||||
}
|
||||
std::string_view current_state() const {
|
||||
static constexpr std::string_view leader_state = "Leader";
|
||||
static constexpr std::string_view follower_state = "Follower";
|
||||
static constexpr std::string_view candidate_state = "Candidate";
|
||||
if (is_leader()) {
|
||||
return leader_state;
|
||||
}
|
||||
return is_follower() ? follower_state : candidate_state;
|
||||
}
|
||||
bool is_prevote_candidate() const {
|
||||
return is_candidate() && std::get<candidate>(_state).is_prevote;
|
||||
}
|
||||
|
||||
@@ -324,7 +324,7 @@ struct no_other_voting_member : public error {
|
||||
};
|
||||
|
||||
struct request_aborted : public error {
|
||||
request_aborted() : error("Request is aborted by a caller") {}
|
||||
request_aborted(const std::string& error_msg) : error(error_msg) {}
|
||||
};
|
||||
|
||||
inline bool is_uncertainty(const std::exception& e) {
|
||||
|
||||
@@ -431,7 +431,7 @@ future<> server_impl::wait_for_next_tick(seastar::abort_source* as) {
|
||||
try {
|
||||
co_await (as ? _tick_promise->get_shared_future(*as) : _tick_promise->get_shared_future());
|
||||
} catch (abort_requested_exception&) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(format("Aborted while waiting for next tick on server: {}, latest applied entry: {}", _id, _applied_idx));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -449,7 +449,7 @@ future<> server_impl::wait_for_leader(seastar::abort_source* as) {
|
||||
try {
|
||||
co_await (as ? _leader_promise->get_shared_future(*as) : _leader_promise->get_shared_future());
|
||||
} catch (abort_requested_exception&) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(format("Aborted while waiting for leader on server: {}, latest applied entry: {}", _id, _applied_idx));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -461,7 +461,8 @@ future<> server_impl::wait_for_state_change(seastar::abort_source* as) {
|
||||
try {
|
||||
return as ? _state_change_promise->get_shared_future(*as) : _state_change_promise->get_shared_future();
|
||||
} catch (abort_requested_exception&) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(format(
|
||||
"Aborted while waiting for state change on server: {}, latest applied entry: {}, current state: {}", _id, _applied_idx, _fsm->current_state()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -499,9 +500,19 @@ future<bool> server_impl::trigger_snapshot(seastar::abort_source* as) {
|
||||
as->check();
|
||||
}
|
||||
} catch (abort_requested_exception&) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(
|
||||
format("Aborted in snapshot trigger waiting for index: {}, last persisted snapshot descriptor idx: {}, on server: {}, latest applied entry: {}",
|
||||
awaited_idx,
|
||||
_snapshot_desc_idx,
|
||||
_id,
|
||||
_applied_idx));
|
||||
} catch (seastar::broken_condition_variable&) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(format("Condition variable is broken in snapshot trigger waiting for index: {}, last persisted snapshot descriptor idx: {}, on "
|
||||
"server: {}, latest applied entry: {}",
|
||||
awaited_idx,
|
||||
_snapshot_desc_idx,
|
||||
_id,
|
||||
_applied_idx));
|
||||
}
|
||||
|
||||
logger.debug(
|
||||
@@ -578,7 +589,7 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abor
|
||||
check_not_aborted();
|
||||
|
||||
if (as && as->abort_requested()) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(format("Abort requested before waiting for entry with idx: {}, term: {}", eid.idx, eid.term));
|
||||
}
|
||||
|
||||
auto& container = type == wait_type::committed ? _awaited_commits : _awaited_applies;
|
||||
@@ -626,8 +637,9 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abor
|
||||
}
|
||||
SCYLLA_ASSERT(inserted);
|
||||
if (as) {
|
||||
it->second.abort = as->subscribe([it = it, &container] () noexcept {
|
||||
it->second.done.set_exception(request_aborted());
|
||||
it->second.abort = as->subscribe([it = it, &container] noexcept {
|
||||
it->second.done.set_exception(
|
||||
request_aborted(format("Abort requested while waiting for entry with idx: {}, term: {}", it->first, it->second.term)));
|
||||
container.erase(it);
|
||||
});
|
||||
SCYLLA_ASSERT(it->second.abort);
|
||||
@@ -645,7 +657,11 @@ future<entry_id> server_impl::add_entry_on_leader(command cmd, seastar::abort_so
|
||||
try {
|
||||
memory_permit = co_await _fsm->wait_for_memory_permit(as, log::memory_usage_of(cmd, _config.max_command_size));
|
||||
} catch (semaphore_aborted&) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(
|
||||
format("Semaphore aborted while waiting for memory availability for adding entry on leader in term: {}, on server: {}, current term: {}",
|
||||
t,
|
||||
_id,
|
||||
_fsm->get_current_term()));
|
||||
}
|
||||
if (t == _fsm->get_current_term()) {
|
||||
break;
|
||||
@@ -692,7 +708,9 @@ future<> server_impl::do_on_leader_with_retries(seastar::abort_source* as, Async
|
||||
|
||||
while (true) {
|
||||
if (as && as->abort_requested()) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(format("Request aborted while performing action on leader, current leader: {}, previous leader: {}",
|
||||
leader ? leader.to_sstring() : "unknown",
|
||||
prev_leader ? prev_leader.to_sstring() : "unknown"));
|
||||
}
|
||||
check_not_aborted();
|
||||
if (leader == server_id{}) {
|
||||
@@ -1431,7 +1449,7 @@ term_t server_impl::get_current_term() const {
|
||||
|
||||
future<> server_impl::wait_for_apply(index_t idx, abort_source* as) {
|
||||
if (as && as->abort_requested()) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(format("Aborted before waiting for applying entry: {}, last applied entry: {}", idx, _applied_idx));
|
||||
}
|
||||
|
||||
check_not_aborted();
|
||||
@@ -1441,8 +1459,9 @@ future<> server_impl::wait_for_apply(index_t idx, abort_source* as) {
|
||||
// This will be signalled when read_idx is applied
|
||||
auto it = _awaited_indexes.emplace(idx, awaited_index{{}, {}});
|
||||
if (as) {
|
||||
it->second.abort = as->subscribe([this, it] () noexcept {
|
||||
it->second.promise.set_exception(request_aborted());
|
||||
it->second.abort = as->subscribe([this, it] noexcept {
|
||||
it->second.promise.set_exception(
|
||||
request_aborted(format("Aborted while waiting to apply entry: {}, last applied entry: {}", it->first, _applied_idx)));
|
||||
_awaited_indexes.erase(it);
|
||||
});
|
||||
SCYLLA_ASSERT(it->second.abort);
|
||||
@@ -1469,13 +1488,15 @@ future<read_barrier_reply> server_impl::execute_read_barrier(server_id from, sea
|
||||
logger.trace("[{}] execute_read_barrier read id is {} for commit idx {}",
|
||||
_id, rid->first, rid->second);
|
||||
if (as && as->abort_requested()) {
|
||||
return make_exception_future<read_barrier_reply>(request_aborted());
|
||||
return make_exception_future<read_barrier_reply>(
|
||||
request_aborted(format("Abort requested before waiting for read barrier from {}, read id is {} for commit idx {}", from, rid->first, rid->second)));
|
||||
}
|
||||
_reads.push_back({rid->first, rid->second, {}, {}});
|
||||
auto read = std::prev(_reads.end());
|
||||
if (as) {
|
||||
read->abort = as->subscribe([this, read] () noexcept {
|
||||
read->promise.set_exception(request_aborted());
|
||||
read->abort = as->subscribe([this, read, from] noexcept {
|
||||
read->promise.set_exception(
|
||||
request_aborted(format("Abort requested while waiting for read barrier from {}, read id is {} for commit idx {}", from, read->id, read->idx)));
|
||||
_reads.erase(read);
|
||||
});
|
||||
SCYLLA_ASSERT(read->abort);
|
||||
@@ -1678,12 +1699,14 @@ future<> server_impl::set_configuration(config_member_set c_new, seastar::abort_
|
||||
|
||||
auto f = _non_joint_conf_commit_promise.emplace().promise.get_future();
|
||||
if (as) {
|
||||
_non_joint_conf_commit_promise->abort = as->subscribe([this] () noexcept {
|
||||
_non_joint_conf_commit_promise->abort = as->subscribe([this, idx = e.idx, term = e.term] noexcept {
|
||||
// If we're inside this callback, the subscription wasn't destroyed yet.
|
||||
// The subscription is destroyed when the field is reset, so if we're here, the field must be engaged.
|
||||
SCYLLA_ASSERT(_non_joint_conf_commit_promise);
|
||||
// Whoever resolves the promise must reset the field. Thus, if we're here, the promise is not resolved.
|
||||
std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_exception(request_aborted{});
|
||||
std::exchange(_non_joint_conf_commit_promise, std::nullopt)
|
||||
->promise.set_exception(request_aborted(
|
||||
format("Aborted while setting configuration (at index: {}, term: {}, current config: {})", idx, term, _fsm->get_configuration())));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -348,7 +348,8 @@ future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::
|
||||
|
||||
co_await _sp.mutate_locally({std::move(history_mut)}, nullptr);
|
||||
} catch (const abort_requested_exception&) {
|
||||
throw raft::request_aborted();
|
||||
throw raft::request_aborted(format(
|
||||
"Abort requested while transferring snapshot from ID/IP: {}/{}, snapshot descriptor id: {}, snapshot index: {}", from_id, from_ip, snp.id, snp.idx));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user