diff --git a/raft/server.cc b/raft/server.cc index 5d1a1a65d1..65273423bf 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -460,7 +460,7 @@ future<> server_impl::wait_for_state_change(seastar::abort_source* as) { } try { - return as ? _state_change_promise->get_shared_future(*as) : _state_change_promise->get_shared_future(); + co_await (as ? _state_change_promise->get_shared_future(*as) : _state_change_promise->get_shared_future()); } catch (abort_requested_exception&) { throw request_aborted(fmt::format( "Aborted while waiting for state change on server: {}, latest applied entry: {}, current state: {}", _id, _applied_idx, _fsm->current_state())); diff --git a/raft/server.hh b/raft/server.hh index a609b10b32..cd6865b336 100644 --- a/raft/server.hh +++ b/raft/server.hh @@ -252,6 +252,10 @@ public: // // The caller may pass a pointer to an abort_source to make the function abortable. // It it passes nullptr, the function is unabortable. + // + // Exceptions: + // raft::request_aborted + // Thrown if abort is requested before the operation finishes. virtual future<> wait_for_state_change(seastar::abort_source* as) = 0; // Manually trigger snapshot creation and log truncation. diff --git a/test/raft/raft_server_test.cc b/test/raft/raft_server_test.cc index 147997b083..d36ddce5da 100644 --- a/test/raft/raft_server_test.cc +++ b/test/raft/raft_server_test.cc @@ -1,4 +1,5 @@ #include +#include "raft/raft.hh" #include "replication.hh" #include "utils/error_injection.hh" #include @@ -68,3 +69,44 @@ SEASTAR_THREAD_TEST_CASE(test_release_memory_if_add_entry_throws) { cluster.read(read_value{0, 1}).get(); #endif } + +// A simple test verifying the most basic properties of `wait_for_state_change`: +// * Triggering the passed abort_source will abort the operation. +// The future will be resolved. +// * The future will contain an exception, and its type will be `raft::request_aborted`. +// Reproduces SCYLLADB-665. +SEASTAR_THREAD_TEST_CASE(test_aborting_wait_for_state_change) { + const size_t command_size = sizeof(size_t); + raft_cluster cluster( + test_case { + .nodes = 1, + .config = std::vector({ + raft::server::configuration { + .snapshot_threshold_log_size = 0, + .snapshot_trailing_size = 0, + .max_log_size = command_size, + .max_command_size = command_size + } + }) + }, + ::apply_changes, + 0, + 0, + 0, false, tick_delay, rpc_config{}); + cluster.start_all().get(); + auto stop = defer([&cluster] { cluster.stop_all().get(); }); + + auto& server = cluster.get_server(0); + server.wait_election_done().get(); + + abort_source as; + // Note that this future cannot resolve immediately. + // In particular, the leader election we awaited above cannot + // influence it since the promises corresponding to + // waiting for a leader and state change are resolved + // within the same call, one after the other + // (cf. server_impl::process_fsm_output). + future<> fut_default_ex = server.wait_for_state_change(&as); + as.request_abort(); + BOOST_CHECK_THROW((void) fut_default_ex.get(), raft::request_aborted); +}