Merge 'Add per scheduling groups statistics' from Eliran

This set implements support for per scheduling group statistics in
storage proxy and tables view statistics (although tables view per
scheduling group stats are not actively applied in this series).
Having those statistics per scheduling group can help in finding operations
that are performed outside their context; another advantage is that
it lays the groundwork for supporting per service level statistics for the
workload prioritization enterprise feature.
At some point there was a thought to add those stats per role,
but it is not feasible at the moment:
1. The number of roles/users is unbounded, so it is dangerous to
hold stats (in memory) for all of them.
2. We will need a proper design of how to deal with the hierarchical
nature of roles in the stats.

Besides these reasons, and regardless, it is beneficial to look at
resource-related stats per scheduling group. Looking at resources
per user or role will not necessarily give insights, since resources
are divided per scheduling group and not per role, so it can lead to
false conclusions if more than one role is attached to the same service level.

Tests:
unit tests (Dev, Debug)
validating the stats with monitor

* es/per_sg_stats/v6:
  storage proxy: migrate to per scheduling group statistics
  internalize storage proxy statistics metric registration
This commit is contained in:
Piotr Sarna
2020-01-30 15:02:33 +01:00
9 changed files with 565 additions and 304 deletions

View File

@@ -27,6 +27,7 @@
#include "db/config.hh"
#include "utils/histogram.hh"
#include "database.hh"
#include "seastar/core/scheduling_specific.hh"
namespace api {
@@ -34,12 +35,70 @@ namespace sp = httpd::storage_proxy_json;
using proxy = service::storage_proxy;
using namespace json;
static future<utils::rate_moving_average> sum_timed_rate(distributed<proxy>& d, utils::timed_rate_moving_average proxy::stats::*f) {
return d.map_reduce0([f](const proxy& p) {return (p.get_stats().*f).rate();}, utils::rate_moving_average(),
std::plus<utils::rate_moving_average>());
/**
 * This function implements a two-dimensional map reduce, where
 * the first level is a distributed storage_proxy class and the
 * second level is the per scheduling group stats class.
 * @param d - a reference to the storage_proxy distributed class.
 * @param mapper - the internal mapper that is used to map the internal
 * stat class into a value of type `V`.
 * @param reducer - the reducer that is used in both the outer and inner
 * aggregations.
 * @param initial_value - the initial value to use for both aggregations.
 * @return A future that resolves to the result of the aggregation.
 */
template<typename V, typename Reducer, typename InnerMapper>
future<V> two_dimensional_map_reduce(distributed<service::storage_proxy>& d,
InnerMapper mapper, Reducer reducer, V initial_value) {
return d.map_reduce0( [mapper, reducer, initial_value] (const service::storage_proxy& sp) {
return map_reduce_scheduling_group_specific<service::storage_proxy_stats::stats>(
mapper, reducer, initial_value, sp.get_stats_key());
}, initial_value, reducer);
}
static future<json::json_return_type> sum_timed_rate_as_obj(distributed<proxy>& d, utils::timed_rate_moving_average proxy::stats::*f) {
/**
 * This function implements a two-dimensional map reduce, where
 * the first level is a distributed storage_proxy class and the
 * second level is the per scheduling group stats class.
 * @param d - a reference to the storage_proxy distributed class.
 * @param f - a field pointer which serves as the implicit internal mapper.
 * @param reducer - the reducer that is used in both the outer and inner
 * aggregations.
 * @param initial_value - the initial value to use for both aggregations.
 * @return A future that resolves to the result of the aggregation.
 */
template<typename V, typename Reducer, typename F>
future<V> two_dimensional_map_reduce(distributed<service::storage_proxy>& d,
V F::*f, Reducer reducer, V initial_value) {
return two_dimensional_map_reduce(d, [f] (F& stats) {
return stats.*f;
}, reducer, initial_value);
}
/**
 * A partial specialization of sum_stats for the storage proxy
 * case, where the get stats function doesn't return a
 * stats object with fields but a per scheduling group
 * stats object. The name was also changed since partial
 * specialization of functions is not supported in C++.
 * @param d - a reference to the storage_proxy distributed class.
 * @param f - a field pointer to the stat to be summed.
 * @return A future that resolves to the sum of the stat over
 * all shards and scheduling groups, as a JSON value.
 */
template<typename V, typename F>
future<json::json_return_type> sum_stats_storage_proxy(distributed<proxy>& d, V F::*f) {
return two_dimensional_map_reduce(d, [f] (F& stats) { return stats.*f; }, std::plus<V>(), V(0)).then([] (V val) {
return make_ready_future<json::json_return_type>(val);
});
}
// Sums the rate() of a timed_rate_moving_average stat field over all
// shards and all scheduling groups.
static future<utils::rate_moving_average> sum_timed_rate(distributed<proxy>& d, utils::timed_rate_moving_average service::storage_proxy_stats::stats::*f) {
return two_dimensional_map_reduce(d, [f] (service::storage_proxy_stats::stats& stats) {
return (stats.*f).rate();
}, std::plus<utils::rate_moving_average>(), utils::rate_moving_average());
}
static future<json::json_return_type> sum_timed_rate_as_obj(distributed<proxy>& d, utils::timed_rate_moving_average service::storage_proxy_stats::stats::*f) {
return sum_timed_rate(d, f).then([](const utils::rate_moving_average& val) {
httpd::utils_json::rate_moving_average m;
m = val;
@@ -51,29 +110,72 @@ httpd::utils_json::rate_moving_average_and_histogram get_empty_moving_average()
return timer_to_json(utils::rate_moving_average_and_histogram());
}
static future<json::json_return_type> sum_timed_rate_as_long(distributed<proxy>& d, utils::timed_rate_moving_average proxy::stats::*f) {
// Returns the total event count of a timed_rate_moving_average stat field,
// summed over all shards and scheduling groups, as a JSON value.
static future<json::json_return_type> sum_timed_rate_as_long(distributed<proxy>& d, utils::timed_rate_moving_average service::storage_proxy_stats::stats::*f) {
return sum_timed_rate(d, f).then([](const utils::rate_moving_average& val) {
return make_ready_future<json::json_return_type>(val.count);
});
}
static future<json::json_return_type> sum_estimated_histogram(http_context& ctx, utils::estimated_histogram proxy::stats::*f) {
return ctx.sp.map_reduce0([f](const proxy& p) {return p.get_stats().*f;}, utils::estimated_histogram(),
utils::estimated_histogram_merge).then([](const utils::estimated_histogram& val) {
// Merges an estimated_histogram stat field over all shards and all
// scheduling groups and returns the result as a JSON histogram object.
static future<json::json_return_type> sum_estimated_histogram(http_context& ctx, utils::estimated_histogram service::storage_proxy_stats::stats::*f) {
return two_dimensional_map_reduce(ctx.sp, f, utils::estimated_histogram_merge,
utils::estimated_histogram()).then([](const utils::estimated_histogram& val) {
utils_json::estimated_histogram res;
res = val;
return make_ready_future<json::json_return_type>(res);
});
}
static future<json::json_return_type> total_latency(http_context& ctx, utils::timed_rate_moving_average_and_histogram proxy::stats::*f) {
return ctx.sp.map_reduce0([f](const proxy& p) {return (p.get_stats().*f).hist.mean * (p.get_stats().*f).hist.count;}, 0.0,
std::plus<double>()).then([](double val) {
// Computes the total latency (histogram mean * count) of a stat field,
// summed over all shards and scheduling groups, truncated to an int64.
static future<json::json_return_type> total_latency(http_context& ctx, utils::timed_rate_moving_average_and_histogram service::storage_proxy_stats::stats::*f) {
return two_dimensional_map_reduce(ctx.sp, [f] (service::storage_proxy_stats::stats& stats) {
return (stats.*f).hist.mean * (stats.*f).hist.count;
}, std::plus<double>(), 0.0).then([](double val) {
// Truncation to int64_t is intentional: the REST API reports whole units.
int64_t res = val;
return make_ready_future<json::json_return_type>(res);
});
}
/**
 * A partial specialization of sum_histogram_stats
 * for the storage proxy case, where the get stats
 * function doesn't return a stats object with
 * fields but a per scheduling group stats object.
 * The name was also changed since partial
 * specialization of functions is not supported in C++.
 */
template<typename F>
future<json::json_return_type>
sum_histogram_stats_storage_proxy(distributed<proxy>& d,
utils::timed_rate_moving_average_and_histogram F::*f) {
return two_dimensional_map_reduce(d, [f] (service::storage_proxy_stats::stats& stats) {
return (stats.*f).hist;
}, std::plus<utils::ihistogram>(), utils::ihistogram()).
then([](const utils::ihistogram& val) {
return make_ready_future<json::json_return_type>(to_json(val));
});
}
/**
 * A partial specialization of sum_timer_stats for the
 * storage proxy case, where the get stats function
 * doesn't return a stats object with fields but a
 * per scheduling group stats object. The name
 * was also changed since partial specialization
 * of functions is not supported in C++.
 */
template<typename F>
future<json::json_return_type>
sum_timer_stats_storage_proxy(distributed<proxy>& d,
utils::timed_rate_moving_average_and_histogram F::*f) {
return two_dimensional_map_reduce(d, [f] (service::storage_proxy_stats::stats& stats) {
return (stats.*f).rate();
}, std::plus<utils::rate_moving_average_and_histogram>(),
utils::rate_moving_average_and_histogram()).then([](const utils::rate_moving_average_and_histogram& val) {
return make_ready_future<json::json_return_type>(timer_to_json(val));
});
}
void set_storage_proxy(http_context& ctx, routes& r) {
sp::get_total_hints.set(r, [](std::unique_ptr<request> req) {
//TBD
@@ -223,15 +325,15 @@ void set_storage_proxy(http_context& ctx, routes& r) {
});
sp::get_read_repair_attempted.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_stats(ctx.sp, &proxy::stats::read_repair_attempts);
return sum_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read_repair_attempts);
});
sp::get_read_repair_repaired_blocking.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_stats(ctx.sp, &proxy::stats::read_repair_repaired_blocking);
return sum_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read_repair_repaired_blocking);
});
sp::get_read_repair_repaired_background.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_stats(ctx.sp, &proxy::stats::read_repair_repaired_background);
return sum_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read_repair_repaired_background);
});
sp::get_schema_versions.set(r, [](std::unique_ptr<request> req) {
@@ -284,71 +386,71 @@ void set_storage_proxy(http_context& ctx, routes& r) {
});
sp::get_read_metrics_timeouts.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timed_rate_as_long(ctx.sp, &proxy::stats::read_timeouts);
return sum_timed_rate_as_long(ctx.sp, &service::storage_proxy_stats::stats::read_timeouts);
});
sp::get_read_metrics_unavailables.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timed_rate_as_long(ctx.sp, &proxy::stats::read_unavailables);
return sum_timed_rate_as_long(ctx.sp, &service::storage_proxy_stats::stats::read_unavailables);
});
sp::get_range_metrics_timeouts.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timed_rate_as_long(ctx.sp, &proxy::stats::range_slice_timeouts);
return sum_timed_rate_as_long(ctx.sp, &service::storage_proxy_stats::stats::range_slice_timeouts);
});
sp::get_range_metrics_unavailables.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timed_rate_as_long(ctx.sp, &proxy::stats::range_slice_unavailables);
return sum_timed_rate_as_long(ctx.sp, &service::storage_proxy_stats::stats::range_slice_unavailables);
});
sp::get_write_metrics_timeouts.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timed_rate_as_long(ctx.sp, &proxy::stats::write_timeouts);
return sum_timed_rate_as_long(ctx.sp, &service::storage_proxy_stats::stats::write_timeouts);
});
sp::get_write_metrics_unavailables.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timed_rate_as_long(ctx.sp, &proxy::stats::write_unavailables);
return sum_timed_rate_as_long(ctx.sp, &service::storage_proxy_stats::stats::write_unavailables);
});
sp::get_read_metrics_timeouts_rates.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timed_rate_as_obj(ctx.sp, &proxy::stats::read_timeouts);
return sum_timed_rate_as_obj(ctx.sp, &service::storage_proxy_stats::stats::read_timeouts);
});
sp::get_read_metrics_unavailables_rates.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timed_rate_as_obj(ctx.sp, &proxy::stats::read_unavailables);
return sum_timed_rate_as_obj(ctx.sp, &service::storage_proxy_stats::stats::read_unavailables);
});
sp::get_range_metrics_timeouts_rates.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timed_rate_as_obj(ctx.sp, &proxy::stats::range_slice_timeouts);
return sum_timed_rate_as_obj(ctx.sp, &service::storage_proxy_stats::stats::range_slice_timeouts);
});
sp::get_range_metrics_unavailables_rates.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timed_rate_as_obj(ctx.sp, &proxy::stats::range_slice_unavailables);
return sum_timed_rate_as_obj(ctx.sp, &service::storage_proxy_stats::stats::range_slice_unavailables);
});
sp::get_write_metrics_timeouts_rates.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timed_rate_as_obj(ctx.sp, &proxy::stats::write_timeouts);
return sum_timed_rate_as_obj(ctx.sp, &service::storage_proxy_stats::stats::write_timeouts);
});
sp::get_write_metrics_unavailables_rates.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timed_rate_as_obj(ctx.sp, &proxy::stats::write_unavailables);
return sum_timed_rate_as_obj(ctx.sp, &service::storage_proxy_stats::stats::write_unavailables);
});
sp::get_range_metrics_latency_histogram_depricated.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_histogram_stats(ctx.sp, &proxy::stats::range);
return sum_histogram_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::range);
});
sp::get_write_metrics_latency_histogram_depricated.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_histogram_stats(ctx.sp, &proxy::stats::write);
return sum_histogram_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::write);
});
sp::get_read_metrics_latency_histogram_depricated.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_histogram_stats(ctx.sp, &proxy::stats::read);
return sum_histogram_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read);
});
sp::get_range_metrics_latency_histogram.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timer_stats(ctx.sp, &proxy::stats::range);
return sum_timer_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::range);
});
sp::get_write_metrics_latency_histogram.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timer_stats(ctx.sp, &proxy::stats::write);
return sum_timer_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::write);
});
sp::get_cas_write_metrics_latency_histogram.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timer_stats(ctx.sp, &proxy::stats::cas_write);
@@ -367,30 +469,30 @@ void set_storage_proxy(http_context& ctx, routes& r) {
});
sp::get_read_metrics_latency_histogram.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timer_stats(ctx.sp, &proxy::stats::read);
return sum_timer_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read);
});
sp::get_read_estimated_histogram.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_estimated_histogram(ctx, &proxy::stats::estimated_read);
return sum_estimated_histogram(ctx, &service::storage_proxy_stats::stats::estimated_read);
});
sp::get_read_latency.set(r, [&ctx](std::unique_ptr<request> req) {
return total_latency(ctx, &proxy::stats::read);
return total_latency(ctx, &service::storage_proxy_stats::stats::read);
});
sp::get_write_estimated_histogram.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_estimated_histogram(ctx, &proxy::stats::estimated_write);
return sum_estimated_histogram(ctx, &service::storage_proxy_stats::stats::estimated_write);
});
sp::get_write_latency.set(r, [&ctx](std::unique_ptr<request> req) {
return total_latency(ctx, &proxy::stats::write);
return total_latency(ctx, &service::storage_proxy_stats::stats::write);
});
sp::get_range_estimated_histogram.set(r, [&ctx](std::unique_ptr<request> req) {
return sum_timer_stats(ctx.sp, &proxy::stats::range);
return sum_timer_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::range);
});
sp::get_range_latency.set(r, [&ctx](std::unique_ptr<request> req) {
return total_latency(ctx, &proxy::stats::range);
return total_latency(ctx, &service::storage_proxy_stats::stats::range);
});
}

