Merge 'Remove global proxy usage from API handlers' from Pavel Emelyanov

There are few places in the API handlers that call global proxy for their needs. Most of those places are easy to patch, because proxy is either at http_ctx thing right inside the handler code. Also there's a handler code in view_builder that needs proxy too, but it really needs topology, not proxy, and can get it elsewhere (the handler is coroutinized while at it)

Closes #13593

* github.com:scylladb/scylladb:
  view: Get topology via database tokens
  view: Indentation fix after previous patch
  view: Coroutinuze view_builder::view_build_statuses()
  api: Use ctx.sp in storage service handler
  api,main: Unset storage_proxy API on stop
  api: Use ctx.sp in set_storage_proxy() routes
This commit is contained in:
Avi Kivity
2023-04-20 16:31:31 +03:00
7 changed files with 97 additions and 21 deletions

View File

@@ -194,6 +194,10 @@ future<> set_server_storage_proxy(http_context& ctx, sharded<service::storage_se
});
}
future<> unset_server_storage_proxy(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_storage_proxy(ctx, r); });
}
future<> set_server_stream_manager(http_context& ctx, sharded<streaming::stream_manager>& sm) {
return register_api(ctx, "stream_manager",
"The stream manager API", [&sm] (http_context& ctx, routes& r) {

View File

@@ -107,6 +107,7 @@ future<> unset_server_load_sstable(http_context& ctx);
future<> set_server_messaging_service(http_context& ctx, sharded<netw::messaging_service>& ms);
future<> unset_server_messaging_service(http_context& ctx);
future<> set_server_storage_proxy(http_context& ctx, sharded<service::storage_service>& ss);
future<> unset_server_storage_proxy(http_context& ctx);
future<> set_server_stream_manager(http_context& ctx, sharded<streaming::stream_manager>& sm);
future<> unset_server_stream_manager(http_context& ctx);
future<> set_hinted_handoff(http_context& ctx, sharded<gms::gossiper>& g);

View File

@@ -191,36 +191,36 @@ void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_se
return make_ready_future<json::json_return_type>(0);
});
sp::get_hinted_handoff_enabled.set(r, [](std::unique_ptr<http::request> req) {
const auto& filter = service::get_storage_proxy().local().get_hints_host_filter();
sp::get_hinted_handoff_enabled.set(r, [&ctx](std::unique_ptr<http::request> req) {
const auto& filter = ctx.sp.local().get_hints_host_filter();
return make_ready_future<json::json_return_type>(!filter.is_disabled_for_all());
});
sp::set_hinted_handoff_enabled.set(r, [](std::unique_ptr<http::request> req) {
sp::set_hinted_handoff_enabled.set(r, [&ctx](std::unique_ptr<http::request> req) {
auto enable = req->get_query_param("enable");
auto filter = (enable == "true" || enable == "1")
? db::hints::host_filter(db::hints::host_filter::enabled_for_all_tag {})
: db::hints::host_filter(db::hints::host_filter::disabled_for_all_tag {});
return service::get_storage_proxy().invoke_on_all([filter = std::move(filter)] (service::storage_proxy& sp) {
return ctx.sp.invoke_on_all([filter = std::move(filter)] (service::storage_proxy& sp) {
return sp.change_hints_host_filter(filter);
}).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
sp::get_hinted_handoff_enabled_by_dc.set(r, [](std::unique_ptr<http::request> req) {
sp::get_hinted_handoff_enabled_by_dc.set(r, [&ctx](std::unique_ptr<http::request> req) {
std::vector<sstring> res;
const auto& filter = service::get_storage_proxy().local().get_hints_host_filter();
const auto& filter = ctx.sp.local().get_hints_host_filter();
const auto& dcs = filter.get_dcs();
res.reserve(res.size());
std::copy(dcs.begin(), dcs.end(), std::back_inserter(res));
return make_ready_future<json::json_return_type>(res);
});
sp::set_hinted_handoff_enabled_by_dc_list.set(r, [](std::unique_ptr<http::request> req) {
sp::set_hinted_handoff_enabled_by_dc_list.set(r, [&ctx](std::unique_ptr<http::request> req) {
auto dcs = req->get_query_param("dcs");
auto filter = db::hints::host_filter::parse_from_dc_list(std::move(dcs));
return service::get_storage_proxy().invoke_on_all([filter = std::move(filter)] (service::storage_proxy& sp) {
return ctx.sp.invoke_on_all([filter = std::move(filter)] (service::storage_proxy& sp) {
return sp.change_hints_host_filter(filter);
}).then([] {
return make_ready_future<json::json_return_type>(json_void());
@@ -518,4 +518,73 @@ void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_se
});
}
void unset_storage_proxy(http_context& ctx, routes& r) {
sp::get_total_hints.unset(r);
sp::get_hinted_handoff_enabled.unset(r);
sp::set_hinted_handoff_enabled.unset(r);
sp::get_hinted_handoff_enabled_by_dc.unset(r);
sp::set_hinted_handoff_enabled_by_dc_list.unset(r);
sp::get_max_hint_window.unset(r);
sp::set_max_hint_window.unset(r);
sp::get_max_hints_in_progress.unset(r);
sp::set_max_hints_in_progress.unset(r);
sp::get_hints_in_progress.unset(r);
sp::get_rpc_timeout.unset(r);
sp::set_rpc_timeout.unset(r);
sp::get_read_rpc_timeout.unset(r);
sp::set_read_rpc_timeout.unset(r);
sp::get_write_rpc_timeout.unset(r);
sp::set_write_rpc_timeout.unset(r);
sp::get_counter_write_rpc_timeout.unset(r);
sp::set_counter_write_rpc_timeout.unset(r);
sp::get_cas_contention_timeout.unset(r);
sp::set_cas_contention_timeout.unset(r);
sp::get_range_rpc_timeout.unset(r);
sp::set_range_rpc_timeout.unset(r);
sp::get_truncate_rpc_timeout.unset(r);
sp::set_truncate_rpc_timeout.unset(r);
sp::reload_trigger_classes.unset(r);
sp::get_read_repair_attempted.unset(r);
sp::get_read_repair_repaired_blocking.unset(r);
sp::get_read_repair_repaired_background.unset(r);
sp::get_schema_versions.unset(r);
sp::get_cas_read_timeouts.unset(r);
sp::get_cas_read_unavailables.unset(r);
sp::get_cas_write_timeouts.unset(r);
sp::get_cas_write_unavailables.unset(r);
sp::get_cas_write_metrics_unfinished_commit.unset(r);
sp::get_cas_write_metrics_contention.unset(r);
sp::get_cas_write_metrics_condition_not_met.unset(r);
sp::get_cas_write_metrics_failed_read_round_optimization.unset(r);
sp::get_cas_read_metrics_unfinished_commit.unset(r);
sp::get_cas_read_metrics_contention.unset(r);
sp::get_read_metrics_timeouts.unset(r);
sp::get_read_metrics_unavailables.unset(r);
sp::get_range_metrics_timeouts.unset(r);
sp::get_range_metrics_unavailables.unset(r);
sp::get_write_metrics_timeouts.unset(r);
sp::get_write_metrics_unavailables.unset(r);
sp::get_read_metrics_timeouts_rates.unset(r);
sp::get_read_metrics_unavailables_rates.unset(r);
sp::get_range_metrics_timeouts_rates.unset(r);
sp::get_range_metrics_unavailables_rates.unset(r);
sp::get_write_metrics_timeouts_rates.unset(r);
sp::get_write_metrics_unavailables_rates.unset(r);
sp::get_range_metrics_latency_histogram_depricated.unset(r);
sp::get_write_metrics_latency_histogram_depricated.unset(r);
sp::get_read_metrics_latency_histogram_depricated.unset(r);
sp::get_range_metrics_latency_histogram.unset(r);
sp::get_write_metrics_latency_histogram.unset(r);
sp::get_cas_write_metrics_latency_histogram.unset(r);
sp::get_cas_read_metrics_latency_histogram.unset(r);
sp::get_view_write_metrics_latency_histogram.unset(r);
sp::get_read_metrics_latency_histogram.unset(r);
sp::get_read_estimated_histogram.unset(r);
sp::get_read_latency.unset(r);
sp::get_write_estimated_histogram.unset(r);
sp::get_write_latency.unset(r);
sp::get_range_estimated_histogram.unset(r);
sp::get_range_latency.unset(r);
}
}

View File

@@ -16,5 +16,6 @@ namespace service { class storage_service; }
namespace api {
void set_storage_proxy(http_context& ctx, httpd::routes& r, sharded<service::storage_service>& ss);
void unset_storage_proxy(http_context& ctx, httpd::routes& r);
}

View File

@@ -1023,12 +1023,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(res);
});
ss::reset_local_schema.set(r, [&sys_ks](std::unique_ptr<http::request> req) {
ss::reset_local_schema.set(r, [&ctx, &sys_ks](std::unique_ptr<http::request> req) {
// FIXME: We should truncate schema tables if more than one node in the cluster.
auto& sp = service::get_storage_proxy();
auto& fs = sp.local().features();
auto& fs = ctx.sp.local().features();
apilog.info("reset_local_schema");
return db::schema_tables::recalculate_schema_version(sys_ks, sp, fs).then([] {
return db::schema_tables::recalculate_schema_version(sys_ks, ctx.sp, fs).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

View File

@@ -2094,16 +2094,15 @@ future<> view_builder::calculate_shard_build_step(view_builder_init_state& vbi)
future<std::unordered_map<sstring, sstring>>
view_builder::view_build_statuses(sstring keyspace, sstring view_name) const {
return _sys_dist_ks.view_status(std::move(keyspace), std::move(view_name)).then([] (std::unordered_map<locator::host_id, sstring> status) {
std::unordered_map<sstring, sstring> status_map;
const auto& topo = service::get_local_storage_proxy().get_token_metadata_ptr()->get_topology();
topo.for_each_node([&] (const locator::node *node) {
auto it = status.find(node->host_id());
auto s = it != status.end() ? std::move(it->second) : "UNKNOWN";
status_map.emplace(node->endpoint().to_sstring(), std::move(s));
});
return status_map;
std::unordered_map<locator::host_id, sstring> status = co_await _sys_dist_ks.view_status(std::move(keyspace), std::move(view_name));
std::unordered_map<sstring, sstring> status_map;
const auto& topo = _db.get_token_metadata().get_topology();
topo.for_each_node([&] (const locator::node *node) {
auto it = status.find(node->host_id());
auto s = it != status.end() ? std::move(it->second) : "UNKNOWN";
status_map.emplace(node->endpoint().to_sstring(), std::move(s));
});
co_return status_map;
}
future<> view_builder::add_new_view(view_ptr view, build_step& step) {

View File

@@ -1318,6 +1318,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
api::unset_server_snitch(ctx).get();
});
api::set_server_storage_proxy(ctx, ss).get();
auto stop_sp_api = defer_verbose_shutdown("storage proxy API", [&ctx] {
api::unset_server_storage_proxy(ctx).get();
});
api::set_server_load_sstable(ctx, sys_ks).get();
auto stop_cf_api = defer_verbose_shutdown("column family API", [&ctx] {
api::unset_server_load_sstable(ctx).get();