From f2e458dcb27fc9653b2ed380393bcab3e5638c93 Mon Sep 17 00:00:00 2001 From: Marcin Maliszkiewicz Date: Wed, 22 Apr 2026 14:43:31 +0200 Subject: [PATCH] Merge 'storage_service: fix REST API races during shutdown and cross-shard forwarding' from Piotr Smaron MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit REST route removal unregisters handlers but does not wait for requests that already entered storage_service. A request can therefore suspend inside an async operation, restart proceeds to tear the service down, and the coroutine later resumes against destroyed members such as _topology_state_machine, _group0, or _sys_ks — a use-after-destruction bug that surfaces as UBSAN dynamic-type failures (e.g. the crash seen from topology_state_load()). Fix this by holding storage_service::_async_gate from the entry boundary of every externally-triggered async operation so that stop() drains them before teardown begins. The gate is acquired in run_with_api_lock, run_with_no_api_lock, and in individual REST handlers that bypass those wrappers (reload_raft_topology_state, mark_excluded, removenode, schema reload, topology-request waits/abort, cleanup, ring/schema queries, SSTable dictionary training/publish, and sampling). Additionally, fix get_ownership() and abort_topology_request(), which forward work to shard 0 but were still referencing the caller-shard's `this` pointer instead of the destination-shard instance, causing silent cross-shard access to shard-local state. Add a cluster regression test that repeatedly exercises the multi-shard ownership REST path to cover the forwarding fix. Fixes: SCYLLADB-1687 Should be backported to all branches; the code was introduced around the 2024.1 release. 
Closes scylladb/scylladb#29373 * github.com:scylladb/scylladb: storage_service: fix shard-0 forwarding in REST helpers storage_service: gate REST-facing async operations during shutdown storage_service: prepare for async gate in REST handlers (cherry picked from commit 4043d95810e0d47d9eebfcfe2e45c3cd22d9e54f) Closes scylladb/scylladb#29611 Closes scylladb/scylladb#29668 --- api/storage_service.cc | 166 ++++++++++++++++++++----------------- service/storage_service.cc | 4 +- service/storage_service.hh | 18 +++- 3 files changed, 108 insertions(+), 80 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index 2205c4a0cb..b18386cdc1 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -1739,82 +1739,98 @@ rest_bind(FuncType func, BindArgs&... args) { return std::bind_front(func, std::ref(args)...); } +// Hold the storage_service async gate for the duration of async REST +// handlers so stop() drains in-flight requests before teardown. +// Synchronous handlers don't yield and need no gate. 
+static seastar::httpd::future_json_function +gated(sharded& ss, seastar::httpd::future_json_function fn) { + return [fn = std::move(fn), &ss](std::unique_ptr req) -> future { + auto holder = ss.local().hold_async_gate(); + co_return co_await fn(std::move(req)); + }; +} + +static seastar::httpd::json_request_function +gated(sharded&, seastar::httpd::json_request_function fn) { + return fn; +} + void set_storage_service(http_context& ctx, routes& r, sharded& ss, service::raft_group0_client& group0_client) { - ss::get_token_endpoint.set(r, rest_bind(rest_get_token_endpoint, ctx, ss)); - ss::toppartitions_generic.set(r, rest_bind(rest_toppartitions_generic, ctx)); - ss::get_release_version.set(r, rest_bind(rest_get_release_version, ss)); - ss::get_scylla_release_version.set(r, rest_bind(rest_get_scylla_release_version, ss)); - ss::get_schema_version.set(r, rest_bind(rest_get_schema_version, ss)); - ss::get_range_to_endpoint_map.set(r, rest_bind(rest_get_range_to_endpoint_map, ctx, ss)); - ss::get_pending_range_to_endpoint_map.set(r, rest_bind(rest_get_pending_range_to_endpoint_map, ctx)); - ss::describe_ring.set(r, rest_bind(rest_describe_ring, ctx, ss)); - ss::get_current_generation_number.set(r, rest_bind(rest_get_current_generation_number, ss)); - ss::get_natural_endpoints.set(r, rest_bind(rest_get_natural_endpoints, ctx, ss)); - ss::cdc_streams_check_and_repair.set(r, rest_bind(rest_cdc_streams_check_and_repair, ss)); - ss::cleanup_all.set(r, rest_bind(rest_cleanup_all, ctx, ss)); - ss::reset_cleanup_needed.set(r, rest_bind(rest_reset_cleanup_needed, ctx, ss)); - ss::force_flush.set(r, rest_bind(rest_force_flush, ctx)); - ss::force_keyspace_flush.set(r, rest_bind(rest_force_keyspace_flush, ctx)); - ss::decommission.set(r, rest_bind(rest_decommission, ss)); - ss::move.set(r, rest_bind(rest_move, ss)); - ss::remove_node.set(r, rest_bind(rest_remove_node, ss)); - ss::get_removal_status.set(r, rest_bind(rest_get_removal_status, ss)); - 
ss::force_remove_completion.set(r, rest_bind(rest_force_remove_completion, ss)); - ss::set_logging_level.set(r, rest_bind(rest_set_logging_level)); - ss::get_logging_levels.set(r, rest_bind(rest_get_logging_levels)); - ss::get_operation_mode.set(r, rest_bind(rest_get_operation_mode, ss)); - ss::is_starting.set(r, rest_bind(rest_is_starting, ss)); - ss::get_drain_progress.set(r, rest_bind(rest_get_drain_progress, ss)); - ss::drain.set(r, rest_bind(rest_drain, ss)); - ss::stop_gossiping.set(r, rest_bind(rest_stop_gossiping, ss)); - ss::start_gossiping.set(r, rest_bind(rest_start_gossiping, ss)); - ss::is_gossip_running.set(r, rest_bind(rest_is_gossip_running, ss)); - ss::stop_daemon.set(r, rest_bind(rest_stop_daemon)); - ss::is_initialized.set(r, rest_bind(rest_is_initialized, ss)); - ss::join_ring.set(r, rest_bind(rest_join_ring)); - ss::is_joined.set(r, rest_bind(rest_is_joined, ss)); - ss::is_incremental_backups_enabled.set(r, rest_bind(rest_is_incremental_backups_enabled, ctx)); - ss::set_incremental_backups_enabled.set(r, rest_bind(rest_set_incremental_backups_enabled, ctx)); - ss::rebuild.set(r, rest_bind(rest_rebuild, ss)); - ss::bulk_load.set(r, rest_bind(rest_bulk_load)); - ss::bulk_load_async.set(r, rest_bind(rest_bulk_load_async)); - ss::reschedule_failed_deletions.set(r, rest_bind(rest_reschedule_failed_deletions)); - ss::sample_key_range.set(r, rest_bind(rest_sample_key_range)); - ss::reset_local_schema.set(r, rest_bind(rest_reset_local_schema, ss)); - ss::set_trace_probability.set(r, rest_bind(rest_set_trace_probability)); - ss::get_trace_probability.set(r, rest_bind(rest_get_trace_probability)); - ss::get_slow_query_info.set(r, rest_bind(rest_get_slow_query_info)); - ss::set_slow_query.set(r, rest_bind(rest_set_slow_query)); - ss::deliver_hints.set(r, rest_bind(rest_deliver_hints)); - ss::get_cluster_name.set(r, rest_bind(rest_get_cluster_name, ss)); - ss::get_partitioner_name.set(r, rest_bind(rest_get_partitioner_name, ss)); - 
ss::get_tombstone_warn_threshold.set(r, rest_bind(rest_get_tombstone_warn_threshold)); - ss::set_tombstone_warn_threshold.set(r, rest_bind(rest_set_tombstone_warn_threshold)); - ss::get_tombstone_failure_threshold.set(r, rest_bind(rest_get_tombstone_failure_threshold)); - ss::set_tombstone_failure_threshold.set(r, rest_bind(rest_set_tombstone_failure_threshold)); - ss::get_batch_size_failure_threshold.set(r, rest_bind(rest_get_batch_size_failure_threshold)); - ss::set_batch_size_failure_threshold.set(r, rest_bind(rest_set_batch_size_failure_threshold)); - ss::set_hinted_handoff_throttle_in_kb.set(r, rest_bind(rest_set_hinted_handoff_throttle_in_kb)); - ss::get_exceptions.set(r, rest_bind(rest_get_exceptions, ss)); - ss::get_total_hints_in_progress.set(r, rest_bind(rest_get_total_hints_in_progress)); - ss::get_total_hints.set(r, rest_bind(rest_get_total_hints)); - ss::get_ownership.set(r, rest_bind(rest_get_ownership, ctx, ss)); - ss::get_effective_ownership.set(r, rest_bind(rest_get_effective_ownership, ctx, ss)); - ss::retrain_dict.set(r, rest_bind(rest_retrain_dict, ctx, ss, group0_client)); - ss::estimate_compression_ratios.set(r, rest_bind(rest_estimate_compression_ratios, ctx, ss)); - ss::sstable_info.set(r, rest_bind(rest_sstable_info, ctx)); - ss::reload_raft_topology_state.set(r, rest_bind(rest_reload_raft_topology_state, ss, group0_client)); - ss::upgrade_to_raft_topology.set(r, rest_bind(rest_upgrade_to_raft_topology, ss)); - ss::raft_topology_upgrade_status.set(r, rest_bind(rest_raft_topology_upgrade_status, ss)); - ss::raft_topology_get_cmd_status.set(r, rest_bind(rest_raft_topology_get_cmd_status, ss)); - ss::move_tablet.set(r, rest_bind(rest_move_tablet, ctx, ss)); - ss::add_tablet_replica.set(r, rest_bind(rest_add_tablet_replica, ctx, ss)); - ss::del_tablet_replica.set(r, rest_bind(rest_del_tablet_replica, ctx, ss)); - ss::repair_tablet.set(r, rest_bind(rest_repair_tablet, ctx, ss)); - ss::tablet_balancing_enable.set(r, 
rest_bind(rest_tablet_balancing_enable, ss)); - ss::quiesce_topology.set(r, rest_bind(rest_quiesce_topology, ss)); - sp::get_schema_versions.set(r, rest_bind(rest_get_schema_versions, ss)); - ss::drop_quarantined_sstables.set(r, rest_bind(rest_drop_quarantined_sstables, ctx, ss)); + ss::get_token_endpoint.set(r, gated(ss, rest_bind(rest_get_token_endpoint, ctx, ss))); + ss::toppartitions_generic.set(r, gated(ss, rest_bind(rest_toppartitions_generic, ctx))); + ss::get_release_version.set(r, gated(ss, rest_bind(rest_get_release_version, ss))); + ss::get_scylla_release_version.set(r, gated(ss, rest_bind(rest_get_scylla_release_version, ss))); + ss::get_schema_version.set(r, gated(ss, rest_bind(rest_get_schema_version, ss))); + ss::get_range_to_endpoint_map.set(r, gated(ss, rest_bind(rest_get_range_to_endpoint_map, ctx, ss))); + ss::get_pending_range_to_endpoint_map.set(r, gated(ss, rest_bind(rest_get_pending_range_to_endpoint_map, ctx))); + ss::describe_ring.set(r, gated(ss, rest_bind(rest_describe_ring, ctx, ss))); + ss::get_current_generation_number.set(r, gated(ss, rest_bind(rest_get_current_generation_number, ss))); + ss::get_natural_endpoints.set(r, gated(ss, rest_bind(rest_get_natural_endpoints, ctx, ss))); + ss::cdc_streams_check_and_repair.set(r, gated(ss, rest_bind(rest_cdc_streams_check_and_repair, ss))); + ss::cleanup_all.set(r, gated(ss, rest_bind(rest_cleanup_all, ctx, ss))); + ss::reset_cleanup_needed.set(r, gated(ss, rest_bind(rest_reset_cleanup_needed, ctx, ss))); + ss::force_flush.set(r, gated(ss, rest_bind(rest_force_flush, ctx))); + ss::force_keyspace_flush.set(r, gated(ss, rest_bind(rest_force_keyspace_flush, ctx))); + ss::decommission.set(r, gated(ss, rest_bind(rest_decommission, ss))); + ss::move.set(r, gated(ss, rest_bind(rest_move, ss))); + ss::remove_node.set(r, gated(ss, rest_bind(rest_remove_node, ss))); + ss::get_removal_status.set(r, gated(ss, rest_bind(rest_get_removal_status, ss))); + ss::force_remove_completion.set(r, gated(ss, 
rest_bind(rest_force_remove_completion, ss))); + ss::set_logging_level.set(r, gated(ss, rest_bind(rest_set_logging_level))); + ss::get_logging_levels.set(r, gated(ss, rest_bind(rest_get_logging_levels))); + ss::get_operation_mode.set(r, gated(ss, rest_bind(rest_get_operation_mode, ss))); + ss::is_starting.set(r, gated(ss, rest_bind(rest_is_starting, ss))); + ss::get_drain_progress.set(r, gated(ss, rest_bind(rest_get_drain_progress, ss))); + ss::drain.set(r, gated(ss, rest_bind(rest_drain, ss))); + ss::stop_gossiping.set(r, gated(ss, rest_bind(rest_stop_gossiping, ss))); + ss::start_gossiping.set(r, gated(ss, rest_bind(rest_start_gossiping, ss))); + ss::is_gossip_running.set(r, gated(ss, rest_bind(rest_is_gossip_running, ss))); + ss::stop_daemon.set(r, gated(ss, rest_bind(rest_stop_daemon))); + ss::is_initialized.set(r, gated(ss, rest_bind(rest_is_initialized, ss))); + ss::join_ring.set(r, gated(ss, rest_bind(rest_join_ring))); + ss::is_joined.set(r, gated(ss, rest_bind(rest_is_joined, ss))); + ss::is_incremental_backups_enabled.set(r, gated(ss, rest_bind(rest_is_incremental_backups_enabled, ctx))); + ss::set_incremental_backups_enabled.set(r, gated(ss, rest_bind(rest_set_incremental_backups_enabled, ctx))); + ss::rebuild.set(r, gated(ss, rest_bind(rest_rebuild, ss))); + ss::bulk_load.set(r, gated(ss, rest_bind(rest_bulk_load))); + ss::bulk_load_async.set(r, gated(ss, rest_bind(rest_bulk_load_async))); + ss::reschedule_failed_deletions.set(r, gated(ss, rest_bind(rest_reschedule_failed_deletions))); + ss::sample_key_range.set(r, gated(ss, rest_bind(rest_sample_key_range))); + ss::reset_local_schema.set(r, gated(ss, rest_bind(rest_reset_local_schema, ss))); + ss::set_trace_probability.set(r, gated(ss, rest_bind(rest_set_trace_probability))); + ss::get_trace_probability.set(r, gated(ss, rest_bind(rest_get_trace_probability))); + ss::get_slow_query_info.set(r, gated(ss, rest_bind(rest_get_slow_query_info))); + ss::set_slow_query.set(r, gated(ss, 
rest_bind(rest_set_slow_query))); + ss::deliver_hints.set(r, gated(ss, rest_bind(rest_deliver_hints))); + ss::get_cluster_name.set(r, gated(ss, rest_bind(rest_get_cluster_name, ss))); + ss::get_partitioner_name.set(r, gated(ss, rest_bind(rest_get_partitioner_name, ss))); + ss::get_tombstone_warn_threshold.set(r, gated(ss, rest_bind(rest_get_tombstone_warn_threshold))); + ss::set_tombstone_warn_threshold.set(r, gated(ss, rest_bind(rest_set_tombstone_warn_threshold))); + ss::get_tombstone_failure_threshold.set(r, gated(ss, rest_bind(rest_get_tombstone_failure_threshold))); + ss::set_tombstone_failure_threshold.set(r, gated(ss, rest_bind(rest_set_tombstone_failure_threshold))); + ss::get_batch_size_failure_threshold.set(r, gated(ss, rest_bind(rest_get_batch_size_failure_threshold))); + ss::set_batch_size_failure_threshold.set(r, gated(ss, rest_bind(rest_set_batch_size_failure_threshold))); + ss::set_hinted_handoff_throttle_in_kb.set(r, gated(ss, rest_bind(rest_set_hinted_handoff_throttle_in_kb))); + ss::get_exceptions.set(r, gated(ss, rest_bind(rest_get_exceptions, ss))); + ss::get_total_hints_in_progress.set(r, gated(ss, rest_bind(rest_get_total_hints_in_progress))); + ss::get_total_hints.set(r, gated(ss, rest_bind(rest_get_total_hints))); + ss::get_ownership.set(r, gated(ss, rest_bind(rest_get_ownership, ctx, ss))); + ss::get_effective_ownership.set(r, gated(ss, rest_bind(rest_get_effective_ownership, ctx, ss))); + ss::retrain_dict.set(r, gated(ss, rest_bind(rest_retrain_dict, ctx, ss, group0_client))); + ss::estimate_compression_ratios.set(r, gated(ss, rest_bind(rest_estimate_compression_ratios, ctx, ss))); + ss::sstable_info.set(r, gated(ss, rest_bind(rest_sstable_info, ctx))); + ss::reload_raft_topology_state.set(r, gated(ss, rest_bind(rest_reload_raft_topology_state, ss, group0_client))); + ss::upgrade_to_raft_topology.set(r, gated(ss, rest_bind(rest_upgrade_to_raft_topology, ss))); + ss::raft_topology_upgrade_status.set(r, gated(ss, 
rest_bind(rest_raft_topology_upgrade_status, ss))); + ss::raft_topology_get_cmd_status.set(r, gated(ss, rest_bind(rest_raft_topology_get_cmd_status, ss))); + ss::move_tablet.set(r, gated(ss, rest_bind(rest_move_tablet, ctx, ss))); + ss::add_tablet_replica.set(r, gated(ss, rest_bind(rest_add_tablet_replica, ctx, ss))); + ss::del_tablet_replica.set(r, gated(ss, rest_bind(rest_del_tablet_replica, ctx, ss))); + ss::repair_tablet.set(r, gated(ss, rest_bind(rest_repair_tablet, ctx, ss))); + ss::tablet_balancing_enable.set(r, gated(ss, rest_bind(rest_tablet_balancing_enable, ss))); + ss::quiesce_topology.set(r, gated(ss, rest_bind(rest_quiesce_topology, ss))); + sp::get_schema_versions.set(r, gated(ss, rest_bind(rest_get_schema_versions, ss))); + ss::drop_quarantined_sstables.set(r, gated(ss, rest_bind(rest_drop_quarantined_sstables, ctx, ss))); } void unset_storage_service(http_context& ctx, routes& r) { diff --git a/service/storage_service.cc b/service/storage_service.cc index 62340829b6..0924592e77 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3638,7 +3638,7 @@ storage_service::prepare_replacement_info(std::unordered_set } future> storage_service::get_ownership() { - return run_with_no_api_lock([this] (storage_service& ss) { + return run_with_no_api_lock([] (storage_service& ss) { const auto& tm = ss.get_token_metadata(); auto token_map = dht::token::describe_ownership(tm.sorted_tokens()); // describeOwnership returns tokens in an unspecified order, let's re-order them @@ -3646,7 +3646,7 @@ future> storage_service::get_ownership() { for (auto entry : token_map) { locator::host_id id = tm.get_endpoint(entry.first).value(); auto token_ownership = entry.second; - ownership[_address_map.get(id)] += token_ownership; + ownership[ss._address_map.get(id)] += token_ownership; } return ownership; }); diff --git a/service/storage_service.hh b/service/storage_service.hh index 193c7c9db5..8322a2a213 100644 --- a/service/storage_service.hh +++ 
b/service/storage_service.hh @@ -781,13 +781,19 @@ private: */ future<> stream_ranges(std::unordered_map> ranges_to_stream_by_keyspace); + // REST handlers are gated at the registration site (see gated() in + // api/storage_service.cc) so stop() drains in-flight requests before + // teardown. run_with_api_lock_internal and run_with_no_api_lock hold + // _async_gate on shard 0 as well, because REST requests arriving on + // any shard are forwarded there for execution. template auto run_with_api_lock_internal(storage_service& ss, Func&& func, sstring& operation) { + auto holder = ss._async_gate.hold(); if (!ss._operation_in_progress.empty()) { throw std::runtime_error(format("Operation {} is in progress, try again", ss._operation_in_progress)); } ss._operation_in_progress = std::move(operation); - return func(ss).finally([&ss] { + return func(ss).finally([&ss, holder = std::move(holder)] { ss._operation_in_progress = sstring(); }); } @@ -795,6 +801,10 @@ private: public: int32_t get_exception_count(); + auto hold_async_gate() { + return _async_gate.hold(); + } + template auto run_with_api_lock(sstring operation, Func&& func) { return container().invoke_on(0, [operation = std::move(operation), @@ -805,8 +815,10 @@ public: template auto run_with_no_api_lock(Func&& func) { - return container().invoke_on(0, [func = std::forward(func)] (storage_service& ss) mutable { - return func(ss); + return container().invoke_on(0, [func = std::forward(func)] (storage_service& ss) mutable + -> futurize_t> { + auto holder = ss._async_gate.hold(); + co_return co_await futurize_invoke(func, ss); }); }