View File

@@ -145,6 +145,28 @@ void view_info::initialize_base_dependent_fields(const schema& base) {
namespace db {
namespace view {
// Constructs per-view update stats. Forwards the metric category to the
// base write_stats with auto-registration disabled — metrics are registered
// explicitly via register_stats().
stats::stats(const sstring& category, label_instance ks_label, label_instance cf_label) :
service::storage_proxy_stats::write_stats(category, false),
_ks_label(ks_label),
_cf_label(cf_label) {
}
// Registers the per-view "column_family" metrics group, labeling every
// metric with the keyspace and column-family labels captured at construction.
void stats::register_stats() {
namespace ms = seastar::metrics;
namespace sp_stats = service::storage_proxy_stats;
_metrics.add_group("column_family", {
ms::make_total_operations("view_updates_pushed_remote", view_updates_pushed_remote, ms::description("Number of updates (mutations) pushed to remote view replicas"),
{_cf_label, _ks_label}),
ms::make_total_operations("view_updates_failed_remote", view_updates_failed_remote, ms::description("Number of updates (mutations) that failed to be pushed to remote view replicas"),
{_cf_label, _ks_label}),
ms::make_total_operations("view_updates_pushed_local", view_updates_pushed_local, ms::description("Number of updates (mutations) pushed to local view replicas"),
{_cf_label, _ks_label}),
ms::make_total_operations("view_updates_failed_local", view_updates_failed_local, ms::description("Number of updates (mutations) that failed to be pushed to local view replicas"),
{_cf_label, _ks_label}),
ms::make_gauge("view_updates_pending", ms::description("Number of updates pushed to view and are still to be completed"),
{_cf_label, _ks_label}, writes),
});
}
bool partition_key_matches(const schema& base, const view_info& view, const dht::decorated_key& key) {
return view.select_statement().get_restrictions()->get_partition_key_restrictions()->is_satisfied_by(

View File

@@ -48,8 +48,13 @@ struct stats : public service::storage_proxy_stats::write_stats {
int64_t view_updates_pushed_remote = 0;
int64_t view_updates_failed_local = 0;
int64_t view_updates_failed_remote = 0;
using label_instance = seastar::metrics::label_instance;
stats(const sstring& category, label_instance ks_label, label_instance cf_label);
void register_stats();
private:
label_instance _ks_label;
label_instance _cf_label;
stats(const sstring& category) : service::storage_proxy_stats::write_stats(category, false) { }
};
/**

10
main.cc
View File

@@ -794,7 +794,15 @@ int main(int ac, char** av) {
spcfg.write_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0();
spcfg.write_ack_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0();
static db::view::node_update_backlog node_backlog(smp::count, 10ms);
proxy.start(std::ref(db), spcfg, std::ref(node_backlog)).get();
scheduling_group_key_config storage_proxy_stats_cfg =
make_scheduling_group_key_config<service::storage_proxy_stats::stats>();
storage_proxy_stats_cfg.constructor = [plain_constructor = storage_proxy_stats_cfg.constructor] (void* ptr) {
plain_constructor(ptr);
reinterpret_cast<service::storage_proxy_stats::stats*>(ptr)->register_stats();
reinterpret_cast<service::storage_proxy_stats::stats*>(ptr)->register_split_metrics_local();
};
proxy.start(std::ref(db), spcfg, std::ref(node_backlog),
scheduling_group_key_create(storage_proxy_stats_cfg).get0()).get();
// #293 - do not stop anything
// engine().at_exit([&proxy] { return proxy.stop(); });
supervisor::notify("starting migration manager");

File diff suppressed because it is too large Load Diff

View File

@@ -47,6 +47,7 @@
#include "query-result-set.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/execution_stage.hh>
#include <seastar/core/scheduling_specific.hh>
#include "db/consistency_level_type.hh"
#include "db/read_repair_decision.hh"
#include "db/write_type.hh"
@@ -181,6 +182,7 @@ public:
using write_stats = storage_proxy_stats::write_stats;
using stats = storage_proxy_stats::stats;
using global_stats = storage_proxy_stats::global_stats;
class coordinator_query_options {
clock_type::time_point _timeout;
@@ -249,7 +251,8 @@ private:
db::hints::resource_manager _hints_resource_manager;
std::optional<db::hints::manager> _hints_manager;
db::hints::manager _hints_for_views_manager;
stats _stats;
scheduling_group_key _stats_key;
storage_proxy_stats::global_stats _global_stats;
static constexpr float CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
// for read repair chance calculation
std::default_random_engine _urandom;
@@ -397,7 +400,8 @@ private:
template<typename Range>
future<> mutate_counters(Range&& mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state, service_permit permit, clock_type::time_point timeout);
public:
storage_proxy(distributed<database>& db, config cfg, db::view::node_update_backlog& max_view_update_backlog);
storage_proxy(distributed<database>& db, config cfg, db::view::node_update_backlog& max_view_update_backlog,
scheduling_group_key stats_key);
~storage_proxy();
const distributed<database>& get_db() const {
return _db;
@@ -541,7 +545,20 @@ public:
future<> drain_on_shutdown();
const stats& get_stats() const {
return _stats;
return scheduling_group_get_specific<storage_proxy_stats::stats>(_stats_key);
}
stats& get_stats() {
return scheduling_group_get_specific<storage_proxy_stats::stats>(_stats_key);
}
const global_stats& get_global_stats() const {
return _global_stats;
}
global_stats& get_global_stats() {
return _global_stats;
}
scheduling_group_key get_stats_key() const {
return _stats_key;
}
static unsigned cas_shard(dht::token token);

View File

@@ -33,7 +33,7 @@ namespace storage_proxy_stats {
// split statistics counters
struct split_stats {
static seastar::metrics::label datacenter_label;
static seastar::metrics::label op_type_label;
private:
struct stats_counter {
uint64_t val = 0;
@@ -108,8 +108,6 @@ struct write_stats {
// forwarded by a coordinator to a replica
uint64_t reads_coordinator_outside_replica_set = 0;
uint64_t background_writes = 0; // client no longer waits for the write
uint64_t background_write_bytes = 0;
uint64_t queued_write_bytes = 0;
uint64_t throttled_writes = 0; // total number of writes ever delayed due to throttling
uint64_t throttled_base_writes = 0; // current number of base writes delayed due to view update backlog
uint64_t background_writes_failed = 0;
@@ -122,11 +120,15 @@ public:
write_stats();
write_stats(const sstring& category, bool auto_register_stats);
void register_metrics_local();
void register_metrics_for(gms::inet_address ep);
void register_stats();
void register_split_metrics_local();
void register_split_metrics_for(gms::inet_address ep);
protected:
seastar::metrics::metric_groups _metrics;
};
struct stats : public write_stats {
seastar::metrics::metric_groups _metrics;
utils::timed_rate_moving_average read_timeouts;
utils::timed_rate_moving_average read_unavailables;
utils::timed_rate_moving_average range_slice_timeouts;
@@ -192,9 +194,31 @@ struct stats : public write_stats {
public:
stats();
void register_stats();
void register_split_metrics_local();
void register_split_metrics_for(gms::inet_address ep);
};
void register_metrics_local();
void register_metrics_for(gms::inet_address ep);
/*** This struct represents stats that have meaning (only or also)
 * globally. For example, background_write_bytes is used to decide
 * whether to throttle requests, and it makes little sense to check it
 * per scheduling group. On the other hand, this statistic also has value
 * in figuring out how much load each scheduling group generates
 * on the system; that aspect should be handled elsewhere, i.e.
 * in the write_stats struct.
 */
struct global_write_stats {
seastar::metrics::metric_groups _metrics;
uint64_t background_write_bytes = 0;
uint64_t queued_write_bytes = 0;
void register_stats();
};
/***
* Following the convention of stats and write_stats
*/
struct global_stats : public global_write_stats {
void register_stats();
};
}

View File

@@ -44,6 +44,9 @@
#include <boost/range/adaptor/map.hpp>
static logging::logger tlogger("table");
static seastar::metrics::label column_family_label("cf");
static seastar::metrics::label keyspace_label("ks");
using namespace std::chrono_literals;
@@ -1087,8 +1090,6 @@ table::reshuffle_sstables(std::set<int64_t> all_generations, int64_t start) {
});
}
seastar::metrics::label column_family_label("cf");
seastar::metrics::label keyspace_label("ks");
void table::set_metrics() {
auto cf = column_family_label(_schema->cf_name());
auto ks = keyspace_label(_schema->ks_name());
@@ -1123,13 +1124,7 @@ void table::set_metrics() {
// View metrics are created only for base tables, so there's no point in adding them to views (which cannot act as base tables for other views)
if (!_schema->is_view()) {
_metrics.add_group("column_family", {
ms::make_total_operations("view_updates_pushed_remote", _view_stats.view_updates_pushed_remote, ms::description("Number of updates (mutations) pushed to remote view replicas"))(cf)(ks),
ms::make_total_operations("view_updates_failed_remote", _view_stats.view_updates_failed_remote, ms::description("Number of updates (mutations) that failed to be pushed to remote view replicas"))(cf)(ks),
ms::make_total_operations("view_updates_pushed_local", _view_stats.view_updates_pushed_local, ms::description("Number of updates (mutations) pushed to local view replicas"))(cf)(ks),
ms::make_total_operations("view_updates_failed_local", _view_stats.view_updates_failed_local, ms::description("Number of updates (mutations) that failed to be pushed to local view replicas"))(cf)(ks),
ms::make_gauge("view_updates_pending", ms::description("Number of updates pushed to view and are still to be completed"), _view_stats.writes)(cf)(ks),
});
_view_stats.register_stats();
}
if (_schema->ks_name() != db::system_keyspace::NAME && _schema->ks_name() != db::schema_tables::v3::NAME && _schema->ks_name() != "system_traces") {
@@ -1570,7 +1565,10 @@ table::table(schema_ptr schema, config config, db::commitlog* cl, compaction_man
cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker)
: _schema(std::move(schema))
, _config(std::move(config))
, _view_stats(format("{}_{}_view_replica_update", _schema->ks_name(), _schema->cf_name()))
, _view_stats(format("{}_{}_view_replica_update", _schema->ks_name(), _schema->cf_name()),
keyspace_label(_schema->ks_name()),
column_family_label(_schema->cf_name())
)
, _memtables(_config.enable_disk_writes ? make_memtable_list() : make_memory_only_memtable_list())
, _streaming_memtables(_config.enable_disk_writes ? make_streaming_memtable_list() : make_memory_only_memtable_list())
, _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))

View File

@@ -34,6 +34,7 @@
#include <seastar/core/distributed.hh>
#include <seastar/core/abort_source.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/scheduling.hh>
#include "utils/UUID_gen.hh"
#include "service/migration_manager.hh"
#include "sstables/compaction_manager.hh"
@@ -49,7 +50,6 @@
#include "db/view/view_builder.hh"
#include "db/view/node_view_update_backlog.hh"
#include "distributed_loader.hh"
// TODO: remove (#293)
#include "message/messaging_service.hh"
#include "gms/gossiper.hh"
@@ -442,7 +442,9 @@ public:
service::storage_proxy::config spcfg;
spcfg.available_memory = memory::stats().total_memory();
db::view::node_update_backlog b(smp::count, 10ms);
proxy.start(std::ref(*db), spcfg, std::ref(b)).get();
scheduling_group_key_config sg_conf =
make_scheduling_group_key_config<service::storage_proxy_stats::stats>();
proxy.start(std::ref(*db), spcfg, std::ref(b), scheduling_group_key_create(sg_conf).get0()).get();
auto stop_proxy = defer([&proxy] { proxy.stop().get(); });
mm.start(std::ref(*mm_notif)).get();