Merge 'storage_service: fix REST API races during shutdown and cross-shard forwarding' from Piotr Smaron

REST route removal unregisters handlers but does not wait for requests
that have already entered storage_service.  A request can therefore
suspend inside an async operation while a restart proceeds to tear the
service down; the coroutine later resumes against destroyed members such
as _topology_state_machine, _group0, or _sys_ks — a use-after-destruction
bug that surfaces as UBSAN dynamic-type failures (e.g. the crash seen
from topology_state_load()).

Fix this by holding storage_service::_async_gate from the entry
boundary of every externally-triggered async operation so that stop()
drains them before teardown begins.  The gate is acquired in
run_with_api_lock, run_with_no_api_lock, and in individual REST
handlers that bypass those wrappers (reload_raft_topology_state,
mark_excluded, removenode, schema reload, topology-request
waits/abort, cleanup, ring/schema queries, SSTable dictionary
training/publish, and sampling).

Additionally, fix get_ownership() and abort_topology_request(), which
forward work to shard 0 but were still referencing the caller shard's
`this` pointer instead of the destination-shard instance, causing
silent cross-shard access to shard-local state.
Add a cluster regression test that repeatedly exercises the multi-shard
ownership REST path to cover the forwarding fix.

Fixes: SCYLLADB-1415

This should be backported to all branches; the code was introduced around the 2024.1 release.

Closes scylladb/scylladb#29373

* github.com:scylladb/scylladb:
  storage_service: fix shard-0 forwarding in REST helpers
  storage_service: gate REST-facing async operations during shutdown
  storage_service: prepare for async gate in REST handlers
This commit is contained in:
Marcin Maliszkiewicz
2026-04-22 14:43:31 +02:00
3 changed files with 143 additions and 121 deletions

View File

