mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Merge 'raft: Throw stopped_error if server aborted' from Dawid Mędrek
This PR solves a series of similar problems related to executing
methods on an already aborted `raft::server`. They materialize
in various ways:
* For `add_entry` and `modify_config`, a `raft::not_a_leader` with
a null ID will be returned IF forwarding is disabled. This wasn't
a big problem because forwarding has always been enabled for group0,
but it's something that's nice to fix. It might be relevant for
strong consistency that will heavily rely on this code.
* For `wait_for_leader` and `wait_for_state_change`, the calls may
hang and never resolve. A more detailed scenario is provided in a
commit message.
For the last two methods, we also extend their descriptions to indicate
the new possible exception type, `raft::stopped_error`. This change is
correct since either we enter the functions and throw the exception
immediately (if the server has already been aborted), or it will be
thrown upon the call to `raft::server::abort`.
We fix both issues. A few reproducer tests have been included to verify
that the calls finish and throw the appropriate errors.
Fixes SCYLLADB-841
Backport: Although the hanging problems haven't been spotted so far
(at least to the best of my knowledge), it's best to avoid
running into a problem like that, so let's backport the
changes to all supported versions. They're small enough.
Closes scylladb/scylladb#28822
* https://github.com/scylladb/scylladb:
raft: Make methods throw stopped_error if server aborted
raft: Throw stopped_error if server aborted
test: raft: Introduce get_default_cluster
This commit is contained in:
@@ -436,6 +436,8 @@ future<> server_impl::wait_for_next_tick(seastar::abort_source* as) {
|
||||
}
|
||||
|
||||
future<> server_impl::wait_for_leader(seastar::abort_source* as) {
|
||||
check_not_aborted();
|
||||
|
||||
if (_fsm->current_leader()) {
|
||||
co_return;
|
||||
}
|
||||
@@ -454,6 +456,8 @@ future<> server_impl::wait_for_leader(seastar::abort_source* as) {
|
||||
}
|
||||
|
||||
future<> server_impl::wait_for_state_change(seastar::abort_source* as) {
|
||||
check_not_aborted();
|
||||
|
||||
if (!_state_change_promise) {
|
||||
_state_change_promise.emplace();
|
||||
}
|
||||
@@ -748,6 +752,8 @@ future<> server_impl::add_entry(command command, wait_type type, seastar::abort_
|
||||
}
|
||||
_stats.add_command++;
|
||||
|
||||
check_not_aborted();
|
||||
|
||||
logger.trace("[{}] an entry is submitted", id());
|
||||
if (!_config.enable_forwarding) {
|
||||
if (const auto leader = _fsm->current_leader(); leader != _id) {
|
||||
@@ -858,6 +864,8 @@ future<add_entry_reply> server_impl::execute_modify_config(server_id from,
|
||||
}
|
||||
|
||||
future<> server_impl::modify_config(std::vector<config_member> add, std::vector<server_id> del, seastar::abort_source* as) {
|
||||
check_not_aborted();
|
||||
|
||||
utils::get_local_injector().inject("raft/throw_commit_status_unknown_in_modify_config", [] {
|
||||
throw raft::commit_status_unknown();
|
||||
});
|
||||
|
||||
@@ -254,6 +254,8 @@ public:
|
||||
// If it passes nullptr, the function is unabortable.
|
||||
//
|
||||
// Exceptions:
|
||||
// raft::stopped_error
|
||||
// Thrown if abort() was called on the server instance.
|
||||
// raft::request_aborted
|
||||
// Thrown if abort is requested before the operation finishes.
|
||||
virtual future<> wait_for_state_change(seastar::abort_source* as) = 0;
|
||||
@@ -268,6 +270,8 @@ public:
|
||||
// If it passes nullptr, the function is unabortable.
|
||||
//
|
||||
// Exceptions:
|
||||
// raft::stopped_error
|
||||
// Thrown if abort() was called on the server instance.
|
||||
// raft::request_aborted
|
||||
// Thrown if abort is requested before the operation finishes.
|
||||
virtual future<> wait_for_leader(seastar::abort_source* as) = 0;
|
||||
|
||||
@@ -11,6 +11,18 @@
|
||||
const auto tick_delay = 100ms;
|
||||
#endif
|
||||
|
||||
// The word "default" means "usually used by the tests here".
|
||||
template <typename clock_type = std::chrono::steady_clock>
|
||||
static raft_cluster<clock_type> get_default_cluster(test_case test_config) {
|
||||
return raft_cluster<clock_type>{
|
||||
std::move(test_config),
|
||||
::apply_changes,
|
||||
0,
|
||||
0,
|
||||
0, false, tick_delay, rpc_config{}
|
||||
};
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_check_abort_on_client_api) {
|
||||
raft_cluster<std::chrono::steady_clock> cluster(
|
||||
test_case { .nodes = 1 },
|
||||
@@ -38,22 +50,18 @@ SEASTAR_THREAD_TEST_CASE(test_release_memory_if_add_entry_throws) {
|
||||
std::cerr << "Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n";
|
||||
#else
|
||||
const size_t command_size = sizeof(size_t);
|
||||
raft_cluster<std::chrono::steady_clock> cluster(
|
||||
test_case {
|
||||
.nodes = 1,
|
||||
.config = std::vector<raft::server::configuration>({
|
||||
raft::server::configuration {
|
||||
.snapshot_threshold_log_size = 0,
|
||||
.snapshot_trailing_size = 0,
|
||||
.max_log_size = command_size,
|
||||
.max_command_size = command_size
|
||||
}
|
||||
})
|
||||
},
|
||||
::apply_changes,
|
||||
0,
|
||||
0,
|
||||
0, false, tick_delay, rpc_config{});
|
||||
test_case test_config {
|
||||
.nodes = 1,
|
||||
.config = std::vector<raft::server::configuration>({
|
||||
raft::server::configuration {
|
||||
.snapshot_threshold_log_size = 0,
|
||||
.snapshot_trailing_size = 0,
|
||||
.max_log_size = command_size,
|
||||
.max_command_size = command_size
|
||||
}
|
||||
})
|
||||
};
|
||||
auto cluster = get_default_cluster(std::move(test_config));
|
||||
cluster.start_all().get();
|
||||
auto stop = defer([&cluster] { cluster.stop_all().get(); });
|
||||
|
||||
@@ -76,23 +84,7 @@ SEASTAR_THREAD_TEST_CASE(test_release_memory_if_add_entry_throws) {
|
||||
// * The future will contain an exception, and its type will be `raft::request_aborted`.
|
||||
// Reproduces SCYLLADB-665.
|
||||
SEASTAR_THREAD_TEST_CASE(test_aborting_wait_for_state_change) {
|
||||
const size_t command_size = sizeof(size_t);
|
||||
raft_cluster<std::chrono::steady_clock> cluster(
|
||||
test_case {
|
||||
.nodes = 1,
|
||||
.config = std::vector<raft::server::configuration>({
|
||||
raft::server::configuration {
|
||||
.snapshot_threshold_log_size = 0,
|
||||
.snapshot_trailing_size = 0,
|
||||
.max_log_size = command_size,
|
||||
.max_command_size = command_size
|
||||
}
|
||||
})
|
||||
},
|
||||
::apply_changes,
|
||||
0,
|
||||
0,
|
||||
0, false, tick_delay, rpc_config{});
|
||||
auto cluster = get_default_cluster(test_case{ .nodes = 1 });
|
||||
cluster.start_all().get();
|
||||
auto stop = defer([&cluster] { cluster.stop_all().get(); });
|
||||
|
||||
@@ -110,3 +102,104 @@ SEASTAR_THREAD_TEST_CASE(test_aborting_wait_for_state_change) {
|
||||
as.request_abort();
|
||||
BOOST_CHECK_THROW((void) fut_default_ex.get(), raft::request_aborted);
|
||||
}
|
||||
|
||||
static void test_func_on_aborted_server_aux(
|
||||
std::function<future<>(raft::server&, abort_source*)> func,
|
||||
const raft::server::configuration& config = raft::server::configuration{})
|
||||
{
|
||||
const size_t node_count = 2;
|
||||
auto test_config = test_case {
|
||||
.nodes = node_count,
|
||||
.config = std::vector<raft::server::configuration>(node_count, config)
|
||||
};
|
||||
auto cluster = get_default_cluster(std::move(test_config));
|
||||
|
||||
constexpr std::string_view error_message = "some unfunny error message";
|
||||
auto check_default_message = [] (const raft::stopped_error& e) {
|
||||
return std::string_view(e.what()) == "Raft instance is stopped";
|
||||
};
|
||||
auto check_error_message = [&error_message] (const raft::stopped_error& e) {
|
||||
return std::string_view(e.what()) == fmt::format("Raft instance is stopped, reason: \"{}\"", error_message);
|
||||
};
|
||||
|
||||
/* Case 1. Default error message */ {
|
||||
auto& s1 = cluster.get_server(0);
|
||||
s1.start().get();
|
||||
s1.abort().get();
|
||||
|
||||
abort_source as;
|
||||
|
||||
// Regardless of the state of the passed abort_source, we should get raft::stopped_error.
|
||||
BOOST_CHECK_EXCEPTION((void) func(s1, nullptr).get(), raft::stopped_error, check_default_message);
|
||||
BOOST_CHECK_EXCEPTION((void) func(s1, &as).get(), raft::stopped_error, check_default_message);
|
||||
as.request_abort();
|
||||
BOOST_CHECK_EXCEPTION((void) func(s1, &as).get(), raft::stopped_error, check_default_message);
|
||||
}
|
||||
|
||||
/* Case 2. Custom error message */ {
|
||||
auto& s2 = cluster.get_server(1);
|
||||
s2.start().get();
|
||||
s2.abort(sstring(error_message)).get();
|
||||
|
||||
abort_source as;
|
||||
|
||||
// The same checks as above: we just verify that the error message is what we want.
|
||||
BOOST_CHECK_EXCEPTION((void) func(s2, nullptr).get(), raft::stopped_error, check_error_message);
|
||||
BOOST_CHECK_EXCEPTION((void) func(s2, &as).get(), raft::stopped_error, check_error_message);
|
||||
as.request_abort();
|
||||
BOOST_CHECK_EXCEPTION((void) func(s2, &as).get(), raft::stopped_error, check_error_message);
|
||||
}
|
||||
}
|
||||
|
||||
static void test_add_entry_on_aborted_server_aux(const bool enable_forwarding) {
|
||||
raft::server::configuration config { .enable_forwarding = enable_forwarding };
|
||||
int val = 0;
|
||||
auto add_entry = [&val] (raft::server& server, abort_source* as) {
|
||||
return server.add_entry(create_command(val++), raft::wait_type::committed, as);
|
||||
};
|
||||
test_func_on_aborted_server_aux(add_entry, config);
|
||||
}
|
||||
|
||||
static void test_modify_config_on_aborted_server_aux(const bool enable_forwarding) {
|
||||
raft::server::configuration config { .enable_forwarding = enable_forwarding };
|
||||
auto modify_config = [] (raft::server& server, abort_source* as) {
|
||||
return server.modify_config({}, {}, as);
|
||||
};
|
||||
test_func_on_aborted_server_aux(modify_config, config);
|
||||
}
|
||||
|
||||
// Reproducers of SCYLLADB-841: After raft::server had been aborted, both
|
||||
// add_entry and modify_config used to return raft::not_a_leader with
|
||||
// a null ID when forwarding was disabled.
|
||||
//
|
||||
// We verify that that's not the case. Furthermore, we check that
|
||||
// raft::stopped_error is preferred over raft::request_aborted
|
||||
// if both exceptions apply. That's a more natural choice.
|
||||
SEASTAR_THREAD_TEST_CASE(test_add_entry_on_aborted_server_disabled_forwarding) {
    // add_entry on an aborted server, with request forwarding turned off.
    constexpr bool enable_forwarding = false;
    test_add_entry_on_aborted_server_aux(enable_forwarding);
}
|
||||
SEASTAR_THREAD_TEST_CASE(test_add_entry_on_aborted_server_enabled_forwarding) {
    // add_entry on an aborted server, with request forwarding turned on.
    constexpr bool enable_forwarding = true;
    test_add_entry_on_aborted_server_aux(enable_forwarding);
}
|
||||
SEASTAR_THREAD_TEST_CASE(test_modify_config_on_aborted_server_disabled_forwarding) {
    // modify_config on an aborted server, with request forwarding turned off.
    constexpr bool enable_forwarding = false;
    test_modify_config_on_aborted_server_aux(enable_forwarding);
}
|
||||
SEASTAR_THREAD_TEST_CASE(test_modify_config_on_aborted_server_enabled_forwarding) {
    // modify_config on an aborted server, with request forwarding turned on.
    constexpr bool enable_forwarding = true;
    test_modify_config_on_aborted_server_aux(enable_forwarding);
}
|
||||
|
||||
// A call to raft::server::wait_for_leader should complete with
|
||||
// raft::stopped_error if the server has been aborted, regardless
|
||||
// of the state of the passed abort_source.
|
||||
// Reproducer of SCYLLADB-841.
|
||||
SEASTAR_THREAD_TEST_CASE(test_wait_for_leader_on_aborted_server) {
    // Run the shared aborted-server checks against wait_for_leader, using the
    // default server configuration.
    test_func_on_aborted_server_aux([] (raft::server& srv, abort_source* as) {
        return srv.wait_for_leader(as);
    });
}
|
||||
|
||||
// A call to raft::server::wait_for_state_change should complete with
|
||||
// raft::stopped_error if the server has been aborted, regardless
|
||||
// of the state of the passed abort_source.
|
||||
// Reproducer of SCYLLADB-841.
|
||||
SEASTAR_THREAD_TEST_CASE(test_wait_for_state_change_on_aborted_server) {
    // Run the shared aborted-server checks against wait_for_state_change,
    // using the default server configuration.
    test_func_on_aborted_server_aux([] (raft::server& srv, abort_source* as) {
        return srv.wait_for_state_change(as);
    });
}
|
||||
|
||||
Reference in New Issue
Block a user