From 475a40879294cde3d9f0c6cc8b4e2f9aa21dd0ce Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Tue, 29 Mar 2022 14:59:37 +0200 Subject: [PATCH] test: raft: randomized_nemesis_test: on timeout, abort calls instead of discarding them When a Raft API call such as `add_entry`, `set_configuration` or `modify_config` takes too long, we need to time out. There was no way to abort these calls previously so we would do that by discarding the futures. Recently the APIs were extended with `abort_source` parameters. Use this. Also improve debuggability if the functions throw an exception type that we don't expect. Previously if they did, a cryptic assert would fail somewhere deep in the generator code, making the problem hard to debug. --- test/raft/randomized_nemesis_test.cc | 137 +++++++++++++++++---------- 1 file changed, 87 insertions(+), 50 deletions(-) diff --git a/test/raft/randomized_nemesis_test.cc b/test/raft/randomized_nemesis_test.cc index 9b4c1e563a..0665c7de43 100644 --- a/test/raft/randomized_nemesis_test.cc +++ b/test/raft/randomized_nemesis_test.cc @@ -207,6 +207,66 @@ raft::command make_command(const cmd_id_t& cmd_id, const Input& input) { template using call_result_t = std::variant; +// Wait for a future `f` to finish, but keep the result inside a `future`. +// Works for `future<void>` as well as for `future<T>`. 
+template +future wait(F f) { + // FIXME: using lambda as workaround for clang bug #50345 + auto impl = [] (F f) -> future { + struct container { F f; }; + container c = co_await f.then_wrapped([] (F f) { return container{std::move(f)}; }); + assert(c.f.available()); + co_return std::move(c.f); + }; + + return impl(std::move(f)); +} + +template F> +static futurize_t> +with_timeout(logical_timer& t, raft::logical_clock::time_point tp, F&& fun) { + using future_t = futurize_t>; + + // FIXME: using lambda as workaround for clang bug #50345 + auto impl = [] (logical_timer& t, raft::logical_clock::time_point tp, F&& fun) -> future_t { + abort_source timeout_as; + + // Using lambda here as workaround for seastar#1005 + future_t f = futurize_invoke([fun = std::move(fun)] (abort_source& as) mutable { return std::forward(fun)(as); }, timeout_as); + + auto sleep_and_abort = [] (raft::logical_clock::time_point tp, abort_source& timeout_as, logical_timer& t) -> future<> { + co_await t.sleep_until(tp, timeout_as); + if (!timeout_as.abort_requested()) { + // We resolved before `f`. Abort the operation. + timeout_as.request_abort(); + } + }(tp, timeout_as, t); + + f = co_await wait(std::move(f)); + + if (!timeout_as.abort_requested()) { + // `f` has already resolved, but abort the sleep. + timeout_as.request_abort(); + } + + // Wait on the sleep as well (it should return shortly, being aborted) so we don't discard the future. + try { + co_await std::move(sleep_and_abort); + } catch (const sleep_aborted&) { + // Expected (if `f` resolved first or we were externally aborted). + } catch (...) { + // There should be no other exceptions, but just in case... log it and discard, + // we want to propagate exceptions from `f`, not from sleep. + tlogger.error("unexpected exception from sleep_and_abort", std::current_exception()); + } + + // The future is available but cannot use `f.get()` as it doesn't handle void futures. 
+ co_return co_await std::move(f); + }; + + return impl(t, tp, std::forward(fun)); +} + // Sends a given `input` as a command to `server`, waits until the command gets replicated // and applied on that server and returns the produced output. // @@ -225,11 +285,15 @@ future> call( raft::server& server, impure_state_machine& sm) { using output_channel_dropped = typename impure_state_machine::output_channel_dropped; - return sm.with_output_channel([&, input = std::move(input), timeout] (cmd_id_t cmd_id, future f) { - return timer.with_timeout(timeout, [&] (typename M::input_t input, future f) { + using input_t = typename M::input_t; + using output_t = typename M::output_t; + + return sm.with_output_channel([&, input = std::move(input), timeout] (cmd_id_t cmd_id, future f) { + return with_timeout(timer, timeout, std::bind_front([&] (input_t input, future f, abort_source& as) { return server.add_entry( make_command(std::move(cmd_id), std::move(input)), - raft::wait_type::applied + raft::wait_type::applied, + &as ).then_wrapped([output_f = std::move(f)] (future<> add_entry_f) mutable { if (add_entry_f.failed()) { // We need to discard `output_f`; the only expected exception is: @@ -239,8 +303,8 @@ future> call( return std::move(output_f); }); - }(std::move(input), std::move(f))); - }).then([] (typename M::output_t output) { + }, std::move(input), std::move(f))); + }).then([] (output_t output) { return make_ready_future>(std::move(output)); }).handle_exception([] (std::exception_ptr eptr) { try { @@ -253,27 +317,16 @@ future> call( return make_ready_future>(e); } catch (raft::stopped_error e) { return make_ready_future>(e); - } catch (logical_timer::timed_out e) { - (void)e.get_future().discard_result() - .handle_exception([] (std::exception_ptr eptr) { - try { - std::rethrow_exception(eptr); - } catch (const output_channel_dropped&) { - } catch (const raft::dropped_entry&) { - } catch (const raft::commit_status_unknown&) { - } catch (const raft::not_a_leader&) { - } 
catch (const raft::stopped_error&) { - } catch (const timed_out_error&) { - } catch (const broken_promise&) { - // FIXME: workaround for #9688 - } - }); + } catch (raft::request_aborted&) { return make_ready_future>(timed_out_error{}); } catch (seastar::timed_out_error& e) { return make_ready_future>(e); } catch (broken_promise&) { // FIXME: workaround for #9688 return make_ready_future>(raft::stopped_error{}); + } catch (...) { + tlogger.error("unexpected exception from call: {}", std::current_exception()); + assert(false); } }); } @@ -1042,9 +1095,9 @@ future reconfigure( } try { - co_await timer.with_timeout(timeout, [&server, config = std::move(config)] () { - return server.set_configuration(std::move(config)); - }()); + co_await with_timeout(timer, timeout, [&server, config = std::move(config)] (abort_source& as) { + return server.set_configuration(std::move(config), &as); + }); co_return std::monostate{}; } catch (raft::not_a_leader e) { co_return e; @@ -1059,20 +1112,11 @@ future reconfigure( co_return raft::stopped_error{}; } catch (raft::stopped_error e) { co_return e; - } catch (logical_timer::timed_out e) { - (void)e.get_future().discard_result() - .handle_exception([] (std::exception_ptr eptr) { - try { - std::rethrow_exception(eptr); - } catch (const raft::dropped_entry&) { - } catch (const raft::commit_status_unknown&) { - } catch (const raft::not_a_leader&) { - } catch (const raft::stopped_error&) { - } catch (const broken_promise&) { - // FIXME: workaround for #9688 - } - }); + } catch (raft::request_aborted&) { co_return timed_out_error{}; + } catch (...) 
{ + tlogger.error("unexpected exception from set_configuration: {}", std::current_exception()); + assert(false); } } @@ -1088,9 +1132,9 @@ future modify_config( } try { - co_await timer.with_timeout(timeout, [&server, added_set = std::move(added_set), deleted = std::move(deleted)] () mutable { - return server.modify_config(std::move(added_set), std::move(deleted)); - }()); + co_await with_timeout(timer, timeout, [&server, added_set = std::move(added_set), deleted = std::move(deleted)] (abort_source& as) mutable { + return server.modify_config(std::move(added_set), std::move(deleted), &as); + }); co_return std::monostate{}; } catch (raft::not_a_leader e) { co_return e; @@ -1102,18 +1146,11 @@ future modify_config( co_return e; } catch (raft::stopped_error e) { co_return e; - } catch (logical_timer::timed_out e) { - (void)e.get_future().discard_result() - .handle_exception([] (std::exception_ptr eptr) { - try { - std::rethrow_exception(eptr); - } catch (const raft::dropped_entry&) { - } catch (const raft::commit_status_unknown&) { - } catch (const raft::not_a_leader&) { - } catch (const raft::stopped_error&) { - } - }); + } catch (raft::request_aborted&) { co_return timed_out_error{}; + } catch (...) { + tlogger.error("unexpected exception from modify_config: {}", std::current_exception()); + assert(false); } }