test: raft: randomized_nemesis_test: rpc: don't propagate gate_closed_exception outside

The `raft::rpc` interface functions are called by `raft::server_impl`
and the exceptions may be propagated outside the server, e.g. through
the `add_entry` API.

Translate the internal `gate_closed_exception` to an external
`raft::stopped_error`.
This commit is contained in:
Kamil Braun
2022-01-20 16:14:53 +01:00
parent 9da4ffc1c7
commit 871f0d00ce

View File

@@ -369,6 +369,15 @@ private:
size_t _add_entry_executions = 0;
size_t _modify_config_executions = 0;
template <typename F>
auto with_gate(F&& f) -> decltype(f()) {
try {
co_return co_await seastar::with_gate(_gate, std::forward<F>(f));
} catch (const gate_closed_exception&) {
co_return coroutine::make_exception(raft::stopped_error{});
}
}
public:
rpc(raft::server_id id, snapshots_t<State>& snaps, send_message_t send)
: _id(id), _snapshots(snaps), _send(std::move(send)) {
@@ -557,8 +566,7 @@ public:
// and push the reply through the promise, which will resolve `f` (see `receive`, `snapshot_reply_message`
// case).
co_return co_await with_gate(_gate,
[&, guard = std::move(guard), f = std::move(f)] () mutable -> future<raft::snapshot_reply> {
co_return co_await with_gate([&, guard = std::move(guard), f = std::move(f)] () mutable -> future<raft::snapshot_reply> {
// TODO configurable
static const raft::logical_clock::duration send_snapshot_timeout = 20_t;
@@ -585,8 +593,7 @@ public:
.cmd = cmd,
.reply_id = id
});
co_return co_await with_gate(_gate,
[&, guard = std::move(guard), f = std::move(f)] () mutable -> future<raft::add_entry_reply> {
co_return co_await with_gate([&, guard = std::move(guard), f = std::move(f)] () mutable -> future<raft::add_entry_reply> {
static const raft::logical_clock::duration send_add_entry_timeout = 20_t;
try {
@@ -611,8 +618,7 @@ public:
.del = del,
.reply_id = id
});
co_return co_await with_gate(_gate,
[&, guard = std::move(guard), f = std::move(f)] () mutable -> future<raft::add_entry_reply> {
co_return co_await with_gate([&, guard = std::move(guard), f = std::move(f)] () mutable -> future<raft::add_entry_reply> {
static const raft::logical_clock::duration send_modify_config_timeout = 200_t;
try {
@@ -634,8 +640,7 @@ public:
.reply_id = id
});
co_return co_await with_gate(_gate,
[&, guard = std::move(guard), f = std::move(f)] () mutable -> future<raft::read_barrier_reply> {
co_return co_await with_gate([&, guard = std::move(guard), f = std::move(f)] () mutable -> future<raft::read_barrier_reply> {
// TODO configurable
static const raft::logical_clock::duration execute_read_barrier_on_leader_timeout = 20_t;