Merge 'Sanitize storage_service API maintenance' from Pavel Emelyanov

Storage service API set/unset has two flaws.

First, unset doesn't happen, so after storage service is stopped its handlers become "local is not initialized"-assertion and use-after-free landmines.

Second, setting of storage service API carry gossiper and system keyspace references, thus duplicating the knowledge about storage service dependencies.

This PR fixes both by adding the storage service API unsetting and by making the handlers use _only_ storage service instance, not any externally provided references.

Closes scylladb/scylladb#15547

* github.com:scylladb/scylladb:
  main, api: Set/Unset storage_service API in proper place
  api/storage_service: Remove gossiper arg from API
  api/storage_service: Remove system keyspace arg from API
  api/storage_service: Get gossiper from storage service
  api/storage_service: Get token_metadata from storage service
This commit is contained in:
Botond Dénes
2023-09-27 10:00:54 +03:00
6 changed files with 131 additions and 35 deletions

View File

@@ -102,12 +102,16 @@ future<> unset_rpc_controller(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_rpc_controller(ctx, r); });
}
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<gms::gossiper>& g, sharded<db::system_keyspace>& sys_ks) {
return register_api(ctx, "storage_service", "The storage service API", [&ss, &g, &sys_ks] (http_context& ctx, routes& r) {
set_storage_service(ctx, r, ss, g.local(), sys_ks);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss) {
return register_api(ctx, "storage_service", "The storage service API", [&ss] (http_context& ctx, routes& r) {
set_storage_service(ctx, r, ss);
});
}
future<> unset_server_storage_service(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_storage_service(ctx, r); });
}
future<> set_server_sstables_loader(http_context& ctx, sharded<sstables_loader>& sst_loader) {
return ctx.http_server.set_routes([&ctx, &sst_loader] (routes& r) { set_sstables_loader(ctx, r, sst_loader); });
}

View File

@@ -85,7 +85,8 @@ future<> set_server_init(http_context& ctx);
future<> set_server_config(http_context& ctx, const db::config& cfg);
future<> set_server_snitch(http_context& ctx, sharded<locator::snitch_ptr>& snitch);
future<> unset_server_snitch(http_context& ctx);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<gms::gossiper>& g, sharded<db::system_keyspace>& sys_ks);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss);
future<> unset_server_storage_service(http_context& ctx);
future<> set_server_sstables_loader(http_context& ctx, sharded<sstables_loader>& sst_loader);
future<> unset_server_sstables_loader(http_context& ctx);
future<> set_server_view_builder(http_context& ctx, sharded<db::view::view_builder>& vb);

View File