@@ -856,7 +856,9 @@ rest_exclude_node(sharded<service::storage_service>& ss, std::unique_ptr<http::r
}
apilog.info("exclude_node: hosts={}", hosts);
co_await ss.local().mark_excluded(hosts);
co_await ss.local().run_with_no_api_lock([hosts = std::move(hosts)] (service::storage_service& ss) {
return ss.mark_excluded(hosts);
});
co_return json_void();
}
@@ -1731,7 +1733,9 @@ rest_create_vnode_tablet_migration(http_context& ctx, sharded<service::storage_s
throw std::runtime_error("vnodes-to-tablets migration requires all nodes to support the VNODES_TO_TABLETS_MIGRATIONS cluster feature");
}
auto keyspace = validate_keyspace(ctx, req);
co_await ss.local().prepare_for_tablets_migration(keyspace);
co_await ss.local().run_with_no_api_lock([keyspace] (service::storage_service& ss) {
return ss.prepare_for_tablets_migration(keyspace);
});
co_return json_void();
}
@@ -1743,7 +1747,9 @@ rest_get_vnode_tablet_migration(http_context& ctx, sharded<service::storage_serv
throw std::runtime_error("vnodes-to-tablets migration requires all nodes to support the VNODES_TO_TABLETS_MIGRATIONS cluster feature");
}
auto keyspace = validate_keyspace(ctx, req);
auto status = co_await ss.local().get_tablets_migration_status_with_node_details(keyspace);
auto status = co_await ss.local().run_with_no_api_lock([keyspace] (service::storage_service& ss) {
return ss.get_tablets_migration_status_with_node_details(keyspace);
});
ss::vnode_tablet_migration_status result;
result.keyspace = status.keyspace;
@@ -1768,7 +1774,9 @@ rest_set_vnode_tablet_migration_node_storage_mode(http_context& ctx, sharded<ser
}
auto mode_str = req->get_query_param("intended_mode");
auto mode = service::intended_storage_mode_from_string(mode_str);
co_await ss.local().set_node_intended_storage_mode(mode);
co_await ss.local().run_with_no_api_lock([mode] (service::storage_service& ss) {
return ss.set_node_intended_storage_mode(mode);
});
co_return json_void();
}
@@ -1782,7 +1790,9 @@ rest_finalize_vnode_tablet_migration(http_context& ctx, sharded<service::storage
auto keyspace = validate_keyspace(ctx, req);
validate_keyspace(ctx, keyspace);
co_await ss.local().finalize_tablets_migration(keyspace);
co_await ss.local().run_with_no_api_lock([keyspace] (service::storage_service& ss) {
return ss.finalize_tablets_migration(keyspace);
});
co_return json_void();
}
@@ -1859,90 +1869,106 @@ rest_bind(FuncType func, BindArgs&... args) {
return std::bind_front(func, std::ref(args)...);
}
// Hold the storage_service async gate for the duration of async REST
// handlers so stop() drains in-flight requests before teardown.
// Synchronous handlers don't yield and need no gate.
//
// Takes the asynchronous handler `fn` and returns a handler with the same
// signature that acquires a gate hold on the local storage_service
// instance, invokes `fn`, and forwards its result.
static seastar::httpd::future_json_function
gated(sharded<service::storage_service>& ss, seastar::httpd::future_json_function fn) {
// The wrapper is a coroutine: `holder` is a coroutine local, so it is
// released only after the wrapped handler's future has been awaited.
return [fn = std::move(fn), &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
// ss.local() is evaluated when the request is dispatched, before any suspension.
// NOTE(review): presumably hold() fails the request if the gate is already
// closed — confirm against the gate type's hold() semantics.
auto holder = ss.local().hold_async_gate();
// Both captures (`ss`, `fn`) are read before the first suspension point;
// after the co_await only coroutine locals are touched.
co_return co_await fn(std::move(req));
};
}
// Overload for synchronous handlers: the handler is returned unchanged and
// no gate hold is taken (the storage_service argument is unused). It lets
// registration code wrap every handler in gated() regardless of its kind.
static seastar::httpd::json_request_function
gated(sharded<service::storage_service>&, seastar::httpd::json_request_function fn) {
return fn;
}
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, service::raft_group0_client& group0_client) {
ss::get_token_endpoint.set(r, rest_bind(rest_get_token_endpoint, ctx, ss));
ss::get_release_version.set(r, rest_bind(rest_get_release_version, ss));
ss::get_scylla_release_version.set(r, rest_bind(rest_get_scylla_release_version, ss));
ss::get_schema_version.set(r, rest_bind(rest_get_schema_version, ss));
ss::get_range_to_endpoint_map.set(r, rest_bind(rest_get_range_to_endpoint_map, ctx, ss));
ss::get_pending_range_to_endpoint_map.set(r, rest_bind(rest_get_pending_range_to_endpoint_map, ctx));
ss::describe_ring.set(r, rest_bind(rest_describe_ring, ctx, ss));
ss::get_current_generation_number.set(r, rest_bind(rest_get_current_generation_number, ss));
ss::get_natural_endpoints.set(r, rest_bind(rest_get_natural_endpoints, ctx, ss));
ss::get_natural_endpoints_v2.set(r, rest_bind(rest_get_natural_endpoints_v2, ctx, ss));
ss::cdc_streams_check_and_repair.set(r, rest_bind(rest_cdc_streams_check_and_repair, ss));
ss::cleanup_all.set(r, rest_bind(rest_cleanup_all, ctx, ss));
ss::reset_cleanup_needed.set(r, rest_bind(rest_reset_cleanup_needed, ctx, ss));
ss::force_flush.set(r, rest_bind(rest_force_flush, ctx));
ss::force_keyspace_flush.set(r, rest_bind(rest_force_keyspace_flush, ctx));
ss::decommission.set(r, rest_bind(rest_decommission, ss, ssc));
ss::logstor_compaction.set(r, rest_bind(rest_logstor_compaction, ctx));
ss::logstor_flush.set(r, rest_bind(rest_logstor_flush, ctx));
ss::move.set(r, rest_bind(rest_move, ss));
ss::remove_node.set(r, rest_bind(rest_remove_node, ss));
ss::exclude_node.set(r, rest_bind(rest_exclude_node, ss));
ss::get_removal_status.set(r, rest_bind(rest_get_removal_status, ss));
ss::force_remove_completion.set(r, rest_bind(rest_force_remove_completion, ss));
ss::set_logging_level.set(r, rest_bind(rest_set_logging_level));
ss::get_logging_levels.set(r, rest_bind(rest_get_logging_levels));
ss::get_operation_mode.set(r, rest_bind(rest_get_operation_mode, ss));
ss::is_starting.set(r, rest_bind(rest_is_starting, ss));
ss::get_drain_progress.set(r, rest_bind(rest_get_drain_progress, ss));
ss::drain.set(r, rest_bind(rest_drain, ss));
ss::stop_gossiping.set(r, rest_bind(rest_stop_gossiping, ss));
ss::start_gossiping.set(r, rest_bind(rest_start_gossiping, ss));
ss::is_gossip_running.set(r, rest_bind(rest_is_gossip_running, ss));
ss::stop_daemon.set(r, rest_bind(rest_stop_daemon));
ss::is_initialized.set(r, rest_bind(rest_is_initialized, ss));
ss::join_ring.set(r, rest_bind(rest_join_ring));
ss::is_joined.set(r, rest_bind(rest_is_joined, ss));
ss::is_incremental_backups_enabled.set(r, rest_bind(rest_is_incremental_backups_enabled, ctx));
ss::set_incremental_backups_enabled.set(r, rest_bind(rest_set_incremental_backups_enabled, ctx));
ss::rebuild.set(r, rest_bind(rest_rebuild, ss));
ss::bulk_load.set(r, rest_bind(rest_bulk_load));
ss::bulk_load_async.set(r, rest_bind(rest_bulk_load_async));
ss::reschedule_failed_deletions.set(r, rest_bind(rest_reschedule_failed_deletions));
ss::sample_key_range.set(r, rest_bind(rest_sample_key_range));
ss::reset_local_schema.set(r, rest_bind(rest_reset_local_schema, ss));
ss::set_trace_probability.set(r, rest_bind(rest_set_trace_probability));
ss::get_trace_probability.set(r, rest_bind(rest_get_trace_probability));
ss::get_slow_query_info.set(r, rest_bind(rest_get_slow_query_info));
ss::set_slow_query.set(r, rest_bind(rest_set_slow_query));
ss::deliver_hints.set(r, rest_bind(rest_deliver_hints));
ss::get_cluster_name.set(r, rest_bind(rest_get_cluster_name, ss));
ss::get_partitioner_name.set(r, rest_bind(rest_get_partitioner_name, ss));
ss::get_tombstone_warn_threshold.set(r, rest_bind(rest_get_tombstone_warn_threshold));
ss::set_tombstone_warn_threshold.set(r, rest_bind(rest_set_tombstone_warn_threshold));
ss::get_tombstone_failure_threshold.set(r, rest_bind(rest_get_tombstone_failure_threshold));
ss::set_tombstone_failure_threshold.set(r, rest_bind(rest_set_tombstone_failure_threshold));
ss::get_batch_size_failure_threshold.set(r, rest_bind(rest_get_batch_size_failure_threshold));
ss::set_batch_size_failure_threshold.set(r, rest_bind(rest_set_batch_size_failure_threshold));
ss::set_hinted_handoff_throttle_in_kb.set(r, rest_bind(rest_set_hinted_handoff_throttle_in_kb));
ss::get_exceptions.set(r, rest_bind(rest_get_exceptions, ss));
ss::get_total_hints_in_progress.set(r, rest_bind(rest_get_total_hints_in_progress));
ss::get_total_hints.set(r, rest_bind(rest_get_total_hints));
ss::get_ownership.set(r, rest_bind(rest_get_ownership, ctx, ss));
ss::get_effective_ownership.set(r, rest_bind(rest_get_effective_ownership, ctx, ss));
ss::retrain_dict.set(r, rest_bind(rest_retrain_dict, ctx, ss, group0_client));
ss::estimate_compression_ratios.set(r, rest_bind(rest_estimate_compression_ratios, ctx, ss));
ss::sstable_info.set(r, rest_bind(rest_sstable_info, ctx));
ss::logstor_info.set(r, rest_bind(rest_logstor_info, ctx));
ss::reload_raft_topology_state.set(r, rest_bind(rest_reload_raft_topology_state, ss, group0_client));
ss::upgrade_to_raft_topology.set(r, rest_bind(rest_upgrade_to_raft_topology, ss));
ss::raft_topology_upgrade_status.set(r, rest_bind(rest_raft_topology_upgrade_status, ss));
ss::raft_topology_get_cmd_status.set(r, rest_bind(rest_raft_topology_get_cmd_status, ss));
ss::move_tablet.set(r, rest_bind(rest_move_tablet, ctx, ss));
ss::add_tablet_replica.set(r, rest_bind(rest_add_tablet_replica, ctx, ss));
ss::del_tablet_replica.set(r, rest_bind(rest_del_tablet_replica, ctx, ss));
ss::repair_tablet.set(r, rest_bind(rest_repair_tablet, ctx, ss));
ss::tablet_balancing_enable.set(r, rest_bind(rest_tablet_balancing_enable, ss));
ss::create_vnode_tablet_migration.set(r, rest_bind(rest_create_vnode_tablet_migration, ctx, ss));
ss::get_vnode_tablet_migration.set(r, rest_bind(rest_get_vnode_tablet_migration, ctx, ss));
ss::set_vnode_tablet_migration_node_storage_mode.set(r, rest_bind(rest_set_vnode_tablet_migration_node_storage_mode, ctx, ss));
ss::finalize_vnode_tablet_migration.set(r, rest_bind(rest_finalize_vnode_tablet_migration, ctx, ss));
ss::quiesce_topology.set(r, rest_bind(rest_quiesce_topology, ss));
sp::get_schema_versions.set(r, rest_bind(rest_get_schema_versions, ss));
ss::drop_quarantined_sstables.set(r, rest_bind(rest_drop_quarantined_sstables, ctx, ss));
ss::get_token_endpoint.set(r, gated(ss, rest_bind(rest_get_token_endpoint, ctx, ss)));
ss::get_release_version.set(r, gated(ss, rest_bind(rest_get_release_version, ss)));
ss::get_scylla_release_version.set(r, gated(ss, rest_bind(rest_get_scylla_release_version, ss)));
ss::get_schema_version.set(r, gated(ss, rest_bind(rest_get_schema_version, ss)));
ss::get_range_to_endpoint_map.set(r, gated(ss, rest_bind(rest_get_range_to_endpoint_map, ctx, ss)));
ss::get_pending_range_to_endpoint_map.set(r, gated(ss, rest_bind(rest_get_pending_range_to_endpoint_map, ctx)));
ss::describe_ring.set(r, gated(ss, rest_bind(rest_describe_ring, ctx, ss)));
ss::get_current_generation_number.set(r, gated(ss, rest_bind(rest_get_current_generation_number, ss)));
ss::get_natural_endpoints.set(r, gated(ss, rest_bind(rest_get_natural_endpoints, ctx, ss)));
ss::get_natural_endpoints_v2.set(r, gated(ss, rest_bind(rest_get_natural_endpoints_v2, ctx, ss)));
ss::cdc_streams_check_and_repair.set(r, gated(ss, rest_bind(rest_cdc_streams_check_and_repair, ss)));
ss::cleanup_all.set(r, gated(ss, rest_bind(rest_cleanup_all, ctx, ss)));
ss::reset_cleanup_needed.set(r, gated(ss, rest_bind(rest_reset_cleanup_needed, ctx, ss)));
ss::force_flush.set(r, gated(ss, rest_bind(rest_force_flush, ctx)));
ss::force_keyspace_flush.set(r, gated(ss, rest_bind(rest_force_keyspace_flush, ctx)));
ss::decommission.set(r, gated(ss, rest_bind(rest_decommission, ss, ssc)));
ss::logstor_compaction.set(r, gated(ss, rest_bind(rest_logstor_compaction, ctx)));
ss::logstor_flush.set(r, gated(ss, rest_bind(rest_logstor_flush, ctx)));
ss::move.set(r, gated(ss, rest_bind(rest_move, ss)));
ss::remove_node.set(r, gated(ss, rest_bind(rest_remove_node, ss)));
ss::exclude_node.set(r, gated(ss, rest_bind(rest_exclude_node, ss)));
ss::get_removal_status.set(r, gated(ss, rest_bind(rest_get_removal_status, ss)));
ss::force_remove_completion.set(r, gated(ss, rest_bind(rest_force_remove_completion, ss)));
ss::set_logging_level.set(r, gated(ss, rest_bind(rest_set_logging_level)));
ss::get_logging_levels.set(r, gated(ss, rest_bind(rest_get_logging_levels)));
ss::get_operation_mode.set(r, gated(ss, rest_bind(rest_get_operation_mode, ss)));
ss::is_starting.set(r, gated(ss, rest_bind(rest_is_starting, ss)));
ss::get_drain_progress.set(r, gated(ss, rest_bind(rest_get_drain_progress, ss)));
ss::drain.set(r, gated(ss, rest_bind(rest_drain, ss)));
ss::stop_gossiping.set(r, gated(ss, rest_bind(rest_stop_gossiping, ss)));
ss::start_gossiping.set(r, gated(ss, rest_bind(rest_start_gossiping, ss)));
ss::is_gossip_running.set(r, gated(ss, rest_bind(rest_is_gossip_running, ss)));
ss::stop_daemon.set(r, gated(ss, rest_bind(rest_stop_daemon)));
ss::is_initialized.set(r, gated(ss, rest_bind(rest_is_initialized, ss)));
ss::join_ring.set(r, gated(ss, rest_bind(rest_join_ring)));
ss::is_joined.set(r, gated(ss, rest_bind(rest_is_joined, ss)));
ss::is_incremental_backups_enabled.set(r, gated(ss, rest_bind(rest_is_incremental_backups_enabled, ctx)));
ss::set_incremental_backups_enabled.set(r, gated(ss, rest_bind(rest_set_incremental_backups_enabled, ctx)));
ss::rebuild.set(r, gated(ss, rest_bind(rest_rebuild, ss)));
ss::bulk_load.set(r, gated(ss, rest_bind(rest_bulk_load)));
ss::bulk_load_async.set(r, gated(ss, rest_bind(rest_bulk_load_async)));
ss::reschedule_failed_deletions.set(r, gated(ss, rest_bind(rest_reschedule_failed_deletions)));
ss::sample_key_range.set(r, gated(ss, rest_bind(rest_sample_key_range)));
ss::reset_local_schema.set(r, gated(ss, rest_bind(rest_reset_local_schema, ss)));
ss::set_trace_probability.set(r, gated(ss, rest_bind(rest_set_trace_probability)));
ss::get_trace_probability.set(r, gated(ss, rest_bind(rest_get_trace_probability)));
ss::get_slow_query_info.set(r, gated(ss, rest_bind(rest_get_slow_query_info)));
ss::set_slow_query.set(r, gated(ss, rest_bind(rest_set_slow_query)));
ss::deliver_hints.set(r, gated(ss, rest_bind(rest_deliver_hints)));
ss::get_cluster_name.set(r, gated(ss, rest_bind(rest_get_cluster_name, ss)));
ss::get_partitioner_name.set(r, gated(ss, rest_bind(rest_get_partitioner_name, ss)));
ss::get_tombstone_warn_threshold.set(r, gated(ss, rest_bind(rest_get_tombstone_warn_threshold)));
ss::set_tombstone_warn_threshold.set(r, gated(ss, rest_bind(rest_set_tombstone_warn_threshold)));
ss::get_tombstone_failure_threshold.set(r, gated(ss, rest_bind(rest_get_tombstone_failure_threshold)));
ss::set_tombstone_failure_threshold.set(r, gated(ss, rest_bind(rest_set_tombstone_failure_threshold)));
ss::get_batch_size_failure_threshold.set(r, gated(ss, rest_bind(rest_get_batch_size_failure_threshold)));
ss::set_batch_size_failure_threshold.set(r, gated(ss, rest_bind(rest_set_batch_size_failure_threshold)));
ss::set_hinted_handoff_throttle_in_kb.set(r, gated(ss, rest_bind(rest_set_hinted_handoff_throttle_in_kb)));
ss::get_exceptions.set(r, gated(ss, rest_bind(rest_get_exceptions, ss)));
ss::get_total_hints_in_progress.set(r, gated(ss, rest_bind(rest_get_total_hints_in_progress)));
ss::get_total_hints.set(r, gated(ss, rest_bind(rest_get_total_hints)));
ss::get_ownership.set(r, gated(ss, rest_bind(rest_get_ownership, ctx, ss)));
ss::get_effective_ownership.set(r, gated(ss, rest_bind(rest_get_effective_ownership, ctx, ss)));
ss::retrain_dict.set(r, gated(ss, rest_bind(rest_retrain_dict, ctx, ss, group0_client)));
ss::estimate_compression_ratios.set(r, gated(ss, rest_bind(rest_estimate_compression_ratios, ctx, ss)));
ss::sstable_info.set(r, gated(ss, rest_bind(rest_sstable_info, ctx)));
ss::logstor_info.set(r, gated(ss, rest_bind(rest_logstor_info, ctx)));
ss::reload_raft_topology_state.set(r, gated(ss, rest_bind(rest_reload_raft_topology_state, ss, group0_client)));
ss::upgrade_to_raft_topology.set(r, gated(ss, rest_bind(rest_upgrade_to_raft_topology, ss)));
ss::raft_topology_upgrade_status.set(r, gated(ss, rest_bind(rest_raft_topology_upgrade_status, ss)));
ss::raft_topology_get_cmd_status.set(r, gated(ss, rest_bind(rest_raft_topology_get_cmd_status, ss)));
ss::move_tablet.set(r, gated(ss, rest_bind(rest_move_tablet, ctx, ss)));
ss::add_tablet_replica.set(r, gated(ss, rest_bind(rest_add_tablet_replica, ctx, ss)));
ss::del_tablet_replica.set(r, gated(ss, rest_bind(rest_del_tablet_replica, ctx, ss)));
ss::repair_tablet.set(r, gated(ss, rest_bind(rest_repair_tablet, ctx, ss)));
ss::tablet_balancing_enable.set(r, gated(ss, rest_bind(rest_tablet_balancing_enable, ss)));
ss::create_vnode_tablet_migration.set(r, gated(ss, rest_bind(rest_create_vnode_tablet_migration, ctx, ss)));
ss::get_vnode_tablet_migration.set(r, gated(ss, rest_bind(rest_get_vnode_tablet_migration, ctx, ss)));
ss::set_vnode_tablet_migration_node_storage_mode.set(r, gated(ss, rest_bind(rest_set_vnode_tablet_migration_node_storage_mode, ctx, ss)));
ss::finalize_vnode_tablet_migration.set(r, gated(ss, rest_bind(rest_finalize_vnode_tablet_migration, ctx, ss)));
ss::quiesce_topology.set(r, gated(ss, rest_bind(rest_quiesce_topology, ss)));
sp::get_schema_versions.set(r, gated(ss, rest_bind(rest_get_schema_versions, ss)));
ss::drop_quarantined_sstables.set(r, gated(ss, rest_bind(rest_drop_quarantined_sstables, ctx, ss)));
}
void unset_storage_service(http_context& ctx, routes& r) {

View File

@@ -2431,7 +2431,7 @@ storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address>
}
future<std::map<gms::inet_address, float>> storage_service::get_ownership() {
return run_with_no_api_lock([this] (storage_service& ss) {
return run_with_no_api_lock([] (storage_service& ss) {
const auto& tm = ss.get_token_metadata();
auto token_map = dht::token::describe_ownership(tm.sorted_tokens());
// describeOwnership returns tokens in an unspecified order, let's re-order them
@@ -2439,7 +2439,7 @@ future<std::map<gms::inet_address, float>> storage_service::get_ownership() {
for (auto entry : token_map) {
locator::host_id id = tm.get_endpoint(entry.first).value();
auto token_ownership = entry.second;
ownership[_address_map.get(id)] += token_ownership;
ownership[ss._address_map.get(id)] += token_ownership;
}
return ownership;
});
@@ -2848,12 +2848,8 @@ future<> storage_service::raft_removenode(locator::host_id host_id, locator::hos
}
future<> storage_service::mark_excluded(const std::vector<locator::host_id>& hosts) {
if (this_shard_id() != 0) {
// group0 is only set on shard 0.
co_return co_await container().invoke_on(0, [&] (auto& ss) {
return ss.mark_excluded(hosts);
});
}
// Callers forward to shard 0 via run_with_no_api_lock (group0 is only set on shard 0).
SCYLLA_ASSERT(this_shard_id() == 0);
while (true) {
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
@@ -3098,8 +3094,8 @@ future<sstring> storage_service::wait_for_topology_request_completion(utils::UUI
}
future<> storage_service::abort_topology_request(utils::UUID request_id) {
co_await container().invoke_on(0, [request_id, this] (storage_service& ss) {
return _topology_state_machine.abort_request(*ss._group0, ss._group0_as, ss._feature_service, request_id);
co_await container().invoke_on(0, [request_id] (storage_service& ss) {
return ss._topology_state_machine.abort_request(*ss._group0, ss._group0_as, ss._feature_service, request_id);
});
}
@@ -3961,11 +3957,8 @@ future<> storage_service::update_tablet_metadata(const locator::tablet_metadata_
}
future<> storage_service::prepare_for_tablets_migration(const sstring& ks_name) {
if (this_shard_id() != 0) {
co_return co_await container().invoke_on(0, [&] (auto& ss) {
return ss.prepare_for_tablets_migration(ks_name);
});
}
// Called via run_with_no_api_lock (forwards to shard 0).
SCYLLA_ASSERT(this_shard_id() == 0);
while (true) {
auto guard = co_await _group0->client().start_operation(_group0_as);
@@ -4105,11 +4098,8 @@ future<> storage_service::prepare_for_tablets_migration(const sstring& ks_name)
}
future<> storage_service::set_node_intended_storage_mode(intended_storage_mode mode) {
if (this_shard_id() != 0) {
co_return co_await container().invoke_on(0, [mode] (auto& ss) {
return ss.set_node_intended_storage_mode(mode);
});
}
// Called via run_with_no_api_lock (forwards to shard 0).
SCYLLA_ASSERT(this_shard_id() == 0);
auto& raft_server = _group0->group0_server();
auto holder = _group0->hold_group0_gate();
@@ -4205,11 +4195,8 @@ storage_service::migration_status storage_service::get_tablets_migration_status(
}
future<storage_service::keyspace_migration_status> storage_service::get_tablets_migration_status_with_node_details(const sstring& ks_name) {
if (this_shard_id() != 0) {
co_return co_await container().invoke_on(0, [&ks_name] (auto& ss) {
return ss.get_tablets_migration_status_with_node_details(ks_name);
});
}
// Called via run_with_no_api_lock (forwards to shard 0).
SCYLLA_ASSERT(this_shard_id() == 0);
keyspace_migration_status result;
result.keyspace = ks_name;
@@ -4270,11 +4257,8 @@ future<storage_service::keyspace_migration_status> storage_service::get_tablets_
}
future<> storage_service::finalize_tablets_migration(const sstring& ks_name) {
if (this_shard_id() != 0) {
co_return co_await container().invoke_on(0, [&ks_name] (auto& ss) {
return ss.finalize_tablets_migration(ks_name);
});
}
// Called via run_with_no_api_lock (forwards to shard 0).
SCYLLA_ASSERT(this_shard_id() == 0);
slogger.info("Finalizing vnodes-to-tablets migration for keyspace '{}'", ks_name);

View File

@@ -780,13 +780,19 @@ private:
*/
future<> stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, locator::host_id>> ranges_to_stream_by_keyspace);
// REST handlers are gated at the registration site (see gated() in
// api/storage_service.cc) so stop() drains in-flight requests before
// teardown. run_with_api_lock_internal and run_with_no_api_lock hold
// _async_gate on shard 0 as well, because REST requests arriving on
// any shard are forwarded there for execution.
template <typename Func>
auto run_with_api_lock_internal(storage_service& ss, Func&& func, sstring& operation) {
auto holder = ss._async_gate.hold();
if (!ss._operation_in_progress.empty()) {
throw std::runtime_error(format("Operation {} is in progress, try again", ss._operation_in_progress));
}
ss._operation_in_progress = std::move(operation);
return func(ss).finally([&ss] {
return func(ss).finally([&ss, holder = std::move(holder)] {
ss._operation_in_progress = sstring();
});
}
@@ -794,6 +800,10 @@ private:
public:
int32_t get_exception_count();
// Returns a hold on _async_gate. Public so that code outside the class
// (e.g. the gated() REST wrapper in api/storage_service.cc) can hold the
// gate for the duration of an operation.
// NOTE(review): presumably throws if the gate has already been closed —
// confirm against the type of _async_gate.
auto hold_async_gate() {
return _async_gate.hold();
}
template <typename Func>
auto run_with_api_lock(sstring operation, Func&& func) {
return container().invoke_on(0, [operation = std::move(operation),
@@ -804,8 +814,10 @@ public:
template <typename Func>
auto run_with_no_api_lock(Func&& func) {
return container().invoke_on(0, [func = std::forward<Func>(func)] (storage_service& ss) mutable {
return func(ss);
return container().invoke_on(0, [func = std::forward<Func>(func)] (storage_service& ss) mutable
-> futurize_t<std::invoke_result_t<Func, storage_service&>> {
auto holder = ss._async_gate.hold();
co_return co_await futurize_invoke(func, ss);
});
}