Merge 'storage_service: make all Raft-based operations abortable' from Patryk Jędrzejczak

During a shutdown, we call `storage_service::stop_transport` first.
We may try to apply a Raft command after that, or still be in
the process of applying a command. In such a case, the shutdown
process will hang because Raft retries replicating a command until
it succeeds even in the case of a network error. It will stop when
a corresponding abort source is set. However, if we pass `nullptr`
to a function like `add_entry`, it won't stop. The shutdown
process will hang forever.

We fix all places that incorrectly pass `nullptr`. These shutdown
hangs are not only theoretical. The incorrect `add_entry` call in
`update_topology_state` caused scylladb/scylladb#16435.

Additionally, we remove the default `nullptr` values in all member
functions of `server` and `raft_group0_client` to avoid similar bugs
in the future.

Fixes scylladb/scylladb#16435

Closes scylladb/scylladb#16663

* github.com:scylladb/scylladb:
  server, raft_group0_client: remove the default nullptr values
  storage_service: make all Raft-based operations abortable
This commit is contained in:
Kamil Braun
2024-01-08 11:30:56 +01:00
8 changed files with 34 additions and 27 deletions

View File

@@ -81,14 +81,14 @@ public:
// server interface
future<> add_entry(command command, wait_type type, seastar::abort_source* as = nullptr) override;
future<> set_configuration(config_member_set c_new, seastar::abort_source* as = nullptr) override;
future<> add_entry(command command, wait_type type, seastar::abort_source* as) override;
future<> set_configuration(config_member_set c_new, seastar::abort_source* as) override;
raft::configuration get_configuration() const override;
future<> start() override;
future<> abort(sstring reason) override;
bool is_alive() const override;
term_t get_current_term() const override;
future<> read_barrier(seastar::abort_source* as = nullptr) override;
future<> read_barrier(seastar::abort_source* as) override;
void wait_until_candidate() override;
future<> wait_election_done() override;
future<> wait_log_idx_term(std::pair<index_t, term_t> idx_log) override;
@@ -100,7 +100,7 @@ public:
raft::server_id id() const override;
void set_applier_queue_max_size(size_t queue_max_size) override;
future<> stepdown(logical_clock::duration timeout) override;
future<> modify_config(std::vector<config_member> add, std::vector<server_id> del, seastar::abort_source* as = nullptr) override;
future<> modify_config(std::vector<config_member> add, std::vector<server_id> del, seastar::abort_source* as) override;
future<entry_id> add_entry_on_leader(command command, seastar::abort_source* as);
void register_metrics() override;
size_t max_command_size() const override;
@@ -276,7 +276,7 @@ private:
// A helper to wait for a leader to get elected
future<> wait_for_leader(seastar::abort_source* as);
future<> wait_for_state_change(seastar::abort_source* as = nullptr) override;
future<> wait_for_state_change(seastar::abort_source* as) override;
// Get "safe to read" index from a leader
future<read_barrier_reply> get_read_idx(server_id leader, seastar::abort_source* as);

View File