@@ -54,10 +54,6 @@ extern logging::logger apilog;
namespace api {
const locator::token_metadata& http_context::get_token_metadata() {
return *shared_token_metadata.local().get();
}
namespace ss = httpd::storage_service_json;
using namespace json;
@@ -465,21 +461,21 @@ static future<json::json_return_type> describe_ring_as_json(sharded<service::sto
co_return json::json_return_type(stream_range_as_array(co_await ss.local().describe_ring(keyspace), token_range_endpoints_to_json));
}
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, gms::gossiper& g, sharded<db::system_keyspace>& sys_ks) {
ss::local_hostid.set(r, [&ctx](std::unique_ptr<http::request> req) {
auto id = ctx.db.local().get_token_metadata().get_my_id();
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss) {
ss::local_hostid.set(r, [&ss](std::unique_ptr<http::request> req) {
auto id = ss.local().get_token_metadata().get_my_id();
return make_ready_future<json::json_return_type>(id.to_sstring());
});
ss::get_tokens.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return make_ready_future<json::json_return_type>(stream_range_as_array(ctx.get_token_metadata().sorted_tokens(), [](const dht::token& i) {
ss::get_tokens.set(r, [&ss] (std::unique_ptr<http::request> req) {
return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().get_token_metadata().sorted_tokens(), [](const dht::token& i) {
return fmt::to_string(i);
}));
});
ss::get_node_tokens.set(r, [&ctx] (std::unique_ptr<http::request> req) {
ss::get_node_tokens.set(r, [&ss] (std::unique_ptr<http::request> req) {
gms::inet_address addr(req->param["endpoint"]);
return make_ready_future<json::json_return_type>(stream_range_as_array(ctx.get_token_metadata().get_tokens(addr), [](const dht::token& i) {
return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().get_token_metadata().get_tokens(addr), [](const dht::token& i) {
return fmt::to_string(i);
}));
});
@@ -547,8 +543,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});
ss::get_leaving_nodes.set(r, [&ctx](const_req req) {
return container_to_vec(ctx.get_token_metadata().get_leaving_endpoints());
ss::get_leaving_nodes.set(r, [&ss](const_req req) {
return container_to_vec(ss.local().get_token_metadata().get_leaving_endpoints());
});
ss::get_moving_nodes.set(r, [](const_req req) {
@@ -556,8 +552,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return container_to_vec(addr);
});
ss::get_joining_nodes.set(r, [&ctx](const_req req) {
auto points = ctx.get_token_metadata().get_bootstrap_tokens();
ss::get_joining_nodes.set(r, [&ss](const_req req) {
auto points = ss.local().get_token_metadata().get_bootstrap_tokens();
std::unordered_set<sstring> addr;
for (auto i: points) {
addr.insert(fmt::to_string(i.second));
@@ -629,9 +625,9 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return describe_ring_as_json(ss, validate_keyspace(ctx, req->param));
});
ss::get_host_id_map.set(r, [&ctx](const_req req) {
ss::get_host_id_map.set(r, [&ss](const_req req) {
std::vector<ss::mapper> res;
return map_to_key_value(ctx.get_token_metadata().get_endpoint_to_host_id_map_for_reading(), res);
return map_to_key_value(ss.local().get_token_metadata().get_endpoint_to_host_id_map_for_reading(), res);
});
ss::get_load.set(r, [&ctx](std::unique_ptr<http::request> req) {
@@ -651,9 +647,9 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});
ss::get_current_generation_number.set(r, [&g](std::unique_ptr<http::request> req) {
ss::get_current_generation_number.set(r, [&ss](std::unique_ptr<http::request> req) {
gms::inet_address ep(utils::fb_utilities::get_broadcast_address());
return g.get_current_generation_number(ep).then([](gms::generation_type res) {
return ss.local().gossiper().get_current_generation_number(ep).then([](gms::generation_type res) {
return make_ready_future<json::json_return_type>(res.value());
});
});
@@ -898,11 +894,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(json_void());
});
ss::is_initialized.set(r, [&ss, &g](std::unique_ptr<http::request> req) {
return ss.local().get_operation_mode().then([&g] (auto mode) {
ss::is_initialized.set(r, [&ss](std::unique_ptr<http::request> req) {
return ss.local().get_operation_mode().then([&ss] (auto mode) {
bool is_initialized = mode >= service::storage_service::mode::STARTING;
if (mode == service::storage_service::mode::NORMAL) {
is_initialized = g.is_enabled();
is_initialized = ss.local().gossiper().is_enabled();
}
return make_ready_future<json::json_return_type>(is_initialized);
});
@@ -1123,12 +1119,12 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(json_void());
});
ss::get_cluster_name.set(r, [&g](const_req req) {
return g.get_cluster_name();
ss::get_cluster_name.set(r, [&ss](const_req req) {
return ss.local().gossiper().get_cluster_name();
});
ss::get_partitioner_name.set(r, [&g](const_req req) {
return g.get_partitioner_name();
ss::get_partitioner_name.set(r, [&ss](const_req req) {
return ss.local().gossiper().get_partitioner_name();
});
ss::get_tombstone_warn_threshold.set(r, [](std::unique_ptr<http::request> req) {
@@ -1339,6 +1335,95 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
}
void unset_storage_service(http_context& ctx, routes& r) {
ss::local_hostid.unset(r);
ss::get_tokens.unset(r);
ss::get_node_tokens.unset(r);
ss::get_commitlog.unset(r);
ss::get_token_endpoint.unset(r);
ss::toppartitions_generic.unset(r);
ss::get_leaving_nodes.unset(r);
ss::get_moving_nodes.unset(r);
ss::get_joining_nodes.unset(r);
ss::get_release_version.unset(r);
ss::get_scylla_release_version.unset(r);
ss::get_schema_version.unset(r);
ss::get_all_data_file_locations.unset(r);
ss::get_saved_caches_location.unset(r);
ss::get_range_to_endpoint_map.unset(r);
ss::get_pending_range_to_endpoint_map.unset(r);
ss::describe_any_ring.unset(r);
ss::describe_ring.unset(r);
ss::get_host_id_map.unset(r);
ss::get_load.unset(r);
ss::get_load_map.unset(r);
ss::get_current_generation_number.unset(r);
ss::get_natural_endpoints.unset(r);
ss::cdc_streams_check_and_repair.unset(r);
ss::force_keyspace_compaction.unset(r);
ss::force_keyspace_cleanup.unset(r);
ss::perform_keyspace_offstrategy_compaction.unset(r);
ss::upgrade_sstables.unset(r);
ss::force_keyspace_flush.unset(r);
ss::decommission.unset(r);
ss::move.unset(r);
ss::remove_node.unset(r);
ss::get_removal_status.unset(r);
ss::force_remove_completion.unset(r);
ss::set_logging_level.unset(r);
ss::get_logging_levels.unset(r);
ss::get_operation_mode.unset(r);
ss::is_starting.unset(r);
ss::get_drain_progress.unset(r);
ss::drain.unset(r);
ss::truncate.unset(r);
ss::get_keyspaces.unset(r);
ss::stop_gossiping.unset(r);
ss::start_gossiping.unset(r);
ss::is_gossip_running.unset(r);
ss::stop_daemon.unset(r);
ss::is_initialized.unset(r);
ss::join_ring.unset(r);
ss::is_joined.unset(r);
ss::set_stream_throughput_mb_per_sec.unset(r);
ss::get_stream_throughput_mb_per_sec.unset(r);
ss::get_compaction_throughput_mb_per_sec.unset(r);
ss::set_compaction_throughput_mb_per_sec.unset(r);
ss::is_incremental_backups_enabled.unset(r);
ss::set_incremental_backups_enabled.unset(r);
ss::rebuild.unset(r);
ss::bulk_load.unset(r);
ss::bulk_load_async.unset(r);
ss::reschedule_failed_deletions.unset(r);
ss::sample_key_range.unset(r);
ss::reset_local_schema.unset(r);
ss::set_trace_probability.unset(r);
ss::get_trace_probability.unset(r);
ss::get_slow_query_info.unset(r);
ss::set_slow_query.unset(r);
ss::enable_auto_compaction.unset(r);
ss::disable_auto_compaction.unset(r);
ss::enable_tombstone_gc.unset(r);
ss::disable_tombstone_gc.unset(r);
ss::deliver_hints.unset(r);
ss::get_cluster_name.unset(r);
ss::get_partitioner_name.unset(r);
ss::get_tombstone_warn_threshold.unset(r);
ss::set_tombstone_warn_threshold.unset(r);
ss::get_tombstone_failure_threshold.unset(r);
ss::set_tombstone_failure_threshold.unset(r);
ss::get_batch_size_failure_threshold.unset(r);
ss::set_batch_size_failure_threshold.unset(r);
ss::set_hinted_handoff_throttle_in_kb.unset(r);
ss::get_metrics_load.unset(r);
ss::get_exceptions.unset(r);
ss::get_total_hints_in_progress.unset(r);
ss::get_total_hints.unset(r);
ss::get_ownership.unset(r);
ss::get_effective_ownership.unset(r);
ss::sstable_info.unset(r);
}
enum class scrub_status {
successful = 0,
aborted,

View File

@@ -57,7 +57,8 @@ std::vector<sstring> parse_tables(const sstring& ks_name, http_context& ctx, con
// if the parameter is not found or is empty, returns a list of all table infos in the keyspace.
std::vector<table_info> parse_table_infos(const sstring& ks_name, http_context& ctx, const std::unordered_map<sstring, sstring>& query_params, sstring param_name);
void set_storage_service(http_context& ctx, httpd::routes& r, sharded<service::storage_service>& ss, gms::gossiper& g, sharded<db::system_keyspace>& sys_ls);
void set_storage_service(http_context& ctx, httpd::routes& r, sharded<service::storage_service>& ss);
void unset_storage_service(http_context& ctx, httpd::routes& r);
void set_sstables_loader(http_context& ctx, httpd::routes& r, sharded<sstables_loader>& sst_loader);
void unset_sstables_loader(http_context& ctx, httpd::routes& r);
void set_view_builder(http_context& ctx, httpd::routes& r, sharded<db::view::view_builder>& vb);

View File

@@ -1357,6 +1357,12 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auto stop_storage_service = defer_verbose_shutdown("storage_service", [&] {
ss.stop().get();
});
api::set_server_storage_service(ctx, ss).get();
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
api::unset_server_storage_service(ctx).get();
});
supervisor::notify("initializing virtual tables");
smp::invoke_on_all([&] {
return db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, *cfg);
@@ -1590,7 +1596,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auto stop_messaging_api = defer_verbose_shutdown("messaging service API", [&ctx] {
api::unset_server_messaging_service(ctx).get();
});
api::set_server_storage_service(ctx, ss, gossiper, sys_ks).get();
api::set_server_repair(ctx, repair).get();
auto stop_repair_api = defer_verbose_shutdown("repair API", [&ctx] {
api::unset_server_repair(ctx).get();

View File

@@ -228,6 +228,9 @@ private:
return _batchlog_manager;
}
friend struct ::node_ops_ctl;
public:
const gms::gossiper& gossiper() const noexcept {
return _gossiper;
};
@@ -236,9 +239,6 @@ private:
return _gossiper;
};
friend struct ::node_ops_ctl;
public:
locator::effective_replication_map_factory& get_erm_factory() noexcept {
return _erm_factory;
}