|
|
|
|
@@ -431,7 +431,7 @@ future<> server_impl::wait_for_next_tick(seastar::abort_source* as) {
|
|
|
|
|
try {
|
|
|
|
|
co_await (as ? _tick_promise->get_shared_future(*as) : _tick_promise->get_shared_future());
|
|
|
|
|
} catch (abort_requested_exception&) {
|
|
|
|
|
throw request_aborted();
|
|
|
|
|
throw request_aborted(format("Aborted while waiting for next tick on server: {}, latest applied entry: {}", _id, _applied_idx));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -449,7 +449,7 @@ future<> server_impl::wait_for_leader(seastar::abort_source* as) {
|
|
|
|
|
try {
|
|
|
|
|
co_await (as ? _leader_promise->get_shared_future(*as) : _leader_promise->get_shared_future());
|
|
|
|
|
} catch (abort_requested_exception&) {
|
|
|
|
|
throw request_aborted();
|
|
|
|
|
throw request_aborted(format("Aborted while waiting for leader on server: {}, latest applied entry: {}", _id, _applied_idx));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -461,7 +461,8 @@ future<> server_impl::wait_for_state_change(seastar::abort_source* as) {
|
|
|
|
|
try {
|
|
|
|
|
return as ? _state_change_promise->get_shared_future(*as) : _state_change_promise->get_shared_future();
|
|
|
|
|
} catch (abort_requested_exception&) {
|
|
|
|
|
throw request_aborted();
|
|
|
|
|
throw request_aborted(format(
|
|
|
|
|
"Aborted while waiting for state change on server: {}, latest applied entry: {}, current state: {}", _id, _applied_idx, _fsm->current_state()));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -499,9 +500,19 @@ future<bool> server_impl::trigger_snapshot(seastar::abort_source* as) {
|
|
|
|
|
as->check();
|
|
|
|
|
}
|
|
|
|
|
} catch (abort_requested_exception&) {
|
|
|
|
|
throw request_aborted();
|
|
|
|
|
throw request_aborted(
|
|
|
|
|
format("Aborted in snapshot trigger waiting for index: {}, last persisted snapshot descriptor idx: {}, on server: {}, latest applied entry: {}",
|
|
|
|
|
awaited_idx,
|
|
|
|
|
_snapshot_desc_idx,
|
|
|
|
|
_id,
|
|
|
|
|
_applied_idx));
|
|
|
|
|
} catch (seastar::broken_condition_variable&) {
|
|
|
|
|
throw request_aborted();
|
|
|
|
|
throw request_aborted(format("Condition variable is broken in snapshot trigger waiting for index: {}, last persisted snapshot descriptor idx: {}, on "
|
|
|
|
|
"server: {}, latest applied entry: {}",
|
|
|
|
|
awaited_idx,
|
|
|
|
|
_snapshot_desc_idx,
|
|
|
|
|
_id,
|
|
|
|
|
_applied_idx));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.debug(
|
|
|
|
|
@@ -578,7 +589,7 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abor
|
|
|
|
|
check_not_aborted();
|
|
|
|
|
|
|
|
|
|
if (as && as->abort_requested()) {
|
|
|
|
|
throw request_aborted();
|
|
|
|
|
throw request_aborted(format("Abort requested before waiting for entry with idx: {}, term: {}", eid.idx, eid.term));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto& container = type == wait_type::committed ? _awaited_commits : _awaited_applies;
|
|
|
|
|
@@ -626,8 +637,9 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abor
|
|
|
|
|
}
|
|
|
|
|
SCYLLA_ASSERT(inserted);
|
|
|
|
|
if (as) {
|
|
|
|
|
it->second.abort = as->subscribe([it = it, &container] () noexcept {
|
|
|
|
|
it->second.done.set_exception(request_aborted());
|
|
|
|
|
it->second.abort = as->subscribe([it = it, &container] noexcept {
|
|
|
|
|
it->second.done.set_exception(
|
|
|
|
|
request_aborted(format("Abort requested while waiting for entry with idx: {}, term: {}", it->first, it->second.term)));
|
|
|
|
|
container.erase(it);
|
|
|
|
|
});
|
|
|
|
|
SCYLLA_ASSERT(it->second.abort);
|
|
|
|
|
@@ -645,7 +657,11 @@ future<entry_id> server_impl::add_entry_on_leader(command cmd, seastar::abort_so
|
|
|
|
|
try {
|
|
|
|
|
memory_permit = co_await _fsm->wait_for_memory_permit(as, log::memory_usage_of(cmd, _config.max_command_size));
|
|
|
|
|
} catch (semaphore_aborted&) {
|
|
|
|
|
throw request_aborted();
|
|
|
|
|
throw request_aborted(
|
|
|
|
|
format("Semaphore aborted while waiting for memory availability for adding entry on leader in term: {}, on server: {}, current term: {}",
|
|
|
|
|
t,
|
|
|
|
|
_id,
|
|
|
|
|
_fsm->get_current_term()));
|
|
|
|
|
}
|
|
|
|
|
if (t == _fsm->get_current_term()) {
|
|
|
|
|
break;
|
|
|
|
|
@@ -692,7 +708,9 @@ future<> server_impl::do_on_leader_with_retries(seastar::abort_source* as, Async
|
|
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
if (as && as->abort_requested()) {
|
|
|
|
|
throw request_aborted();
|
|
|
|
|
throw request_aborted(format("Request aborted while performing action on leader, current leader: {}, previous leader: {}",
|
|
|
|
|
leader ? leader.to_sstring() : "unknown",
|
|
|
|
|
prev_leader ? prev_leader.to_sstring() : "unknown"));
|
|
|
|
|
}
|
|
|
|
|
check_not_aborted();
|
|
|
|
|
if (leader == server_id{}) {
|
|
|
|
|
@@ -1431,7 +1449,7 @@ term_t server_impl::get_current_term() const {
|
|
|
|
|
|
|
|
|
|
future<> server_impl::wait_for_apply(index_t idx, abort_source* as) {
|
|
|
|
|
if (as && as->abort_requested()) {
|
|
|
|
|
throw request_aborted();
|
|
|
|
|
throw request_aborted(format("Aborted before waiting for applying entry: {}, last applied entry: {}", idx, _applied_idx));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
check_not_aborted();
|
|
|
|
|
@@ -1441,8 +1459,9 @@ future<> server_impl::wait_for_apply(index_t idx, abort_source* as) {
|
|
|
|
|
// This will be signalled when read_idx is applied
|
|
|
|
|
auto it = _awaited_indexes.emplace(idx, awaited_index{{}, {}});
|
|
|
|
|
if (as) {
|
|
|
|
|
it->second.abort = as->subscribe([this, it] () noexcept {
|
|
|
|
|
it->second.promise.set_exception(request_aborted());
|
|
|
|
|
it->second.abort = as->subscribe([this, it] noexcept {
|
|
|
|
|
it->second.promise.set_exception(
|
|
|
|
|
request_aborted(format("Aborted while waiting to apply entry: {}, last applied entry: {}", it->first, _applied_idx)));
|
|
|
|
|
_awaited_indexes.erase(it);
|
|
|
|
|
});
|
|
|
|
|
SCYLLA_ASSERT(it->second.abort);
|
|
|
|
|
@@ -1469,13 +1488,15 @@ future<read_barrier_reply> server_impl::execute_read_barrier(server_id from, sea
|
|
|
|
|
logger.trace("[{}] execute_read_barrier read id is {} for commit idx {}",
|
|
|
|
|
_id, rid->first, rid->second);
|
|
|
|
|
if (as && as->abort_requested()) {
|
|
|
|
|
return make_exception_future<read_barrier_reply>(request_aborted());
|
|
|
|
|
return make_exception_future<read_barrier_reply>(
|
|
|
|
|
request_aborted(format("Abort requested before waiting for read barrier from {}, read id is {} for commit idx {}", from, rid->first, rid->second)));
|
|
|
|
|
}
|
|
|
|
|
_reads.push_back({rid->first, rid->second, {}, {}});
|
|
|
|
|
auto read = std::prev(_reads.end());
|
|
|
|
|
if (as) {
|
|
|
|
|
read->abort = as->subscribe([this, read] () noexcept {
|
|
|
|
|
read->promise.set_exception(request_aborted());
|
|
|
|
|
read->abort = as->subscribe([this, read, from] noexcept {
|
|
|
|
|
read->promise.set_exception(
|
|
|
|
|
request_aborted(format("Abort requested while waiting for read barrier from {}, read id is {} for commit idx {}", from, read->id, read->idx)));
|
|
|
|
|
_reads.erase(read);
|
|
|
|
|
});
|
|
|
|
|
SCYLLA_ASSERT(read->abort);
|
|
|
|
|
@@ -1678,12 +1699,14 @@ future<> server_impl::set_configuration(config_member_set c_new, seastar::abort_
|
|
|
|
|
|
|
|
|
|
auto f = _non_joint_conf_commit_promise.emplace().promise.get_future();
|
|
|
|
|
if (as) {
|
|
|
|
|
_non_joint_conf_commit_promise->abort = as->subscribe([this] () noexcept {
|
|
|
|
|
_non_joint_conf_commit_promise->abort = as->subscribe([this, idx = e.idx, term = e.term] noexcept {
|
|
|
|
|
// If we're inside this callback, the subscription wasn't destroyed yet.
|
|
|
|
|
// The subscription is destroyed when the field is reset, so if we're here, the field must be engaged.
|
|
|
|
|
SCYLLA_ASSERT(_non_joint_conf_commit_promise);
|
|
|
|
|
// Whoever resolves the promise must reset the field. Thus, if we're here, the promise is not resolved.
|
|
|
|
|
std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_exception(request_aborted{});
|
|
|
|
|
std::exchange(_non_joint_conf_commit_promise, std::nullopt)
|
|
|
|
|
->promise.set_exception(request_aborted(
|
|
|
|
|
format("Aborted while setting configuration (at index: {}, term: {}, current config: {})", idx, term, _fsm->get_configuration())));
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|