mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-05 06:23:03 +00:00
test: raft: randomized_nemesis_test: on timeout, abort calls instead of discarding them
When a Raft API call such as `add_entry`, `set_configuration` or `modify_config` takes too long, we need to time-out. There was no way to abort these calls previously so we would do that by discarding the futures. Recently the APIs were extended with `abort_source` parameters. Use this. Also improve debuggability if the functions throw an exception type that we don't expect. Previously if they did, a cryptic assert would fail somewhere deep in the generator code, making the problem hard to debug.
This commit is contained in:
@@ -207,6 +207,66 @@ raft::command make_command(const cmd_id_t& cmd_id, const Input& input) {
|
||||
template <PureStateMachine M>
|
||||
using call_result_t = std::variant<typename M::output_t, timed_out_error, raft::not_a_leader, raft::dropped_entry, raft::commit_status_unknown, raft::stopped_error>;
|
||||
|
||||
// Wait for a future `f` to finish, but keep the result inside a `future`.
|
||||
// Works for `future<void>` as well as for `future<T>`.
|
||||
template <Future F>
|
||||
future<F> wait(F f) {
|
||||
// FIXME: using lambda as workaround for clang bug #50345
|
||||
auto impl = [] (F f) -> future<F> {
|
||||
struct container { F f; };
|
||||
container c = co_await f.then_wrapped([] (F f) { return container{std::move(f)}; });
|
||||
assert(c.f.available());
|
||||
co_return std::move(c.f);
|
||||
};
|
||||
|
||||
return impl(std::move(f));
|
||||
}
|
||||
|
||||
template <std::invocable<abort_source&> F>
|
||||
static futurize_t<std::invoke_result_t<F, abort_source&>>
|
||||
with_timeout(logical_timer& t, raft::logical_clock::time_point tp, F&& fun) {
|
||||
using future_t = futurize_t<std::invoke_result_t<F, abort_source&>>;
|
||||
|
||||
// FIXME: using lambda as workaround for clang bug #50345
|
||||
auto impl = [] (logical_timer& t, raft::logical_clock::time_point tp, F&& fun) -> future_t {
|
||||
abort_source timeout_as;
|
||||
|
||||
// Using lambda here as workaround for seastar#1005
|
||||
future_t f = futurize_invoke([fun = std::move(fun)] (abort_source& as) mutable { return std::forward<F>(fun)(as); }, timeout_as);
|
||||
|
||||
auto sleep_and_abort = [] (raft::logical_clock::time_point tp, abort_source& timeout_as, logical_timer& t) -> future<> {
|
||||
co_await t.sleep_until(tp, timeout_as);
|
||||
if (!timeout_as.abort_requested()) {
|
||||
// We resolved before `f`. Abort the operation.
|
||||
timeout_as.request_abort();
|
||||
}
|
||||
}(tp, timeout_as, t);
|
||||
|
||||
f = co_await wait(std::move(f));
|
||||
|
||||
if (!timeout_as.abort_requested()) {
|
||||
// `f` has already resolved, but abort the sleep.
|
||||
timeout_as.request_abort();
|
||||
}
|
||||
|
||||
// Wait on the sleep as well (it should return shortly, being aborted) so we don't discard the future.
|
||||
try {
|
||||
co_await std::move(sleep_and_abort);
|
||||
} catch (const sleep_aborted&) {
|
||||
// Expected (if `f` resolved first or we were externally aborted).
|
||||
} catch (...) {
|
||||
// There should be no other exceptions, but just in case... log it and discard,
|
||||
// we want to propagate exceptions from `f`, not from sleep.
|
||||
tlogger.error("unexpected exception from sleep_and_abort", std::current_exception());
|
||||
}
|
||||
|
||||
// The future is available but cannot use `f.get()` as it doesn't handle void futures.
|
||||
co_return co_await std::move(f);
|
||||
};
|
||||
|
||||
return impl(t, tp, std::forward<F>(fun));
|
||||
}
|
||||
|
||||
// Sends a given `input` as a command to `server`, waits until the command gets replicated
|
||||
// and applied on that server and returns the produced output.
|
||||
//
|
||||
@@ -225,11 +285,15 @@ future<call_result_t<M>> call(
|
||||
raft::server& server,
|
||||
impure_state_machine<M>& sm) {
|
||||
using output_channel_dropped = typename impure_state_machine<M>::output_channel_dropped;
|
||||
return sm.with_output_channel([&, input = std::move(input), timeout] (cmd_id_t cmd_id, future<typename M::output_t> f) {
|
||||
return timer.with_timeout(timeout, [&] (typename M::input_t input, future<typename M::output_t> f) {
|
||||
using input_t = typename M::input_t;
|
||||
using output_t = typename M::output_t;
|
||||
|
||||
return sm.with_output_channel([&, input = std::move(input), timeout] (cmd_id_t cmd_id, future<output_t> f) {
|
||||
return with_timeout(timer, timeout, std::bind_front([&] (input_t input, future<output_t> f, abort_source& as) {
|
||||
return server.add_entry(
|
||||
make_command(std::move(cmd_id), std::move(input)),
|
||||
raft::wait_type::applied
|
||||
raft::wait_type::applied,
|
||||
&as
|
||||
).then_wrapped([output_f = std::move(f)] (future<> add_entry_f) mutable {
|
||||
if (add_entry_f.failed()) {
|
||||
// We need to discard `output_f`; the only expected exception is:
|
||||
@@ -239,8 +303,8 @@ future<call_result_t<M>> call(
|
||||
|
||||
return std::move(output_f);
|
||||
});
|
||||
}(std::move(input), std::move(f)));
|
||||
}).then([] (typename M::output_t output) {
|
||||
}, std::move(input), std::move(f)));
|
||||
}).then([] (output_t output) {
|
||||
return make_ready_future<call_result_t<M>>(std::move(output));
|
||||
}).handle_exception([] (std::exception_ptr eptr) {
|
||||
try {
|
||||
@@ -253,27 +317,16 @@ future<call_result_t<M>> call(
|
||||
return make_ready_future<call_result_t<M>>(e);
|
||||
} catch (raft::stopped_error e) {
|
||||
return make_ready_future<call_result_t<M>>(e);
|
||||
} catch (logical_timer::timed_out<typename M::output_t> e) {
|
||||
(void)e.get_future().discard_result()
|
||||
.handle_exception([] (std::exception_ptr eptr) {
|
||||
try {
|
||||
std::rethrow_exception(eptr);
|
||||
} catch (const output_channel_dropped&) {
|
||||
} catch (const raft::dropped_entry&) {
|
||||
} catch (const raft::commit_status_unknown&) {
|
||||
} catch (const raft::not_a_leader&) {
|
||||
} catch (const raft::stopped_error&) {
|
||||
} catch (const timed_out_error&) {
|
||||
} catch (const broken_promise&) {
|
||||
// FIXME: workaround for #9688
|
||||
}
|
||||
});
|
||||
} catch (raft::request_aborted&) {
|
||||
return make_ready_future<call_result_t<M>>(timed_out_error{});
|
||||
} catch (seastar::timed_out_error& e) {
|
||||
return make_ready_future<call_result_t<M>>(e);
|
||||
} catch (broken_promise&) {
|
||||
// FIXME: workaround for #9688
|
||||
return make_ready_future<call_result_t<M>>(raft::stopped_error{});
|
||||
} catch (...) {
|
||||
tlogger.error("unexpected exception from call: {}", std::current_exception());
|
||||
assert(false);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -1042,9 +1095,9 @@ future<reconfigure_result_t> reconfigure(
|
||||
}
|
||||
|
||||
try {
|
||||
co_await timer.with_timeout(timeout, [&server, config = std::move(config)] () {
|
||||
return server.set_configuration(std::move(config));
|
||||
}());
|
||||
co_await with_timeout(timer, timeout, [&server, config = std::move(config)] (abort_source& as) {
|
||||
return server.set_configuration(std::move(config), &as);
|
||||
});
|
||||
co_return std::monostate{};
|
||||
} catch (raft::not_a_leader e) {
|
||||
co_return e;
|
||||
@@ -1059,20 +1112,11 @@ future<reconfigure_result_t> reconfigure(
|
||||
co_return raft::stopped_error{};
|
||||
} catch (raft::stopped_error e) {
|
||||
co_return e;
|
||||
} catch (logical_timer::timed_out<void> e) {
|
||||
(void)e.get_future().discard_result()
|
||||
.handle_exception([] (std::exception_ptr eptr) {
|
||||
try {
|
||||
std::rethrow_exception(eptr);
|
||||
} catch (const raft::dropped_entry&) {
|
||||
} catch (const raft::commit_status_unknown&) {
|
||||
} catch (const raft::not_a_leader&) {
|
||||
} catch (const raft::stopped_error&) {
|
||||
} catch (const broken_promise&) {
|
||||
// FIXME: workaround for #9688
|
||||
}
|
||||
});
|
||||
} catch (raft::request_aborted&) {
|
||||
co_return timed_out_error{};
|
||||
} catch (...) {
|
||||
tlogger.error("unexpected exception from set_configuration: {}", std::current_exception());
|
||||
assert(false);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1088,9 +1132,9 @@ future<reconfigure_result_t> modify_config(
|
||||
}
|
||||
|
||||
try {
|
||||
co_await timer.with_timeout(timeout, [&server, added_set = std::move(added_set), deleted = std::move(deleted)] () mutable {
|
||||
return server.modify_config(std::move(added_set), std::move(deleted));
|
||||
}());
|
||||
co_await with_timeout(timer, timeout, [&server, added_set = std::move(added_set), deleted = std::move(deleted)] (abort_source& as) mutable {
|
||||
return server.modify_config(std::move(added_set), std::move(deleted), &as);
|
||||
});
|
||||
co_return std::monostate{};
|
||||
} catch (raft::not_a_leader e) {
|
||||
co_return e;
|
||||
@@ -1102,18 +1146,11 @@ future<reconfigure_result_t> modify_config(
|
||||
co_return e;
|
||||
} catch (raft::stopped_error e) {
|
||||
co_return e;
|
||||
} catch (logical_timer::timed_out<void> e) {
|
||||
(void)e.get_future().discard_result()
|
||||
.handle_exception([] (std::exception_ptr eptr) {
|
||||
try {
|
||||
std::rethrow_exception(eptr);
|
||||
} catch (const raft::dropped_entry&) {
|
||||
} catch (const raft::commit_status_unknown&) {
|
||||
} catch (const raft::not_a_leader&) {
|
||||
} catch (const raft::stopped_error&) {
|
||||
}
|
||||
});
|
||||
} catch (raft::request_aborted&) {
|
||||
co_return timed_out_error{};
|
||||
} catch (...) {
|
||||
tlogger.error("unexpected exception from modify_config: {}", std::current_exception());
|
||||
assert(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user