From 3d4af4ecf15727b993a373753a87eac93fca13b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20J=C4=99drzejczak?= Date: Fri, 5 Jan 2024 17:25:47 +0100 Subject: [PATCH 1/2] storage_service: make all Raft-based operations abortable During a shutdown, we call `storage_service::stop_transport` first. We may try to apply a Raft command after that, or still be in the the process of applying a command. In such a case, the shutdown process will hang because Raft retries replicating a command until it succeeds even in the case of a network error. It will stop when a corresponding abort source is set. However, if we pass `nullptr` to a function like `add_entry`, it won't stop. The shutdown process will hang forever. We fix all places that incorrectly pass `nullptr`. These shutdown hangs are not only theoretical. The incorrect `add_entry` call in `update_topology_state` caused scylladb/scylladb#16435. --- service/storage_service.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 076c458ce3..f1c1d89030 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1070,7 +1070,7 @@ class topology_coordinator { slogger.trace("raft topology: do update {} reason {}", updates, reason); topology_change change{std::move(updates)}; group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason); - co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard)); + co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), &_as); } catch (group0_concurrent_modification&) { slogger.info("raft topology: race while changing state: {}. Retrying", reason); throw; @@ -1297,7 +1297,7 @@ class topology_coordinator { slogger.trace("raft topology: do update {} reason {}", m, reason); write_mutations change{{std::move(m)}}; group0_command g0_cmd = _group0.client().prepare_command(std::move(change), reason); - return _group0.client().add_entry_unguarded(std::move(g0_cmd)); + return _group0.client().add_entry_unguarded(std::move(g0_cmd), &_as); }); guard = co_await start_operation(); @@ -2502,7 +2502,7 @@ class topology_coordinator { assert(!_topo_sm._topology.transition_nodes.empty()); if (!_raft.get_configuration().contains(id)) { - co_await _raft.modify_config({raft::config_member({id, {}}, {})}, {}); + co_await _raft.modify_config({raft::config_member({id, {}}, {})}, {}, &_as); } release_node(std::move(node)); @@ -6909,7 +6909,7 @@ future<> storage_service::move_tablet(table_id table, dht::token token, locator: topology_change change{std::move(updates)}; group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard)); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); break; } catch (group0_concurrent_modification&) { slogger.debug("move_tablet(): concurrent modification, retrying"); @@ -6953,7 +6953,7 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) { topology_change change{std::move(updates)}; group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard)); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); break; } catch (group0_concurrent_modification&) { slogger.debug("set_tablet_balancing_enabled(): concurrent modification"); From df2034ebd78bf77785a4184c804d960bae1870dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20J=C4=99drzejczak?= Date: Fri, 5 Jan 2024 18:08:44 +0100 Subject: [PATCH 2/2] server, raft_group0_client: remove the default nullptr values The previous commit has fixed 5 bugs of the same type - incorrectly passing the default nullptr to one of the changed functions. At least some of these bugs wouldn't appear if there was no default value. It's much harder to make this kind of a bug if you have to write "nullptr". It's also much easier to detect it in review. Moreover, these default values are rarely used outside tests. Keeping them is just not worth the time spent on debugging. --- raft/server.cc | 10 +++++----- raft/server.hh | 17 ++++++++++++----- service/broadcast_tables/experimental/lang.cc | 2 +- service/raft/raft_group0.cc | 2 +- service/raft/raft_group0_client.hh | 6 +++--- test/raft/raft_server_test.cc | 6 +++--- test/raft/replication.hh | 8 ++++---- 7 files changed, 29 insertions(+), 22 deletions(-) diff --git a/raft/server.cc b/raft/server.cc index a9f9dcff14..4271d1ee61 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -81,14 +81,14 @@ public: // server interface - future<> add_entry(command command, wait_type type, seastar::abort_source* as = nullptr) override; - future<> set_configuration(config_member_set c_new, seastar::abort_source* as = nullptr) override; + future<> add_entry(command command, wait_type type, seastar::abort_source* as) override; + future<> set_configuration(config_member_set c_new, seastar::abort_source* as) override; raft::configuration get_configuration() const override; future<> start() override; future<> abort(sstring reason) override; bool is_alive() const override; term_t get_current_term() const override; - future<> read_barrier(seastar::abort_source* as = nullptr) override; + future<> read_barrier(seastar::abort_source* as) override; void wait_until_candidate() override; future<> wait_election_done() override; future<> wait_log_idx_term(std::pair idx_log) override; @@ -100,7 +100,7 @@ public: raft::server_id id() const override; void set_applier_queue_max_size(size_t queue_max_size) override; future<> stepdown(logical_clock::duration timeout) override; - future<> modify_config(std::vector add, std::vector del, seastar::abort_source* as = nullptr) override; + future<> modify_config(std::vector add, std::vector del, seastar::abort_source* as) override; future add_entry_on_leader(command command, seastar::abort_source* as); void register_metrics() override; size_t max_command_size() const override; @@ -276,7 +276,7 @@ private: // A helper to wait for a leader to get elected future<> wait_for_leader(seastar::abort_source* as); - future<> wait_for_state_change(seastar::abort_source* as = nullptr) override; + future<> wait_for_state_change(seastar::abort_source* as) override; // Get "safe to read" index from a leader future get_read_idx(server_id leader, seastar::abort_source* as); diff --git a/raft/server.hh b/raft/server.hh index 2489545c57..ebc731e105 100644 --- a/raft/server.hh +++ b/raft/server.hh @@ -77,6 +77,7 @@ public: // committed locally means simply that the commit index is beyond this entry's index. // // The caller may pass a pointer to an abort_source to make the operation abortable. + // It it passes nullptr, the operation is unabortable. // // Successful `add_entry` with `wait_type::committed` does not guarantee that `state_machine::apply` will be called // locally for this entry. Between the commit and the application we may receive a snapshot containing this entry, @@ -98,7 +99,7 @@ public: // Thrown if abort() was called on the server instance. // raft::not_a_leader // Thrown if the node is not a leader and forwarding is not enabled through enable_forwarding config option. - virtual future<> add_entry(command command, wait_type type, seastar::abort_source* as = nullptr) = 0; + virtual future<> add_entry(command command, wait_type type, seastar::abort_source* as) = 0; // Set a new cluster configuration. If the configuration is // identical to the previous one does nothing. @@ -124,6 +125,7 @@ public: // returned even in case of a successful config change. // // The caller may pass a pointer to an abort_source to make the operation abortable. + // It it passes nullptr, the operation is unabortable. // // Exceptions: // raft::conf_change_in_progress @@ -135,7 +137,7 @@ public: // forwarding the corresponding add_entry to the leader. // raft::request_aborted // Thrown if abort is requested before the operation finishes. - virtual future<> set_configuration(config_member_set c_new, seastar::abort_source* as = nullptr) = 0; + virtual future<> set_configuration(config_member_set c_new, seastar::abort_source* as) = 0; // A simplified wrapper around set_configuration() which adds // and deletes servers. Unlike set_configuration(), @@ -158,6 +160,7 @@ public: // // The caller may pass a pointer to an abort_source to make the operation abortable. // If abort is requested before the operation finishes, the future will contain `raft::request_aborted` exception. + // It the caller passes nullptr, the operation is unabortable. // // Exceptions: // raft::commit_status_unknown @@ -173,7 +176,7 @@ public: // raft::conf_change_in_progress // Thrown if the previous set_configuration/modify_config is not completed. virtual future<> modify_config(std::vector add, - std::vector del, seastar::abort_source* as = nullptr) = 0; + std::vector del, seastar::abort_source* as) = 0; // Return the currently known configuration virtual raft::configuration get_configuration() const = 0; @@ -203,13 +206,14 @@ public: // future has resolved successfully. // // The caller may pass a pointer to an abort_source to make the operation abortable. + // It it passes nullptr, the operation is unabortable. // // Exceptions: // raft::request_aborted // Thrown if abort is requested before the operation finishes. // raft::stopped_error // Thrown if abort() was called on the server instance. - virtual future<> read_barrier(seastar::abort_source* as = nullptr) = 0; + virtual future<> read_barrier(seastar::abort_source* as) = 0; // Initiate leader stepdown process. // @@ -245,7 +249,10 @@ public: // State changes can be coalesced, so it is not guaranteed that the caller will // get notification about each one of them. The state can even be the same after // the call as before, but term should be different. - virtual future<> wait_for_state_change(seastar::abort_source* as = nullptr) = 0; + // + // The caller may pass a pointer to an abort_source to make the function abortable. + // It it passes nullptr, the function is unabortable. + virtual future<> wait_for_state_change(seastar::abort_source* as) = 0; // Ad hoc functions for testing virtual void wait_until_candidate() = 0; diff --git a/service/broadcast_tables/experimental/lang.cc b/service/broadcast_tables/experimental/lang.cc index d1a1bd352c..5f957626a6 100644 --- a/service/broadcast_tables/experimental/lang.cc +++ b/service/broadcast_tables/experimental/lang.cc @@ -30,7 +30,7 @@ bool is_broadcast_table_statement(const sstring& keyspace, const sstring& column future execute(service::raft_group0_client& group0_client, const query& query) { auto group0_cmd = group0_client.prepare_command(broadcast_table_query{query}, "broadcast_tables query"); auto guard = group0_client.create_result_guard(group0_cmd.new_state_id); - co_await group0_client.add_entry_unguarded(std::move(group0_cmd)); + co_await group0_client.add_entry_unguarded(std::move(group0_cmd), nullptr); co_return guard.get(); } diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index a2d2ed8c1b..de02d39c50 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -160,7 +160,7 @@ void raft_group0::init_rpc_verbs(raft_group0& shard0_this) { ser::group0_rpc_verbs::register_group0_modify_config(&shard0_this._ms.local(), [&shard0_this] (const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, std::vector add, std::vector del) { return smp::submit_to(0, [&shard0_this, gid, add = std::move(add), del = std::move(del)]() mutable { - return shard0_this._raft_gr.get_server(gid).modify_config(std::move(add), std::move(del)); + return shard0_this._raft_gr.get_server(gid).modify_config(std::move(add), std::move(del), nullptr); }); }); diff --git a/service/raft/raft_group0_client.hh b/service/raft/raft_group0_client.hh index 51db4339a4..c1659e9478 100644 --- a/service/raft/raft_group0_client.hh +++ b/service/raft/raft_group0_client.hh @@ -104,9 +104,9 @@ public: // Call after `system_keyspace` is initialized. future<> init(); - future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as = nullptr); + future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as); - future<> add_entry_unguarded(group0_command group0_cmd, seastar::abort_source* as = nullptr); + future<> add_entry_unguarded(group0_command group0_cmd, seastar::abort_source* as); // Ensures that all previously finished operations on group 0 are visible on this node; // in particular, performs a Raft read barrier on group 0. @@ -128,7 +128,7 @@ public: // FIXME?: this is kind of annoying for the user. // we could forward the call to shard 0, have group0_guard keep a foreign_ptr to the internal data structures on shard 0, // and add_entry would again forward to shard 0. - future start_operation(seastar::abort_source* as = nullptr); + future start_operation(seastar::abort_source* as); template requires std::same_as || std::same_as diff --git a/test/raft/raft_server_test.cc b/test/raft/raft_server_test.cc index 1413971eef..06d1eecbf4 100644 --- a/test/raft/raft_server_test.cc +++ b/test/raft/raft_server_test.cc @@ -26,9 +26,9 @@ SEASTAR_THREAD_TEST_CASE(test_check_abort_on_client_api) { return sstring(e.what()) == sstring("Raft instance is stopped, reason: \"test crash\""); }; BOOST_CHECK_EXCEPTION(cluster.add_entries(1, 0).get0(), raft::stopped_error, check_error); - BOOST_CHECK_EXCEPTION(cluster.get_server(0).modify_config({}, {to_raft_id(0)}).get0(), raft::stopped_error, check_error); - BOOST_CHECK_EXCEPTION(cluster.get_server(0).read_barrier().get0(), raft::stopped_error, check_error); - BOOST_CHECK_EXCEPTION(cluster.get_server(0).set_configuration({}).get0(), raft::stopped_error, check_error); + BOOST_CHECK_EXCEPTION(cluster.get_server(0).modify_config({}, {to_raft_id(0)}, nullptr).get0(), raft::stopped_error, check_error); + BOOST_CHECK_EXCEPTION(cluster.get_server(0).read_barrier(nullptr).get0(), raft::stopped_error, check_error); + BOOST_CHECK_EXCEPTION(cluster.get_server(0).set_configuration({}, nullptr).get0(), raft::stopped_error, check_error); } SEASTAR_THREAD_TEST_CASE(test_release_memory_if_add_entry_throws) { diff --git a/test/raft/replication.hh b/test/raft/replication.hh index 4706909756..d8de9109f2 100644 --- a/test/raft/replication.hh +++ b/test/raft/replication.hh @@ -951,7 +951,7 @@ future<> raft_cluster::add_entry(size_t val, std::optional server while (true) { try { auto& at = _servers[server ? *server : _leader].server; - co_await at->add_entry(create_command(val), raft::wait_type::committed); + co_await at->add_entry(create_command(val), raft::wait_type::committed, nullptr); break; } catch (raft::commit_status_unknown& e) { // FIXME: in some cases when we get `commit_status_unknown` the entry may have been applied. @@ -1201,7 +1201,7 @@ future<> raft_cluster::change_configuration(set_config sc) { } tlogger.debug("Changing configuration on leader {}", _leader); - co_await _servers[_leader].server->set_configuration(std::move(set)); + co_await _servers[_leader].server->set_configuration(std::move(set), nullptr); if (!new_config.contains(_leader)) { co_await free_election(); @@ -1211,7 +1211,7 @@ future<> raft_cluster::change_configuration(set_config sc) { // Add a dummy entry to confirm new configuration was committed try { co_await _servers[_leader].server->add_entry(create_command(dummy_command), - raft::wait_type::committed); + raft::wait_type::committed, nullptr); } catch (raft::not_a_leader& e) { // leader stepped down, implying config fully changed } catch (raft::commit_status_unknown& e) {} @@ -1338,7 +1338,7 @@ future<> raft_cluster::tick(::tick t) { template future<> raft_cluster::read(read_value r) { - co_await _servers[r.node_idx].server->read_barrier(); + co_await _servers[r.node_idx].server->read_barrier(nullptr); auto val = _servers[r.node_idx].sm->hasher->finalize_uint64(); auto expected = hasher_int::hash_range(r.expected_index).finalize_uint64(); BOOST_CHECK_MESSAGE(val == expected,