diff --git a/api/storage_service.cc b/api/storage_service.cc index e14e06996e..5488657397 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -856,7 +856,9 @@ rest_exclude_node(sharded& ss, std::unique_ptrget_query_param("intended_mode"); auto mode = service::intended_storage_mode_from_string(mode_str); - co_await ss.local().set_node_intended_storage_mode(mode); + co_await ss.local().run_with_no_api_lock([mode] (service::storage_service& ss) { + return ss.set_node_intended_storage_mode(mode); + }); co_return json_void(); } @@ -1782,7 +1790,9 @@ rest_finalize_vnode_tablet_migration(http_context& ctx, sharded& ss, seastar::httpd::future_json_function fn) { + return [fn = std::move(fn), &ss](std::unique_ptr req) -> future { + auto holder = ss.local().hold_async_gate(); + co_return co_await fn(std::move(req)); + }; +} + +static seastar::httpd::json_request_function +gated(sharded&, seastar::httpd::json_request_function fn) { + return fn; +} + void set_storage_service(http_context& ctx, routes& r, sharded& ss, sharded& ssc, service::raft_group0_client& group0_client) { - ss::get_token_endpoint.set(r, rest_bind(rest_get_token_endpoint, ctx, ss)); - ss::get_release_version.set(r, rest_bind(rest_get_release_version, ss)); - ss::get_scylla_release_version.set(r, rest_bind(rest_get_scylla_release_version, ss)); - ss::get_schema_version.set(r, rest_bind(rest_get_schema_version, ss)); - ss::get_range_to_endpoint_map.set(r, rest_bind(rest_get_range_to_endpoint_map, ctx, ss)); - ss::get_pending_range_to_endpoint_map.set(r, rest_bind(rest_get_pending_range_to_endpoint_map, ctx)); - ss::describe_ring.set(r, rest_bind(rest_describe_ring, ctx, ss)); - ss::get_current_generation_number.set(r, rest_bind(rest_get_current_generation_number, ss)); - ss::get_natural_endpoints.set(r, rest_bind(rest_get_natural_endpoints, ctx, ss)); - ss::get_natural_endpoints_v2.set(r, rest_bind(rest_get_natural_endpoints_v2, ctx, ss)); - ss::cdc_streams_check_and_repair.set(r, rest_bind(rest_cdc_streams_check_and_repair, ss)); - ss::cleanup_all.set(r, rest_bind(rest_cleanup_all, ctx, ss)); - ss::reset_cleanup_needed.set(r, rest_bind(rest_reset_cleanup_needed, ctx, ss)); - ss::force_flush.set(r, rest_bind(rest_force_flush, ctx)); - ss::force_keyspace_flush.set(r, rest_bind(rest_force_keyspace_flush, ctx)); - ss::decommission.set(r, rest_bind(rest_decommission, ss, ssc)); - ss::logstor_compaction.set(r, rest_bind(rest_logstor_compaction, ctx)); - ss::logstor_flush.set(r, rest_bind(rest_logstor_flush, ctx)); - ss::move.set(r, rest_bind(rest_move, ss)); - ss::remove_node.set(r, rest_bind(rest_remove_node, ss)); - ss::exclude_node.set(r, rest_bind(rest_exclude_node, ss)); - ss::get_removal_status.set(r, rest_bind(rest_get_removal_status, ss)); - ss::force_remove_completion.set(r, rest_bind(rest_force_remove_completion, ss)); - ss::set_logging_level.set(r, rest_bind(rest_set_logging_level)); - ss::get_logging_levels.set(r, rest_bind(rest_get_logging_levels)); - ss::get_operation_mode.set(r, rest_bind(rest_get_operation_mode, ss)); - ss::is_starting.set(r, rest_bind(rest_is_starting, ss)); - ss::get_drain_progress.set(r, rest_bind(rest_get_drain_progress, ss)); - ss::drain.set(r, rest_bind(rest_drain, ss)); - ss::stop_gossiping.set(r, rest_bind(rest_stop_gossiping, ss)); - ss::start_gossiping.set(r, rest_bind(rest_start_gossiping, ss)); - ss::is_gossip_running.set(r, rest_bind(rest_is_gossip_running, ss)); - ss::stop_daemon.set(r, rest_bind(rest_stop_daemon)); - ss::is_initialized.set(r, rest_bind(rest_is_initialized, ss)); - ss::join_ring.set(r, rest_bind(rest_join_ring)); - ss::is_joined.set(r, rest_bind(rest_is_joined, ss)); - ss::is_incremental_backups_enabled.set(r, rest_bind(rest_is_incremental_backups_enabled, ctx)); - ss::set_incremental_backups_enabled.set(r, rest_bind(rest_set_incremental_backups_enabled, ctx)); - ss::rebuild.set(r, rest_bind(rest_rebuild, ss)); - ss::bulk_load.set(r, rest_bind(rest_bulk_load)); - ss::bulk_load_async.set(r, rest_bind(rest_bulk_load_async)); - ss::reschedule_failed_deletions.set(r, rest_bind(rest_reschedule_failed_deletions)); - ss::sample_key_range.set(r, rest_bind(rest_sample_key_range)); - ss::reset_local_schema.set(r, rest_bind(rest_reset_local_schema, ss)); - ss::set_trace_probability.set(r, rest_bind(rest_set_trace_probability)); - ss::get_trace_probability.set(r, rest_bind(rest_get_trace_probability)); - ss::get_slow_query_info.set(r, rest_bind(rest_get_slow_query_info)); - ss::set_slow_query.set(r, rest_bind(rest_set_slow_query)); - ss::deliver_hints.set(r, rest_bind(rest_deliver_hints)); - ss::get_cluster_name.set(r, rest_bind(rest_get_cluster_name, ss)); - ss::get_partitioner_name.set(r, rest_bind(rest_get_partitioner_name, ss)); - ss::get_tombstone_warn_threshold.set(r, rest_bind(rest_get_tombstone_warn_threshold)); - ss::set_tombstone_warn_threshold.set(r, rest_bind(rest_set_tombstone_warn_threshold)); - ss::get_tombstone_failure_threshold.set(r, rest_bind(rest_get_tombstone_failure_threshold)); - ss::set_tombstone_failure_threshold.set(r, rest_bind(rest_set_tombstone_failure_threshold)); - ss::get_batch_size_failure_threshold.set(r, rest_bind(rest_get_batch_size_failure_threshold)); - ss::set_batch_size_failure_threshold.set(r, rest_bind(rest_set_batch_size_failure_threshold)); - ss::set_hinted_handoff_throttle_in_kb.set(r, rest_bind(rest_set_hinted_handoff_throttle_in_kb)); - ss::get_exceptions.set(r, rest_bind(rest_get_exceptions, ss)); - ss::get_total_hints_in_progress.set(r, rest_bind(rest_get_total_hints_in_progress)); - ss::get_total_hints.set(r, rest_bind(rest_get_total_hints)); - ss::get_ownership.set(r, rest_bind(rest_get_ownership, ctx, ss)); - ss::get_effective_ownership.set(r, rest_bind(rest_get_effective_ownership, ctx, ss)); - ss::retrain_dict.set(r, rest_bind(rest_retrain_dict, ctx, ss, group0_client)); - ss::estimate_compression_ratios.set(r, rest_bind(rest_estimate_compression_ratios, ctx, ss)); - ss::sstable_info.set(r, rest_bind(rest_sstable_info, ctx)); - ss::logstor_info.set(r, rest_bind(rest_logstor_info, ctx)); - ss::reload_raft_topology_state.set(r, rest_bind(rest_reload_raft_topology_state, ss, group0_client)); - ss::upgrade_to_raft_topology.set(r, rest_bind(rest_upgrade_to_raft_topology, ss)); - ss::raft_topology_upgrade_status.set(r, rest_bind(rest_raft_topology_upgrade_status, ss)); - ss::raft_topology_get_cmd_status.set(r, rest_bind(rest_raft_topology_get_cmd_status, ss)); - ss::move_tablet.set(r, rest_bind(rest_move_tablet, ctx, ss)); - ss::add_tablet_replica.set(r, rest_bind(rest_add_tablet_replica, ctx, ss)); - ss::del_tablet_replica.set(r, rest_bind(rest_del_tablet_replica, ctx, ss)); - ss::repair_tablet.set(r, rest_bind(rest_repair_tablet, ctx, ss)); - ss::tablet_balancing_enable.set(r, rest_bind(rest_tablet_balancing_enable, ss)); - ss::create_vnode_tablet_migration.set(r, rest_bind(rest_create_vnode_tablet_migration, ctx, ss)); - ss::get_vnode_tablet_migration.set(r, rest_bind(rest_get_vnode_tablet_migration, ctx, ss)); - ss::set_vnode_tablet_migration_node_storage_mode.set(r, rest_bind(rest_set_vnode_tablet_migration_node_storage_mode, ctx, ss)); - ss::finalize_vnode_tablet_migration.set(r, rest_bind(rest_finalize_vnode_tablet_migration, ctx, ss)); - ss::quiesce_topology.set(r, rest_bind(rest_quiesce_topology, ss)); - sp::get_schema_versions.set(r, rest_bind(rest_get_schema_versions, ss)); - ss::drop_quarantined_sstables.set(r, rest_bind(rest_drop_quarantined_sstables, ctx, ss)); + ss::get_token_endpoint.set(r, gated(ss, rest_bind(rest_get_token_endpoint, ctx, ss))); + ss::get_release_version.set(r, gated(ss, rest_bind(rest_get_release_version, ss))); + ss::get_scylla_release_version.set(r, gated(ss, rest_bind(rest_get_scylla_release_version, ss))); + ss::get_schema_version.set(r, gated(ss, rest_bind(rest_get_schema_version, ss))); + ss::get_range_to_endpoint_map.set(r, gated(ss, rest_bind(rest_get_range_to_endpoint_map, ctx, ss))); + ss::get_pending_range_to_endpoint_map.set(r, gated(ss, rest_bind(rest_get_pending_range_to_endpoint_map, ctx))); + ss::describe_ring.set(r, gated(ss, rest_bind(rest_describe_ring, ctx, ss))); + ss::get_current_generation_number.set(r, gated(ss, rest_bind(rest_get_current_generation_number, ss))); + ss::get_natural_endpoints.set(r, gated(ss, rest_bind(rest_get_natural_endpoints, ctx, ss))); + ss::get_natural_endpoints_v2.set(r, gated(ss, rest_bind(rest_get_natural_endpoints_v2, ctx, ss))); + ss::cdc_streams_check_and_repair.set(r, gated(ss, rest_bind(rest_cdc_streams_check_and_repair, ss))); + ss::cleanup_all.set(r, gated(ss, rest_bind(rest_cleanup_all, ctx, ss))); + ss::reset_cleanup_needed.set(r, gated(ss, rest_bind(rest_reset_cleanup_needed, ctx, ss))); + ss::force_flush.set(r, gated(ss, rest_bind(rest_force_flush, ctx))); + ss::force_keyspace_flush.set(r, gated(ss, rest_bind(rest_force_keyspace_flush, ctx))); + ss::decommission.set(r, gated(ss, rest_bind(rest_decommission, ss, ssc))); + ss::logstor_compaction.set(r, gated(ss, rest_bind(rest_logstor_compaction, ctx))); + ss::logstor_flush.set(r, gated(ss, rest_bind(rest_logstor_flush, ctx))); + ss::move.set(r, gated(ss, rest_bind(rest_move, ss))); + ss::remove_node.set(r, gated(ss, rest_bind(rest_remove_node, ss))); + ss::exclude_node.set(r, gated(ss, rest_bind(rest_exclude_node, ss))); + ss::get_removal_status.set(r, gated(ss, rest_bind(rest_get_removal_status, ss))); + ss::force_remove_completion.set(r, gated(ss, rest_bind(rest_force_remove_completion, ss))); + ss::set_logging_level.set(r, gated(ss, rest_bind(rest_set_logging_level))); + ss::get_logging_levels.set(r, gated(ss, rest_bind(rest_get_logging_levels))); + ss::get_operation_mode.set(r, gated(ss, rest_bind(rest_get_operation_mode, ss))); + ss::is_starting.set(r, gated(ss, rest_bind(rest_is_starting, ss))); + ss::get_drain_progress.set(r, gated(ss, rest_bind(rest_get_drain_progress, ss))); + ss::drain.set(r, gated(ss, rest_bind(rest_drain, ss))); + ss::stop_gossiping.set(r, gated(ss, rest_bind(rest_stop_gossiping, ss))); + ss::start_gossiping.set(r, gated(ss, rest_bind(rest_start_gossiping, ss))); + ss::is_gossip_running.set(r, gated(ss, rest_bind(rest_is_gossip_running, ss))); + ss::stop_daemon.set(r, gated(ss, rest_bind(rest_stop_daemon))); + ss::is_initialized.set(r, gated(ss, rest_bind(rest_is_initialized, ss))); + ss::join_ring.set(r, gated(ss, rest_bind(rest_join_ring))); + ss::is_joined.set(r, gated(ss, rest_bind(rest_is_joined, ss))); + ss::is_incremental_backups_enabled.set(r, gated(ss, rest_bind(rest_is_incremental_backups_enabled, ctx))); + ss::set_incremental_backups_enabled.set(r, gated(ss, rest_bind(rest_set_incremental_backups_enabled, ctx))); + ss::rebuild.set(r, gated(ss, rest_bind(rest_rebuild, ss))); + ss::bulk_load.set(r, gated(ss, rest_bind(rest_bulk_load))); + ss::bulk_load_async.set(r, gated(ss, rest_bind(rest_bulk_load_async))); + ss::reschedule_failed_deletions.set(r, gated(ss, rest_bind(rest_reschedule_failed_deletions))); + ss::sample_key_range.set(r, gated(ss, rest_bind(rest_sample_key_range))); + ss::reset_local_schema.set(r, gated(ss, rest_bind(rest_reset_local_schema, ss))); + ss::set_trace_probability.set(r, gated(ss, rest_bind(rest_set_trace_probability))); + ss::get_trace_probability.set(r, gated(ss, rest_bind(rest_get_trace_probability))); + ss::get_slow_query_info.set(r, gated(ss, rest_bind(rest_get_slow_query_info))); + ss::set_slow_query.set(r, gated(ss, rest_bind(rest_set_slow_query))); + ss::deliver_hints.set(r, gated(ss, rest_bind(rest_deliver_hints))); + ss::get_cluster_name.set(r, gated(ss, rest_bind(rest_get_cluster_name, ss))); + ss::get_partitioner_name.set(r, gated(ss, rest_bind(rest_get_partitioner_name, ss))); + ss::get_tombstone_warn_threshold.set(r, gated(ss, rest_bind(rest_get_tombstone_warn_threshold))); + ss::set_tombstone_warn_threshold.set(r, gated(ss, rest_bind(rest_set_tombstone_warn_threshold))); + ss::get_tombstone_failure_threshold.set(r, gated(ss, rest_bind(rest_get_tombstone_failure_threshold))); + ss::set_tombstone_failure_threshold.set(r, gated(ss, rest_bind(rest_set_tombstone_failure_threshold))); + ss::get_batch_size_failure_threshold.set(r, gated(ss, rest_bind(rest_get_batch_size_failure_threshold))); + ss::set_batch_size_failure_threshold.set(r, gated(ss, rest_bind(rest_set_batch_size_failure_threshold))); + ss::set_hinted_handoff_throttle_in_kb.set(r, gated(ss, rest_bind(rest_set_hinted_handoff_throttle_in_kb))); + ss::get_exceptions.set(r, gated(ss, rest_bind(rest_get_exceptions, ss))); + ss::get_total_hints_in_progress.set(r, gated(ss, rest_bind(rest_get_total_hints_in_progress))); + ss::get_total_hints.set(r, gated(ss, rest_bind(rest_get_total_hints))); + ss::get_ownership.set(r, gated(ss, rest_bind(rest_get_ownership, ctx, ss))); + ss::get_effective_ownership.set(r, gated(ss, rest_bind(rest_get_effective_ownership, ctx, ss))); + ss::retrain_dict.set(r, gated(ss, rest_bind(rest_retrain_dict, ctx, ss, group0_client))); + ss::estimate_compression_ratios.set(r, gated(ss, rest_bind(rest_estimate_compression_ratios, ctx, ss))); + ss::sstable_info.set(r, gated(ss, rest_bind(rest_sstable_info, ctx))); + ss::logstor_info.set(r, gated(ss, rest_bind(rest_logstor_info, ctx))); + ss::reload_raft_topology_state.set(r, gated(ss, rest_bind(rest_reload_raft_topology_state, ss, group0_client))); + ss::upgrade_to_raft_topology.set(r, gated(ss, rest_bind(rest_upgrade_to_raft_topology, ss))); + ss::raft_topology_upgrade_status.set(r, gated(ss, rest_bind(rest_raft_topology_upgrade_status, ss))); + ss::raft_topology_get_cmd_status.set(r, gated(ss, rest_bind(rest_raft_topology_get_cmd_status, ss))); + ss::move_tablet.set(r, gated(ss, rest_bind(rest_move_tablet, ctx, ss))); + ss::add_tablet_replica.set(r, gated(ss, rest_bind(rest_add_tablet_replica, ctx, ss))); + ss::del_tablet_replica.set(r, gated(ss, rest_bind(rest_del_tablet_replica, ctx, ss))); + ss::repair_tablet.set(r, gated(ss, rest_bind(rest_repair_tablet, ctx, ss))); + ss::tablet_balancing_enable.set(r, gated(ss, rest_bind(rest_tablet_balancing_enable, ss))); + ss::create_vnode_tablet_migration.set(r, gated(ss, rest_bind(rest_create_vnode_tablet_migration, ctx, ss))); + ss::get_vnode_tablet_migration.set(r, gated(ss, rest_bind(rest_get_vnode_tablet_migration, ctx, ss))); + ss::set_vnode_tablet_migration_node_storage_mode.set(r, gated(ss, rest_bind(rest_set_vnode_tablet_migration_node_storage_mode, ctx, ss))); + ss::finalize_vnode_tablet_migration.set(r, gated(ss, rest_bind(rest_finalize_vnode_tablet_migration, ctx, ss))); + ss::quiesce_topology.set(r, gated(ss, rest_bind(rest_quiesce_topology, ss))); + sp::get_schema_versions.set(r, gated(ss, rest_bind(rest_get_schema_versions, ss))); + ss::drop_quarantined_sstables.set(r, gated(ss, rest_bind(rest_drop_quarantined_sstables, ctx, ss))); } void unset_storage_service(http_context& ctx, routes& r) { diff --git a/service/storage_service.cc b/service/storage_service.cc index a023a8189d..6b264b8e04 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2431,7 +2431,7 @@ storage_service::prepare_replacement_info(std::unordered_set } future> storage_service::get_ownership() { - return run_with_no_api_lock([this] (storage_service& ss) { + return run_with_no_api_lock([] (storage_service& ss) { const auto& tm = ss.get_token_metadata(); auto token_map = dht::token::describe_ownership(tm.sorted_tokens()); // describeOwnership returns tokens in an unspecified order, let's re-order them @@ -2439,7 +2439,7 @@ future> storage_service::get_ownership() { for (auto entry : token_map) { locator::host_id id = tm.get_endpoint(entry.first).value(); auto token_ownership = entry.second; - ownership[_address_map.get(id)] += token_ownership; + ownership[ss._address_map.get(id)] += token_ownership; } return ownership; }); @@ -2848,12 +2848,8 @@ future<> storage_service::raft_removenode(locator::host_id host_id, locator::hos } future<> storage_service::mark_excluded(const std::vector& hosts) { - if (this_shard_id() != 0) { - // group0 is only set on shard 0. - co_return co_await container().invoke_on(0, [&] (auto& ss) { - return ss.mark_excluded(hosts); - }); - } + // Callers forward to shard 0 via run_with_no_api_lock (group0 is only set on shard 0). + SCYLLA_ASSERT(this_shard_id() == 0); while (true) { auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{}); @@ -3098,8 +3094,8 @@ future storage_service::wait_for_topology_request_completion(utils::UUI } future<> storage_service::abort_topology_request(utils::UUID request_id) { - co_await container().invoke_on(0, [request_id, this] (storage_service& ss) { - return _topology_state_machine.abort_request(*ss._group0, ss._group0_as, ss._feature_service, request_id); + co_await container().invoke_on(0, [request_id] (storage_service& ss) { + return ss._topology_state_machine.abort_request(*ss._group0, ss._group0_as, ss._feature_service, request_id); }); } @@ -3961,11 +3957,8 @@ future<> storage_service::update_tablet_metadata(const locator::tablet_metadata_ } future<> storage_service::prepare_for_tablets_migration(const sstring& ks_name) { - if (this_shard_id() != 0) { - co_return co_await container().invoke_on(0, [&] (auto& ss) { - return ss.prepare_for_tablets_migration(ks_name); - }); - } + // Called via run_with_no_api_lock (forwards to shard 0). + SCYLLA_ASSERT(this_shard_id() == 0); while (true) { auto guard = co_await _group0->client().start_operation(_group0_as); @@ -4105,11 +4098,8 @@ future<> storage_service::prepare_for_tablets_migration(const sstring& ks_name) } future<> storage_service::set_node_intended_storage_mode(intended_storage_mode mode) { - if (this_shard_id() != 0) { - co_return co_await container().invoke_on(0, [mode] (auto& ss) { - return ss.set_node_intended_storage_mode(mode); - }); - } + // Called via run_with_no_api_lock (forwards to shard 0). + SCYLLA_ASSERT(this_shard_id() == 0); auto& raft_server = _group0->group0_server(); auto holder = _group0->hold_group0_gate(); @@ -4205,11 +4195,8 @@ storage_service::migration_status storage_service::get_tablets_migration_status( } future storage_service::get_tablets_migration_status_with_node_details(const sstring& ks_name) { - if (this_shard_id() != 0) { - co_return co_await container().invoke_on(0, [&ks_name] (auto& ss) { - return ss.get_tablets_migration_status_with_node_details(ks_name); - }); - } + // Called via run_with_no_api_lock (forwards to shard 0). + SCYLLA_ASSERT(this_shard_id() == 0); keyspace_migration_status result; result.keyspace = ks_name; @@ -4270,11 +4257,8 @@ future storage_service::get_tablets_ } future<> storage_service::finalize_tablets_migration(const sstring& ks_name) { - if (this_shard_id() != 0) { - co_return co_await container().invoke_on(0, [&ks_name] (auto& ss) { - return ss.finalize_tablets_migration(ks_name); - }); - } + // Called via run_with_no_api_lock (forwards to shard 0). + SCYLLA_ASSERT(this_shard_id() == 0); slogger.info("Finalizing vnodes-to-tablets migration for keyspace '{}'", ks_name); diff --git a/service/storage_service.hh b/service/storage_service.hh index abe38e0ff2..13f9adf18b 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -780,13 +780,19 @@ private: */ future<> stream_ranges(std::unordered_map> ranges_to_stream_by_keyspace); + // REST handlers are gated at the registration site (see gated() in + // api/storage_service.cc) so stop() drains in-flight requests before + // teardown. run_with_api_lock_internal and run_with_no_api_lock hold + // _async_gate on shard 0 as well, because REST requests arriving on + // any shard are forwarded there for execution. template auto run_with_api_lock_internal(storage_service& ss, Func&& func, sstring& operation) { + auto holder = ss._async_gate.hold(); if (!ss._operation_in_progress.empty()) { throw std::runtime_error(format("Operation {} is in progress, try again", ss._operation_in_progress)); } ss._operation_in_progress = std::move(operation); - return func(ss).finally([&ss] { + return func(ss).finally([&ss, holder = std::move(holder)] { ss._operation_in_progress = sstring(); }); } @@ -794,6 +800,10 @@ private: public: int32_t get_exception_count(); + auto hold_async_gate() { + return _async_gate.hold(); + } + template auto run_with_api_lock(sstring operation, Func&& func) { return container().invoke_on(0, [operation = std::move(operation), @@ -804,8 +814,10 @@ public: template auto run_with_no_api_lock(Func&& func) { - return container().invoke_on(0, [func = std::forward(func)] (storage_service& ss) mutable { - return func(ss); + return container().invoke_on(0, [func = std::forward(func)] (storage_service& ss) mutable + -> futurize_t> { + auto holder = ss._async_gate.hold(); + co_return co_await futurize_invoke(func, ss); }); }