@@ -77,6 +77,7 @@ public:
// committed locally means simply that the commit index is beyond this entry's index.
//
// The caller may pass a pointer to an abort_source to make the operation abortable.
// If it passes nullptr, the operation is unabortable.
//
// Successful `add_entry` with `wait_type::committed` does not guarantee that `state_machine::apply` will be called
// locally for this entry. Between the commit and the application we may receive a snapshot containing this entry,
@@ -98,7 +99,7 @@ public:
// Thrown if abort() was called on the server instance.
// raft::not_a_leader
// Thrown if the node is not a leader and forwarding is not enabled through enable_forwarding config option.
virtual future<> add_entry(command command, wait_type type, seastar::abort_source* as = nullptr) = 0;
virtual future<> add_entry(command command, wait_type type, seastar::abort_source* as) = 0;
// Set a new cluster configuration. If the configuration is
// identical to the previous one does nothing.
@@ -124,6 +125,7 @@ public:
// returned even in case of a successful config change.
//
// The caller may pass a pointer to an abort_source to make the operation abortable.
// If it passes nullptr, the operation is unabortable.
//
// Exceptions:
// raft::conf_change_in_progress
@@ -135,7 +137,7 @@ public:
// forwarding the corresponding add_entry to the leader.
// raft::request_aborted
// Thrown if abort is requested before the operation finishes.
virtual future<> set_configuration(config_member_set c_new, seastar::abort_source* as = nullptr) = 0;
virtual future<> set_configuration(config_member_set c_new, seastar::abort_source* as) = 0;
// A simplified wrapper around set_configuration() which adds
// and deletes servers. Unlike set_configuration(),
@@ -158,6 +160,7 @@ public:
//
// The caller may pass a pointer to an abort_source to make the operation abortable.
// If abort is requested before the operation finishes, the future will contain `raft::request_aborted` exception.
// If the caller passes nullptr, the operation is unabortable.
//
// Exceptions:
// raft::commit_status_unknown
@@ -173,7 +176,7 @@ public:
// raft::conf_change_in_progress
// Thrown if the previous set_configuration/modify_config is not completed.
virtual future<> modify_config(std::vector<config_member> add,
std::vector<server_id> del, seastar::abort_source* as = nullptr) = 0;
std::vector<server_id> del, seastar::abort_source* as) = 0;
// Return the currently known configuration
virtual raft::configuration get_configuration() const = 0;
@@ -203,13 +206,14 @@ public:
// future has resolved successfully.
//
// The caller may pass a pointer to an abort_source to make the operation abortable.
// If it passes nullptr, the operation is unabortable.
//
// Exceptions:
// raft::request_aborted
// Thrown if abort is requested before the operation finishes.
// raft::stopped_error
// Thrown if abort() was called on the server instance.
virtual future<> read_barrier(seastar::abort_source* as = nullptr) = 0;
virtual future<> read_barrier(seastar::abort_source* as) = 0;
// Initiate leader stepdown process.
//
@@ -245,7 +249,10 @@ public:
// State changes can be coalesced, so it is not guaranteed that the caller will
// get notification about each one of them. The state can even be the same after
// the call as before, but term should be different.
virtual future<> wait_for_state_change(seastar::abort_source* as = nullptr) = 0;
//
// The caller may pass a pointer to an abort_source to make the function abortable.
// If it passes nullptr, the function is unabortable.
virtual future<> wait_for_state_change(seastar::abort_source* as) = 0;
// Ad hoc functions for testing
virtual void wait_until_candidate() = 0;

View File

@@ -30,7 +30,7 @@ bool is_broadcast_table_statement(const sstring& keyspace, const sstring& column
future<query_result> execute(service::raft_group0_client& group0_client, const query& query) {
auto group0_cmd = group0_client.prepare_command(broadcast_table_query{query}, "broadcast_tables query");
auto guard = group0_client.create_result_guard(group0_cmd.new_state_id);
co_await group0_client.add_entry_unguarded(std::move(group0_cmd));
co_await group0_client.add_entry_unguarded(std::move(group0_cmd), nullptr);
co_return guard.get();
}

View File

@@ -160,7 +160,7 @@ void raft_group0::init_rpc_verbs(raft_group0& shard0_this) {
ser::group0_rpc_verbs::register_group0_modify_config(&shard0_this._ms.local(),
[&shard0_this] (const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, std::vector<raft::config_member> add, std::vector<raft::server_id> del) {
return smp::submit_to(0, [&shard0_this, gid, add = std::move(add), del = std::move(del)]() mutable {
return shard0_this._raft_gr.get_server(gid).modify_config(std::move(add), std::move(del));
return shard0_this._raft_gr.get_server(gid).modify_config(std::move(add), std::move(del), nullptr);
});
});

View File

@@ -104,9 +104,9 @@ public:
// Call after `system_keyspace` is initialized.
future<> init();
future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as = nullptr);
future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as);
future<> add_entry_unguarded(group0_command group0_cmd, seastar::abort_source* as = nullptr);
future<> add_entry_unguarded(group0_command group0_cmd, seastar::abort_source* as);
// Ensures that all previously finished operations on group 0 are visible on this node;
// in particular, performs a Raft read barrier on group 0.
@@ -128,7 +128,7 @@ public:
// FIXME?: this is kind of annoying for the user.
// we could forward the call to shard 0, have group0_guard keep a foreign_ptr to the internal data structures on shard 0,
// and add_entry would again forward to shard 0.
future<group0_guard> start_operation(seastar::abort_source* as = nullptr);
future<group0_guard> start_operation(seastar::abort_source* as);
template<typename Command>
requires std::same_as<Command, broadcast_table_query> || std::same_as<Command, write_mutations>

View File

