Merge 'Sanitize storage_proxy API handlers' from Pavel Emelyanov

Registering API handlers for services need to
- happen next to the corresponding service's start
- use only the provided service, not any other ones (if needed, the handler's service can use its internal dependencies to do its job)
- get the service to handle requests via argument, not from http context (http context, in turn, is going _not_ to depend on anything)

The storage proxy handlers don't follow any of that rules, this PR fixes them

Closes scylladb/scylladb#15584

* github.com:scylladb/scylladb:
  api: Make storage_proxy handlers use proxy argument
  api: Change some static helpers to use proxy instead of ctx
  api: Pass sharded<storage_proxy> reference to storage_proxy handlers
  api: Start (and stop) storage_proxy API earlier
  api: Remove storage_service argument from storage_proxy setup
  api: Move storage_proxy/ endpoint using storage_service
  api: Remove storage_proxy.hh from storage_service.cc
  main: Initialize API server early
This commit is contained in:
Botond Dénes
2023-10-02 10:44:00 +03:00
6 changed files with 128 additions and 127 deletions

View File

@@ -193,10 +193,10 @@ future<> unset_server_messaging_service(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_messaging_service(ctx, r); });
}
future<> set_server_storage_proxy(http_context& ctx, sharded<service::storage_service>& ss) {
future<> set_server_storage_proxy(http_context& ctx, sharded<service::storage_proxy>& proxy) {
return register_api(ctx, "storage_proxy",
"The storage proxy API", [&ss] (http_context& ctx, routes& r) {
set_storage_proxy(ctx, r, ss);
"The storage proxy API", [&proxy] (http_context& ctx, routes& r) {
set_storage_proxy(ctx, r, proxy);
});
}

View File

@@ -107,7 +107,7 @@ future<> set_server_load_sstable(http_context& ctx, sharded<db::system_keyspace>
future<> unset_server_load_sstable(http_context& ctx);
future<> set_server_messaging_service(http_context& ctx, sharded<netw::messaging_service>& ms);
future<> unset_server_messaging_service(http_context& ctx);
future<> set_server_storage_proxy(http_context& ctx, sharded<service::storage_service>& ss);
future<> set_server_storage_proxy(http_context& ctx, sharded<service::storage_proxy>& proxy);
future<> unset_server_storage_proxy(http_context& ctx);
future<> set_server_stream_manager(http_context& ctx, sharded<streaming::stream_manager>& sm);
future<> unset_server_stream_manager(http_context& ctx);

View File

@@ -10,7 +10,6 @@
#include "service/storage_proxy.hh"
#include "api/api-doc/storage_proxy.json.hh"
#include "api/api-doc/utils.json.hh"
#include "service/storage_service.hh"
#include "db/config.hh"
#include "utils/histogram.hh"
#include "replica/database.hh"
@@ -116,17 +115,17 @@ utils_json::estimated_histogram time_to_json_histogram(const utils::time_estimat
return res;
}
static future<json::json_return_type> sum_estimated_histogram(http_context& ctx, utils::timed_rate_moving_average_summary_and_histogram service::storage_proxy_stats::stats::*f) {
return two_dimensional_map_reduce(ctx.sp, [f] (service::storage_proxy_stats::stats& stats) {
static future<json::json_return_type> sum_estimated_histogram(sharded<service::storage_proxy>& proxy, utils::timed_rate_moving_average_summary_and_histogram service::storage_proxy_stats::stats::*f) {
return two_dimensional_map_reduce(proxy, [f] (service::storage_proxy_stats::stats& stats) {
return (stats.*f).histogram();
}, utils::time_estimated_histogram_merge, utils::time_estimated_histogram()).then([](const utils::time_estimated_histogram& val) {
return make_ready_future<json::json_return_type>(time_to_json_histogram(val));
});
}
static future<json::json_return_type> sum_estimated_histogram(http_context& ctx, utils::estimated_histogram service::storage_proxy_stats::stats::*f) {
static future<json::json_return_type> sum_estimated_histogram(sharded<service::storage_proxy>& proxy, utils::estimated_histogram service::storage_proxy_stats::stats::*f) {
return two_dimensional_map_reduce(ctx.sp, f, utils::estimated_histogram_merge,
return two_dimensional_map_reduce(proxy, f, utils::estimated_histogram_merge,
utils::estimated_histogram()).then([](const utils::estimated_histogram& val) {
utils_json::estimated_histogram res;
res = val;
@@ -134,8 +133,8 @@ static future<json::json_return_type> sum_estimated_histogram(http_context& ctx
});
}
static future<json::json_return_type> total_latency(http_context& ctx, utils::timed_rate_moving_average_summary_and_histogram service::storage_proxy_stats::stats::*f) {
return two_dimensional_map_reduce(ctx.sp, [f] (service::storage_proxy_stats::stats& stats) {
static future<json::json_return_type> total_latency(sharded<service::storage_proxy>& proxy, utils::timed_rate_moving_average_summary_and_histogram service::storage_proxy_stats::stats::*f) {
return two_dimensional_map_reduce(proxy, [f] (service::storage_proxy_stats::stats& stats) {
return (stats.*f).hist.mean * (stats.*f).hist.count;
}, std::plus<double>(), 0.0).then([](double val) {
int64_t res = val;
@@ -184,43 +183,43 @@ sum_timer_stats_storage_proxy(distributed<proxy>& d,
});
}
void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_service>& ss) {
void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_proxy>& proxy) {
sp::get_total_hints.set(r, [](std::unique_ptr<http::request> req) {
//TBD
unimplemented();
return make_ready_future<json::json_return_type>(0);
});
sp::get_hinted_handoff_enabled.set(r, [&ctx](std::unique_ptr<http::request> req) {
const auto& filter = ctx.sp.local().get_hints_host_filter();
sp::get_hinted_handoff_enabled.set(r, [&proxy](std::unique_ptr<http::request> req) {
const auto& filter = proxy.local().get_hints_host_filter();
return make_ready_future<json::json_return_type>(!filter.is_disabled_for_all());
});
sp::set_hinted_handoff_enabled.set(r, [&ctx](std::unique_ptr<http::request> req) {
sp::set_hinted_handoff_enabled.set(r, [&proxy](std::unique_ptr<http::request> req) {
auto enable = req->get_query_param("enable");
auto filter = (enable == "true" || enable == "1")
? db::hints::host_filter(db::hints::host_filter::enabled_for_all_tag {})
: db::hints::host_filter(db::hints::host_filter::disabled_for_all_tag {});
return ctx.sp.invoke_on_all([filter = std::move(filter)] (service::storage_proxy& sp) {
return proxy.invoke_on_all([filter = std::move(filter)] (service::storage_proxy& sp) {
return sp.change_hints_host_filter(filter);
}).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
sp::get_hinted_handoff_enabled_by_dc.set(r, [&ctx](std::unique_ptr<http::request> req) {
sp::get_hinted_handoff_enabled_by_dc.set(r, [&proxy](std::unique_ptr<http::request> req) {
std::vector<sstring> res;
const auto& filter = ctx.sp.local().get_hints_host_filter();
const auto& filter = proxy.local().get_hints_host_filter();
const auto& dcs = filter.get_dcs();
res.reserve(res.size());
std::copy(dcs.begin(), dcs.end(), std::back_inserter(res));
return make_ready_future<json::json_return_type>(res);
});
sp::set_hinted_handoff_enabled_by_dc_list.set(r, [&ctx](std::unique_ptr<http::request> req) {
sp::set_hinted_handoff_enabled_by_dc_list.set(r, [&proxy](std::unique_ptr<http::request> req) {
auto dcs = req->get_query_param("dcs");
auto filter = db::hints::host_filter::parse_from_dc_list(std::move(dcs));
return ctx.sp.invoke_on_all([filter = std::move(filter)] (service::storage_proxy& sp) {
return proxy.invoke_on_all([filter = std::move(filter)] (service::storage_proxy& sp) {
return sp.change_hints_host_filter(filter);
}).then([] {
return make_ready_future<json::json_return_type>(json_void());
@@ -342,144 +341,131 @@ void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_se
return make_ready_future<json::json_return_type>(json_void());
});
sp::get_read_repair_attempted.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read_repair_attempts);
sp::get_read_repair_attempted.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_stats_storage_proxy(proxy, &service::storage_proxy_stats::stats::read_repair_attempts);
});
sp::get_read_repair_repaired_blocking.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read_repair_repaired_blocking);
sp::get_read_repair_repaired_blocking.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_stats_storage_proxy(proxy, &service::storage_proxy_stats::stats::read_repair_repaired_blocking);
});
sp::get_read_repair_repaired_background.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read_repair_repaired_background);
sp::get_read_repair_repaired_background.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_stats_storage_proxy(proxy, &service::storage_proxy_stats::stats::read_repair_repaired_background);
});
sp::get_schema_versions.set(r, [&ss](std::unique_ptr<http::request> req) {
return ss.local().describe_schema_versions().then([] (auto result) {
std::vector<sp::mapper_list> res;
for (auto e : result) {
sp::mapper_list entry;
entry.key = std::move(e.first);
entry.value = std::move(e.second);
res.emplace_back(std::move(entry));
}
return make_ready_future<json::json_return_type>(std::move(res));
});
sp::get_cas_read_timeouts.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(proxy, &proxy::stats::cas_read_timeouts);
});
sp::get_cas_read_timeouts.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(ctx.sp, &proxy::stats::cas_read_timeouts);
sp::get_cas_read_unavailables.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(proxy, &proxy::stats::cas_read_unavailables);
});
sp::get_cas_read_unavailables.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(ctx.sp, &proxy::stats::cas_read_unavailables);
sp::get_cas_write_timeouts.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(proxy, &proxy::stats::cas_write_timeouts);
});
sp::get_cas_write_timeouts.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(ctx.sp, &proxy::stats::cas_write_timeouts);
sp::get_cas_write_unavailables.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(proxy, &proxy::stats::cas_write_unavailables);
});
sp::get_cas_write_unavailables.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(ctx.sp, &proxy::stats::cas_write_unavailables);
sp::get_cas_write_metrics_unfinished_commit.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_stats(proxy, &proxy::stats::cas_write_unfinished_commit);
});
sp::get_cas_write_metrics_unfinished_commit.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_stats(ctx.sp, &proxy::stats::cas_write_unfinished_commit);
sp::get_cas_write_metrics_contention.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_estimated_histogram(proxy, &proxy::stats::cas_write_contention);
});
sp::get_cas_write_metrics_contention.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_estimated_histogram(ctx, &proxy::stats::cas_write_contention);
sp::get_cas_write_metrics_condition_not_met.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_stats(proxy, &proxy::stats::cas_write_condition_not_met);
});
sp::get_cas_write_metrics_condition_not_met.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_stats(ctx.sp, &proxy::stats::cas_write_condition_not_met);
sp::get_cas_write_metrics_failed_read_round_optimization.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_stats(proxy, &proxy::stats::cas_failed_read_round_optimization);
});
sp::get_cas_write_metrics_failed_read_round_optimization.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_stats(ctx.sp, &proxy::stats::cas_failed_read_round_optimization);
sp::get_cas_read_metrics_unfinished_commit.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_stats(proxy, &proxy::stats::cas_read_unfinished_commit);
});
sp::get_cas_read_metrics_unfinished_commit.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_stats(ctx.sp, &proxy::stats::cas_read_unfinished_commit);
sp::get_cas_read_metrics_contention.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_estimated_histogram(proxy, &proxy::stats::cas_read_contention);
});
sp::get_cas_read_metrics_contention.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_estimated_histogram(ctx, &proxy::stats::cas_read_contention);
sp::get_read_metrics_timeouts.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(proxy, &service::storage_proxy_stats::stats::read_timeouts);
});
sp::get_read_metrics_timeouts.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(ctx.sp, &service::storage_proxy_stats::stats::read_timeouts);
sp::get_read_metrics_unavailables.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(proxy, &service::storage_proxy_stats::stats::read_unavailables);
});
sp::get_read_metrics_unavailables.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(ctx.sp, &service::storage_proxy_stats::stats::read_unavailables);
sp::get_range_metrics_timeouts.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(proxy, &service::storage_proxy_stats::stats::range_slice_timeouts);
});
sp::get_range_metrics_timeouts.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(ctx.sp, &service::storage_proxy_stats::stats::range_slice_timeouts);
sp::get_range_metrics_unavailables.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(proxy, &service::storage_proxy_stats::stats::range_slice_unavailables);
});
sp::get_range_metrics_unavailables.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(ctx.sp, &service::storage_proxy_stats::stats::range_slice_unavailables);
sp::get_write_metrics_timeouts.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(proxy, &service::storage_proxy_stats::stats::write_timeouts);
});
sp::get_write_metrics_timeouts.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(ctx.sp, &service::storage_proxy_stats::stats::write_timeouts);
sp::get_write_metrics_unavailables.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(proxy, &service::storage_proxy_stats::stats::write_unavailables);
});
sp::get_write_metrics_unavailables.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_long(ctx.sp, &service::storage_proxy_stats::stats::write_unavailables);
sp::get_read_metrics_timeouts_rates.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_obj(proxy, &service::storage_proxy_stats::stats::read_timeouts);
});
sp::get_read_metrics_timeouts_rates.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_obj(ctx.sp, &service::storage_proxy_stats::stats::read_timeouts);
sp::get_read_metrics_unavailables_rates.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_obj(proxy, &service::storage_proxy_stats::stats::read_unavailables);
});
sp::get_read_metrics_unavailables_rates.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_obj(ctx.sp, &service::storage_proxy_stats::stats::read_unavailables);
sp::get_range_metrics_timeouts_rates.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_obj(proxy, &service::storage_proxy_stats::stats::range_slice_timeouts);
});
sp::get_range_metrics_timeouts_rates.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_obj(ctx.sp, &service::storage_proxy_stats::stats::range_slice_timeouts);
sp::get_range_metrics_unavailables_rates.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_obj(proxy, &service::storage_proxy_stats::stats::range_slice_unavailables);
});
sp::get_range_metrics_unavailables_rates.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_obj(ctx.sp, &service::storage_proxy_stats::stats::range_slice_unavailables);
sp::get_write_metrics_timeouts_rates.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_obj(proxy, &service::storage_proxy_stats::stats::write_timeouts);
});
sp::get_write_metrics_timeouts_rates.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_obj(ctx.sp, &service::storage_proxy_stats::stats::write_timeouts);
sp::get_write_metrics_unavailables_rates.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_obj(proxy, &service::storage_proxy_stats::stats::write_unavailables);
});
sp::get_write_metrics_unavailables_rates.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timed_rate_as_obj(ctx.sp, &service::storage_proxy_stats::stats::write_unavailables);
sp::get_range_metrics_latency_histogram_depricated.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_histogram_stats_storage_proxy(proxy, &service::storage_proxy_stats::stats::range);
});
sp::get_range_metrics_latency_histogram_depricated.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_histogram_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::range);
sp::get_write_metrics_latency_histogram_depricated.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_histogram_stats_storage_proxy(proxy, &service::storage_proxy_stats::stats::write);
});
sp::get_write_metrics_latency_histogram_depricated.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_histogram_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::write);
sp::get_read_metrics_latency_histogram_depricated.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_histogram_stats_storage_proxy(proxy, &service::storage_proxy_stats::stats::read);
});
sp::get_read_metrics_latency_histogram_depricated.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_histogram_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read);
sp::get_range_metrics_latency_histogram.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timer_stats_storage_proxy(proxy, &service::storage_proxy_stats::stats::range);
});
sp::get_range_metrics_latency_histogram.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timer_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::range);
sp::get_write_metrics_latency_histogram.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timer_stats_storage_proxy(proxy, &service::storage_proxy_stats::stats::write);
});
sp::get_cas_write_metrics_latency_histogram.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timer_stats(proxy, &proxy::stats::cas_write);
});
sp::get_write_metrics_latency_histogram.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timer_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::write);
});
sp::get_cas_write_metrics_latency_histogram.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timer_stats(ctx.sp, &proxy::stats::cas_write);
});
sp::get_cas_read_metrics_latency_histogram.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timer_stats(ctx.sp, &proxy::stats::cas_read);
sp::get_cas_read_metrics_latency_histogram.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timer_stats(proxy, &proxy::stats::cas_read);
});
sp::get_view_write_metrics_latency_histogram.set(r, [](std::unique_ptr<http::request> req) {
@@ -490,31 +476,31 @@ void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_se
return make_ready_future<json::json_return_type>(get_empty_moving_average());
});
sp::get_read_metrics_latency_histogram.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timer_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read);
sp::get_read_metrics_latency_histogram.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timer_stats_storage_proxy(proxy, &service::storage_proxy_stats::stats::read);
});
sp::get_read_estimated_histogram.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_estimated_histogram(ctx, &service::storage_proxy_stats::stats::read);
sp::get_read_estimated_histogram.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_estimated_histogram(proxy, &service::storage_proxy_stats::stats::read);
});
sp::get_read_latency.set(r, [&ctx](std::unique_ptr<http::request> req) {
return total_latency(ctx, &service::storage_proxy_stats::stats::read);
sp::get_read_latency.set(r, [&proxy](std::unique_ptr<http::request> req) {
return total_latency(proxy, &service::storage_proxy_stats::stats::read);
});
sp::get_write_estimated_histogram.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_estimated_histogram(ctx, &service::storage_proxy_stats::stats::write);
sp::get_write_estimated_histogram.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_estimated_histogram(proxy, &service::storage_proxy_stats::stats::write);
});
sp::get_write_latency.set(r, [&ctx](std::unique_ptr<http::request> req) {
return total_latency(ctx, &service::storage_proxy_stats::stats::write);
sp::get_write_latency.set(r, [&proxy](std::unique_ptr<http::request> req) {
return total_latency(proxy, &service::storage_proxy_stats::stats::write);
});
sp::get_range_estimated_histogram.set(r, [&ctx](std::unique_ptr<http::request> req) {
return sum_timer_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::range);
sp::get_range_estimated_histogram.set(r, [&proxy](std::unique_ptr<http::request> req) {
return sum_timer_stats_storage_proxy(proxy, &service::storage_proxy_stats::stats::range);
});
sp::get_range_latency.set(r, [&ctx](std::unique_ptr<http::request> req) {
return total_latency(ctx, &service::storage_proxy_stats::stats::range);
sp::get_range_latency.set(r, [&proxy](std::unique_ptr<http::request> req) {
return total_latency(proxy, &service::storage_proxy_stats::stats::range);
});
}
@@ -547,7 +533,6 @@ void unset_storage_proxy(http_context& ctx, routes& r) {
sp::get_read_repair_attempted.unset(r);
sp::get_read_repair_repaired_blocking.unset(r);
sp::get_read_repair_repaired_background.unset(r);
sp::get_schema_versions.unset(r);
sp::get_cas_read_timeouts.unset(r);
sp::get_cas_read_unavailables.unset(r);
sp::get_cas_write_timeouts.unset(r);

View File

@@ -11,11 +11,11 @@
#include <seastar/core/sharded.hh>
#include "api.hh"
namespace service { class storage_service; }
namespace service { class storage_proxy; }
namespace api {
void set_storage_proxy(http_context& ctx, httpd::routes& r, sharded<service::storage_service>& ss);
void set_storage_proxy(http_context& ctx, httpd::routes& r, sharded<service::storage_proxy>& proxy);
void unset_storage_proxy(http_context& ctx, httpd::routes& r);
}

View File

@@ -8,6 +8,7 @@
#include "storage_service.hh"
#include "api/api-doc/storage_service.json.hh"
#include "api/api-doc/storage_proxy.json.hh"
#include "db/config.hh"
#include "db/schema_tables.hh"
#include "utils/hash.hh"
@@ -42,7 +43,6 @@
#include "thrift/controller.hh"
#include "locator/token_metadata.hh"
#include "cdc/generation_service.hh"
#include "service/storage_proxy.hh"
#include "locator/abstract_replication_strategy.hh"
#include "sstables_loader.hh"
#include "db/view/view_builder.hh"
@@ -55,6 +55,7 @@ extern logging::logger apilog;
namespace api {
namespace ss = httpd::storage_service_json;
namespace sp = httpd::storage_proxy_json;
using namespace json;
sstring validate_keyspace(http_context& ctx, sstring ks_name) {
@@ -1346,6 +1347,19 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
co_return json_void();
});
sp::get_schema_versions.set(r, [&ss](std::unique_ptr<http::request> req) {
return ss.local().describe_schema_versions().then([] (auto result) {
std::vector<sp::mapper_list> res;
for (auto e : result) {
sp::mapper_list entry;
entry.key = std::move(e.first);
entry.value = std::move(e.second);
res.emplace_back(std::move(entry));
}
return make_ready_future<json::json_return_type>(std::move(res));
});
});
}
void unset_storage_service(http_context& ctx, routes& r) {
@@ -1435,6 +1449,7 @@ void unset_storage_service(http_context& ctx, routes& r) {
ss::get_ownership.unset(r);
ss::get_effective_ownership.unset(r);
ss::sstable_info.unset(r);
sp::get_schema_versions.unset(r);
}
enum class scrub_status {

23
main.cc
View File

@@ -829,6 +829,14 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}
const auto hinted_handoff_enabled = cfg->hinted_handoff_enabled();
auto api_addr = utils::resolve(cfg->api_address || cfg->rpc_address, family, preferred).get0();
supervisor::notify("starting API server");
ctx.http_server.start("API").get();
auto stop_http_server = defer_verbose_shutdown("API server", [&ctx] {
ctx.http_server.stop().get();
});
api::set_server_init(ctx).get();
supervisor::notify("starting prometheus API server");
std::any stop_prometheus;
if (cfg->prometheus_port()) {
@@ -1054,6 +1062,10 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// #293 - do not stop anything
// engine().at_exit([&proxy] { return proxy.stop(); });
api::set_server_storage_proxy(ctx, proxy).get();
auto stop_sp_api = defer_verbose_shutdown("storage proxy API", [&ctx] {
api::unset_server_storage_proxy(ctx).get();
});
static sharded<cql3::cql_config> cql_config;
cql_config.start(std::ref(*cfg)).get();
@@ -1084,13 +1096,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
tracing::tracing::tracing_instance().stop().get();
});
auto api_addr = utils::resolve(cfg->api_address || cfg->rpc_address, family, preferred).get0();
supervisor::notify("starting API server");
ctx.http_server.start("API").get();
auto stop_http_server = defer_verbose_shutdown("API server", [&ctx] {
ctx.http_server.stop().get();
});
api::set_server_init(ctx).get();
with_scheduling_group(maintenance_scheduling_group, [&] {
return ctx.http_server.listen(socket_address{api_addr, cfg->api_port()});
}).get();
@@ -1492,10 +1497,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auto stop_snitch_api = defer_verbose_shutdown("snitch API", [&ctx] {
api::unset_server_snitch(ctx).get();
});
api::set_server_storage_proxy(ctx, ss).get();
auto stop_sp_api = defer_verbose_shutdown("storage proxy API", [&ctx] {
api::unset_server_storage_proxy(ctx).get();
});
api::set_server_load_sstable(ctx, sys_ks).get();
auto stop_cf_api = defer_verbose_shutdown("column family API", [&ctx] {
api::unset_server_load_sstable(ctx).get();