From 90ced080a8a6a433c0d6ec3de6f2fbad8a59e4cc Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Sun, 29 Sep 2024 16:36:59 +0300 Subject: [PATCH] group: hold group0 shutdown gate during async operations Wait for all outstanding async work that uses group0 to complete before destroying group0 server. Fixes scylladb/scylladb#20701 (cherry picked from commit e642f0a86d51e045cada125a21800ea526b5336b) --- service/raft/raft_group0.cc | 5 ++++- service/raft/raft_group0.hh | 5 +++++ service/storage_service.cc | 33 ++++++++++++++++++++------------- service/storage_service.hh | 4 ++-- service/topology_coordinator.cc | 3 +++ 5 files changed, 34 insertions(+), 16 deletions(-) diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index c188a756f7..bd2e5f6649 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -393,9 +393,11 @@ future<> raft_group0::abort() { co_await smp::invoke_on_all([this]() { return uninit_rpc_verbs(_ms.local()); }); - co_await _shutdown_gate.close(); _leadership_monitor_as.request_abort(); + + co_await _shutdown_gate.close(); + co_await std::move(_leadership_monitor); co_await stop_group0(); @@ -440,6 +442,7 @@ future<> raft_group0::leadership_monitor_fiber() { } }); + auto holder = hold_group0_gate(); while (true) { while (!group0_server().is_leader()) { co_await group0_server().wait_for_state_change(&_leadership_monitor_as); diff --git a/service/raft/raft_group0.hh b/service/raft/raft_group0.hh index 5e5de70484..38087e8836 100644 --- a/service/raft/raft_group0.hh +++ b/service/raft/raft_group0.hh @@ -291,6 +291,11 @@ public: return _raft_gr.group0_with_timeouts(); } + // Hold shutdown gate to be waited during shutdown + gate::holder hold_group0_gate() { + return _shutdown_gate.hold(); + } + // Returns true after the group 0 server has been started. bool joined_group0() const; diff --git a/service/storage_service.cc b/service/storage_service.cc index 5182e75ce5..dc4767727e 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1032,7 +1032,7 @@ public: // }}} raft_ip_address_updater -future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded& proxy) noexcept { +future<> storage_service::sstable_cleanup_fiber(raft::server& server, gate::holder group0_holder, sharded& proxy) noexcept { while (!_group0_as.abort_requested()) { bool err = false; try { @@ -1134,7 +1134,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded storage_service::raft_state_monitor_fiber(raft::server& raft, sharded& sys_dist_ks) { +future<> storage_service::raft_state_monitor_fiber(raft::server& raft, gate::holder group0_holder, sharded& sys_dist_ks) { std::optional as; try { @@ -1867,9 +1867,9 @@ future<> storage_service::join_topology(sharded co_await raft_initialize_discovery_leader(join_params); // start topology coordinator fiber - _raft_state_monitor = raft_state_monitor_fiber(*raft_server, sys_dist_ks); + _raft_state_monitor = raft_state_monitor_fiber(*raft_server, _group0->hold_group0_gate(), sys_dist_ks); // start cleanup fiber - _sstable_cleanup_fiber = sstable_cleanup_fiber(*raft_server, proxy); + _sstable_cleanup_fiber = sstable_cleanup_fiber(*raft_server, _group0->hold_group0_gate(), proxy); // Need to start system_distributed_keyspace before bootstrap because bootstrapping // process may access those tables. @@ -2150,7 +2150,7 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded // Start the topology coordinator monitor fiber. If we are the leader, this will start // the topology coordinator which is responsible for driving the upgrade process. try { - _raft_state_monitor = raft_state_monitor_fiber(_group0->group0_server(), sys_dist_ks); + _raft_state_monitor = raft_state_monitor_fiber(_group0->group0_server(), _group0->hold_group0_gate(), sys_dist_ks); } catch (...) { // The calls above can theoretically fail due to coroutine frame allocation failure. // Abort in this case as the node should be in a pretty bad shape anyway. @@ -2176,7 +2176,7 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded } try { - _sstable_cleanup_fiber = sstable_cleanup_fiber(_group0->group0_server(), proxy); + _sstable_cleanup_fiber = sstable_cleanup_fiber(_group0->group0_server(), _group0->hold_group0_gate(), proxy); start_tablet_split_monitor(); } catch (...) { rtlogger.error("failed to start one of the raft-related background fibers: {}", std::current_exception()); @@ -3649,6 +3649,7 @@ static size_t count_normal_token_owners(const topology& topology) { future<> storage_service::raft_decommission() { auto& raft_server = _group0->group0_server(); + auto holder = _group0->hold_group0_gate(); utils::UUID request_id; while (true) { @@ -4650,6 +4651,7 @@ future<> storage_service::do_drain() { future<> storage_service::do_cluster_cleanup() { auto& raft_server = _group0->group0_server(); + auto holder = _group0->hold_group0_gate(); while (true) { auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{}); @@ -4720,6 +4722,7 @@ future<> storage_service::wait_for_topology_not_busy() { future<> storage_service::raft_rebuild(utils::optional_param sdc_param) { auto& raft_server = _group0->group0_server(); + auto holder = _group0->hold_group0_gate(); utils::UUID request_id; while (true) { @@ -5474,12 +5477,15 @@ future storage_service::raft_topology_cmd_handler(raft try { auto& raft_server = _group0->group0_server(); + auto group0_holder = _group0->hold_group0_gate(); // do barrier to make sure we always see the latest topology co_await raft_server.read_barrier(&_group0_as); if (raft_server.get_current_term() != term) { // Return an error since the command is from outdated leader co_return result; } + auto id = raft_server.id(); + group0_holder.release(); { auto& state = _raft_topology_cmd_handler_state; @@ -5591,7 +5597,7 @@ future storage_service::raft_topology_cmd_handler(raft break; case raft_topology_cmd::command::stream_ranges: { co_await with_scheduling_group(_db.local().get_streaming_scheduling_group(), coroutine::lambda([&] () -> future<> { - const auto& rs = _topology_state_machine._topology.find(raft_server.id())->second; + const auto& rs = _topology_state_machine._topology.find(id)->second; auto tstate = _topology_state_machine._topology.tstate; if (!rs.ring || rs.ring->tokens.empty()) { rtlogger.warn("got {} request but the node does not own any tokens and is in the {} state", cmd.cmd, rs.state); @@ -5640,11 +5646,11 @@ future storage_service::raft_topology_cmd_handler(raft utils::get_local_injector().inject("stop_after_streaming", [] { std::raise(SIGSTOP); }); } else { - auto replaced_id = std::get(_topology_state_machine._topology.req_param[raft_server.id()]).replaced_id; + auto replaced_id = std::get(_topology_state_machine._topology.req_param[id]).replaced_id; auto task = co_await get_task_manager_module().make_and_start_task(parent_info, - parent_info.id, streaming::stream_reason::replace, _bootstrap_result, coroutine::lambda([this, &rs, &raft_server, replaced_id] () -> future<> { - if (!_topology_state_machine._topology.req_param.contains(raft_server.id())) { - on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", raft_server.id())); + parent_info.id, streaming::stream_reason::replace, _bootstrap_result, coroutine::lambda([this, &rs, &id, replaced_id] () -> future<> { + if (!_topology_state_machine._topology.req_param.contains(id)) { + on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", id)); } if (is_repair_based_node_ops_enabled(streaming::stream_reason::replace)) { auto ignored_nodes = boost::copy_range>(_topology_state_machine._topology.ignored_nodes | boost::adaptors::transformed([] (const auto& id) { @@ -5728,7 +5734,7 @@ future storage_service::raft_topology_cmd_handler(raft } break; case node_state::rebuilding: { - auto source_dc = std::get(_topology_state_machine._topology.req_param[raft_server.id()]).source_dc; + auto source_dc = std::get(_topology_state_machine._topology.req_param[id]).source_dc; rtlogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0}; auto task = co_await get_task_manager_module().make_and_start_task(parent_info, @@ -5775,7 +5781,7 @@ future storage_service::raft_topology_cmd_handler(raft case node_state::none: case node_state::removing: on_fatal_internal_error(rtlogger, ::format("Node {} got streaming request in state {}. It should be either dead or not part of the cluster", - raft_server.id(), rs.state)); + id, rs.state)); break; } })); @@ -6491,6 +6497,7 @@ future storage_service::join_node_request_handler(join }); auto& g0_server = _group0->group0_server(); + auto g0_holder = _group0->hold_group0_gate(); if (params.replaced_id && *params.replaced_id == g0_server.current_leader()) { // There is a peculiar case that can happen if the leader is killed // and then replaced very quickly: diff --git a/service/storage_service.hh b/service/storage_service.hh index 2e237a3e1b..53f352b997 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -835,7 +835,7 @@ private: future<> _raft_state_monitor = make_ready_future<>(); // This fibers monitors raft state and start/stops the topology change // coordinator fiber - future<> raft_state_monitor_fiber(raft::server&, sharded& sys_dist_ks); + future<> raft_state_monitor_fiber(raft::server&, gate::holder, sharded& sys_dist_ks); public: bool topology_global_queue_empty() const { @@ -976,7 +976,7 @@ private: semaphore _join_node_response_handler_mutex{1}; future<> _sstable_cleanup_fiber = make_ready_future<>(); - future<> sstable_cleanup_fiber(raft::server& raft, sharded& proxy) noexcept; + future<> sstable_cleanup_fiber(raft::server& raft, gate::holder, sharded& proxy) noexcept; // We need to be able to abort all group0 operation during shutdown, so we need special abort source for that abort_source _group0_as; diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 2549979b7d..4bc809fd46 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -109,6 +109,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { std::chrono::milliseconds _ring_delay; + gate::holder _group0_holder; + using drop_guard_and_retake = bool_class; // Engaged if an ongoing topology change should be rolled back. The string inside @@ -2501,6 +2503,7 @@ public: , _raft_topology_cmd_handler(std::move(raft_topology_cmd_handler)) , _tablet_allocator(tablet_allocator) , _ring_delay(ring_delay) + , _group0_holder(_group0.hold_group0_gate()) {} // Returns true if the upgrade was done, returns false if upgrade was interrupted.