@@ -1070,7 +1070,7 @@ class topology_coordinator {
slogger.trace("raft topology: do update {} reason {}", updates, reason);
topology_change change{std::move(updates)};
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard));
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), &_as);
} catch (group0_concurrent_modification&) {
slogger.info("raft topology: race while changing state: {}. Retrying", reason);
throw;
@@ -1297,7 +1297,7 @@ class topology_coordinator {
slogger.trace("raft topology: do update {} reason {}", m, reason);
write_mutations change{{std::move(m)}};
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), reason);
return _group0.client().add_entry_unguarded(std::move(g0_cmd));
return _group0.client().add_entry_unguarded(std::move(g0_cmd), &_as);
});
guard = co_await start_operation();
@@ -2502,7 +2502,7 @@ class topology_coordinator {
assert(!_topo_sm._topology.transition_nodes.empty());
if (!_raft.get_configuration().contains(id)) {
co_await _raft.modify_config({raft::config_member({id, {}}, {})}, {});
co_await _raft.modify_config({raft::config_member({id, {}}, {})}, {}, &_as);
}
release_node(std::move(node));
@@ -6909,7 +6909,7 @@ future<> storage_service::move_tablet(table_id table, dht::token token, locator:
topology_change change{std::move(updates)};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard));
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
break;
} catch (group0_concurrent_modification&) {
slogger.debug("move_tablet(): concurrent modification, retrying");
@@ -6953,7 +6953,7 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
topology_change change{std::move(updates)};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard));
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
break;
} catch (group0_concurrent_modification&) {
slogger.debug("set_tablet_balancing_enabled(): concurrent modification");

View File

@@ -26,9 +26,9 @@ SEASTAR_THREAD_TEST_CASE(test_check_abort_on_client_api) {
return sstring(e.what()) == sstring("Raft instance is stopped, reason: \"test crash\"");
};
BOOST_CHECK_EXCEPTION(cluster.add_entries(1, 0).get0(), raft::stopped_error, check_error);
BOOST_CHECK_EXCEPTION(cluster.get_server(0).modify_config({}, {to_raft_id(0)}).get0(), raft::stopped_error, check_error);
BOOST_CHECK_EXCEPTION(cluster.get_server(0).read_barrier().get0(), raft::stopped_error, check_error);
BOOST_CHECK_EXCEPTION(cluster.get_server(0).set_configuration({}).get0(), raft::stopped_error, check_error);
BOOST_CHECK_EXCEPTION(cluster.get_server(0).modify_config({}, {to_raft_id(0)}, nullptr).get0(), raft::stopped_error, check_error);
BOOST_CHECK_EXCEPTION(cluster.get_server(0).read_barrier(nullptr).get0(), raft::stopped_error, check_error);
BOOST_CHECK_EXCEPTION(cluster.get_server(0).set_configuration({}, nullptr).get0(), raft::stopped_error, check_error);
}
SEASTAR_THREAD_TEST_CASE(test_release_memory_if_add_entry_throws) {

View File

@@ -951,7 +951,7 @@ future<> raft_cluster<Clock>::add_entry(size_t val, std::optional<size_t> server
while (true) {
try {
auto& at = _servers[server ? *server : _leader].server;
co_await at->add_entry(create_command(val), raft::wait_type::committed);
co_await at->add_entry(create_command(val), raft::wait_type::committed, nullptr);
break;
} catch (raft::commit_status_unknown& e) {
// FIXME: in some cases when we get `commit_status_unknown` the entry may have been applied.
@@ -1201,7 +1201,7 @@ future<> raft_cluster<Clock>::change_configuration(set_config sc) {
}
tlogger.debug("Changing configuration on leader {}", _leader);
co_await _servers[_leader].server->set_configuration(std::move(set));
co_await _servers[_leader].server->set_configuration(std::move(set), nullptr);
if (!new_config.contains(_leader)) {
co_await free_election();
@@ -1211,7 +1211,7 @@ future<> raft_cluster<Clock>::change_configuration(set_config sc) {
// Add a dummy entry to confirm new configuration was committed
try {
co_await _servers[_leader].server->add_entry(create_command(dummy_command),
raft::wait_type::committed);
raft::wait_type::committed, nullptr);
} catch (raft::not_a_leader& e) {
// leader stepped down, implying config fully changed
} catch (raft::commit_status_unknown& e) {}
@@ -1338,7 +1338,7 @@ future<> raft_cluster<Clock>::tick(::tick t) {
template <typename Clock>
future<> raft_cluster<Clock>::read(read_value r) {
co_await _servers[r.node_idx].server->read_barrier();
co_await _servers[r.node_idx].server->read_barrier(nullptr);
auto val = _servers[r.node_idx].sm->hasher->finalize_uint64();
auto expected = hasher_int::hash_range(r.expected_index).finalize_uint64();
BOOST_CHECK_MESSAGE(val == expected,