diff --git a/raft/server.cc b/raft/server.cc index e3c34f68d6..8d1f882de5 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -436,6 +436,8 @@ future<> server_impl::wait_for_next_tick(seastar::abort_source* as) { } future<> server_impl::wait_for_leader(seastar::abort_source* as) { + check_not_aborted(); + if (_fsm->current_leader()) { co_return; } @@ -454,6 +456,8 @@ future<> server_impl::wait_for_leader(seastar::abort_source* as) { } future<> server_impl::wait_for_state_change(seastar::abort_source* as) { + check_not_aborted(); + if (!_state_change_promise) { _state_change_promise.emplace(); } @@ -748,6 +752,8 @@ future<> server_impl::add_entry(command command, wait_type type, seastar::abort_ } _stats.add_command++; + check_not_aborted(); + logger.trace("[{}] an entry is submitted", id()); if (!_config.enable_forwarding) { if (const auto leader = _fsm->current_leader(); leader != _id) { @@ -858,6 +864,8 @@ future server_impl::execute_modify_config(server_id from, } future<> server_impl::modify_config(std::vector add, std::vector del, seastar::abort_source* as) { + check_not_aborted(); + utils::get_local_injector().inject("raft/throw_commit_status_unknown_in_modify_config", [] { throw raft::commit_status_unknown(); }); diff --git a/raft/server.hh b/raft/server.hh index e956eb460a..0d46fd59c9 100644 --- a/raft/server.hh +++ b/raft/server.hh @@ -254,6 +254,8 @@ public: // It it passes nullptr, the function is unabortable. // // Exceptions: + // raft::stopped_error + // Thrown if abort() was called on the server instance. // raft::request_aborted // Thrown if abort is requested before the operation finishes. virtual future<> wait_for_state_change(seastar::abort_source* as) = 0; @@ -268,6 +270,8 @@ public: // It it passes nullptr, the function is unabortable. // // Exceptions: + // raft::stopped_error + // Thrown if abort() was called on the server instance. // raft::request_aborted // Thrown if abort is requested before the operation finishes. virtual future<> wait_for_leader(seastar::abort_source* as) = 0; diff --git a/test/raft/raft_server_test.cc b/test/raft/raft_server_test.cc index 4140d64c2b..094f61e012 100644 --- a/test/raft/raft_server_test.cc +++ b/test/raft/raft_server_test.cc @@ -11,6 +11,18 @@ const auto tick_delay = 100ms; #endif +// The word "default" means "usually used by the tests here". +template +static raft_cluster get_default_cluster(test_case test_config) { + return raft_cluster{ + std::move(test_config), + ::apply_changes, + 0, + 0, + 0, false, tick_delay, rpc_config{} + }; +} + SEASTAR_THREAD_TEST_CASE(test_check_abort_on_client_api) { raft_cluster cluster( test_case { .nodes = 1 }, @@ -38,22 +50,18 @@ SEASTAR_THREAD_TEST_CASE(test_release_memory_if_add_entry_throws) { std::cerr << "Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n"; #else const size_t command_size = sizeof(size_t); - raft_cluster cluster( - test_case { - .nodes = 1, - .config = std::vector({ - raft::server::configuration { - .snapshot_threshold_log_size = 0, - .snapshot_trailing_size = 0, - .max_log_size = command_size, - .max_command_size = command_size - } - }) - }, - ::apply_changes, - 0, - 0, - 0, false, tick_delay, rpc_config{}); + test_case test_config { + .nodes = 1, + .config = std::vector({ + raft::server::configuration { + .snapshot_threshold_log_size = 0, + .snapshot_trailing_size = 0, + .max_log_size = command_size, + .max_command_size = command_size + } + }) + }; + auto cluster = get_default_cluster(std::move(test_config)); cluster.start_all().get(); auto stop = defer([&cluster] { cluster.stop_all().get(); }); @@ -76,23 +84,7 @@ SEASTAR_THREAD_TEST_CASE(test_release_memory_if_add_entry_throws) { // * The future will contain an exception, and its type will be `raft::request_aborted`. // Reproduces SCYLLADB-665. SEASTAR_THREAD_TEST_CASE(test_aborting_wait_for_state_change) { - const size_t command_size = sizeof(size_t); - raft_cluster cluster( - test_case { - .nodes = 1, - .config = std::vector({ - raft::server::configuration { - .snapshot_threshold_log_size = 0, - .snapshot_trailing_size = 0, - .max_log_size = command_size, - .max_command_size = command_size - } - }) - }, - ::apply_changes, - 0, - 0, - 0, false, tick_delay, rpc_config{}); + auto cluster = get_default_cluster(test_case{ .nodes = 1 }); cluster.start_all().get(); auto stop = defer([&cluster] { cluster.stop_all().get(); }); @@ -110,3 +102,104 @@ SEASTAR_THREAD_TEST_CASE(test_aborting_wait_for_state_change) { as.request_abort(); BOOST_CHECK_THROW((void) fut_default_ex.get(), raft::request_aborted); } + +static void test_func_on_aborted_server_aux( + std::function(raft::server&, abort_source*)> func, + const raft::server::configuration& config = raft::server::configuration{}) +{ + const size_t node_count = 2; + auto test_config = test_case { + .nodes = node_count, + .config = std::vector(node_count, config) + }; + auto cluster = get_default_cluster(std::move(test_config)); + + constexpr std::string_view error_message = "some unfunny error message"; + auto check_default_message = [] (const raft::stopped_error& e) { + return std::string_view(e.what()) == "Raft instance is stopped"; + }; + auto check_error_message = [&error_message] (const raft::stopped_error& e) { + return std::string_view(e.what()) == fmt::format("Raft instance is stopped, reason: \"{}\"", error_message); + }; + + /* Case 1. Default error message */ { + auto& s1 = cluster.get_server(0); + s1.start().get(); + s1.abort().get(); + + abort_source as; + + // Regardless of the state of the passed abort_source, we should get raft::stopped_error. + BOOST_CHECK_EXCEPTION((void) func(s1, nullptr).get(), raft::stopped_error, check_default_message); + BOOST_CHECK_EXCEPTION((void) func(s1, &as).get(), raft::stopped_error, check_default_message); + as.request_abort(); + BOOST_CHECK_EXCEPTION((void) func(s1, &as).get(), raft::stopped_error, check_default_message); + } + + /* Case 2. Custom error message */ { + auto& s2 = cluster.get_server(1); + s2.start().get(); + s2.abort(sstring(error_message)).get(); + + abort_source as; + + // The same checks as above: we just verify that the error message is what we want. + BOOST_CHECK_EXCEPTION((void) func(s2, nullptr).get(), raft::stopped_error, check_error_message); + BOOST_CHECK_EXCEPTION((void) func(s2, &as).get(), raft::stopped_error, check_error_message); + as.request_abort(); + BOOST_CHECK_EXCEPTION((void) func(s2, &as).get(), raft::stopped_error, check_error_message); + } +} + +static void test_add_entry_on_aborted_server_aux(const bool enable_forwarding) { + raft::server::configuration config { .enable_forwarding = enable_forwarding }; + int val = 0; + auto add_entry = [&val] (raft::server& server, abort_source* as) { + return server.add_entry(create_command(val++), raft::wait_type::committed, as); + }; + test_func_on_aborted_server_aux(add_entry, config); +} + +static void test_modify_config_on_aborted_server_aux(const bool enable_forwarding) { + raft::server::configuration config { .enable_forwarding = enable_forwarding }; + auto modify_config = [] (raft::server& server, abort_source* as) { + return server.modify_config({}, {}, as); + }; + test_func_on_aborted_server_aux(modify_config, config); +} + +// Reproducers of SCYLLADB-841: After raft::server had been aborted, both +// add_entry and modify_config used to return raft::not_a_leader with +// a null ID when forwarding was disabled. +// +// We verify that that's not the case. Furthermore, we check that +// raft::stopped_error is preferred over raft::request_aborted +// if both exceptions apply. That's a more natural choice. +SEASTAR_THREAD_TEST_CASE(test_add_entry_on_aborted_server_disabled_forwarding) { + test_add_entry_on_aborted_server_aux(false); +} +SEASTAR_THREAD_TEST_CASE(test_add_entry_on_aborted_server_enabled_forwarding) { + test_add_entry_on_aborted_server_aux(true); +} +SEASTAR_THREAD_TEST_CASE(test_modify_config_on_aborted_server_disabled_forwarding) { + test_modify_config_on_aborted_server_aux(false); +} +SEASTAR_THREAD_TEST_CASE(test_modify_config_on_aborted_server_enabled_forwarding) { + test_modify_config_on_aborted_server_aux(true); +} + +// A call to raft::server::wait_for_leader should complete with +// raft::stopped_error if the server has been aborted, regardless +// of the state of the passed abort_source. +// Reproducer of SCYLLADB-841. +SEASTAR_THREAD_TEST_CASE(test_wait_for_leader_on_aborted_server) { + test_func_on_aborted_server_aux(&raft::server::wait_for_leader); +} + +// A call to raft::server::wait_for_state_change should complete with +// raft::stopped_error if the server has been aborted, regardless +// of the state of the passed abort_source. +// Reproducer of SCYLLADB-841. +SEASTAR_THREAD_TEST_CASE(test_wait_for_state_change_on_aborted_server) { + test_func_on_aborted_server_aux(&raft::server::wait_for_state_change); +}