service::storage_proxy: rework the collectd counters registration

Use the new seastar's metrics_registration framework:
   - Change the registration syntax.
   - Add a long description for each counter.

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
This commit is contained in:
Vlad Zolotarov
2016-12-01 22:32:38 -05:00
parent 3bf12e4ffc
commit e5e7ac1bd4
2 changed files with 99 additions and 127 deletions

View File

@@ -421,30 +421,33 @@ storage_proxy::response_id_type storage_proxy::create_write_response_handler(key
return register_response_handler(std::move(h));
}
storage_proxy::split_stats::split_stats(const sstring& category, const sstring& description_prefix)
: _description_prefix(description_prefix)
storage_proxy::split_stats::split_stats(const sstring& category, const sstring& short_description_prefix, const sstring& long_description_prefix)
: _short_description_prefix(short_description_prefix)
, _long_description_prefix(long_description_prefix)
, _category(category) {
// register a local Node counter to begin with...
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(_category
, scollectd::per_cpu_plugin_instance
, "total_operations", _description_prefix + sstring(" (local Node)"))
, scollectd::make_typed(scollectd::data_type::DERIVE, _local.val)));
namespace sm = seastar::metrics;
_metrics = sm::create_metric_groups();
_metrics->add_group(_category, {
sm::make_derive("total_operations", _short_description_prefix + sstring(" (local Node)"), [this] { return _local.val; },
sm::description(_long_description_prefix + "on a local Node"))
});
}
storage_proxy::stats::stats()
: writes_attempts(COORDINATOR_STATS_CATEGORY, "total write attempts")
, writes_errors(COORDINATOR_STATS_CATEGORY, "write errors")
, read_repair_write_attempts(COORDINATOR_STATS_CATEGORY, "read repair write attempts")
, data_read_attempts(COORDINATOR_STATS_CATEGORY, "data reads")
, data_read_completed(COORDINATOR_STATS_CATEGORY, "completed data reads")
, data_read_errors(COORDINATOR_STATS_CATEGORY, "data read errors")
, digest_read_attempts(COORDINATOR_STATS_CATEGORY, "digest reads")
, digest_read_completed(COORDINATOR_STATS_CATEGORY, "completed digest reads")
, digest_read_errors(COORDINATOR_STATS_CATEGORY, "digest read errors")
, mutation_data_read_attempts(COORDINATOR_STATS_CATEGORY, "mutation data reads")
, mutation_data_read_completed(COORDINATOR_STATS_CATEGORY, "completed mutation data reads")
, mutation_data_read_errors(COORDINATOR_STATS_CATEGORY, "mutation data read errors") {}
: writes_attempts(COORDINATOR_STATS_CATEGORY, "total write attempts", "total number of write requests")
, writes_errors(COORDINATOR_STATS_CATEGORY, "write errors", "number of write requests that failed")
, read_repair_write_attempts(COORDINATOR_STATS_CATEGORY, "read repair write attempts", "number of write operations in a read repair context")
, data_read_attempts(COORDINATOR_STATS_CATEGORY, "data reads", "number of data read requests")
, data_read_completed(COORDINATOR_STATS_CATEGORY, "completed data reads", "number of data read requests that completed")
, data_read_errors(COORDINATOR_STATS_CATEGORY, "data read errors", "number of data read requests that failed")
, digest_read_attempts(COORDINATOR_STATS_CATEGORY, "digest reads", "number of digest read requests")
, digest_read_completed(COORDINATOR_STATS_CATEGORY, "completed digest reads", "number of digest read requests that completed")
, digest_read_errors(COORDINATOR_STATS_CATEGORY, "digest read errors", "number of digest read requests that failed")
, mutation_data_read_attempts(COORDINATOR_STATS_CATEGORY, "mutation data reads", "number of mutation data read requests")
, mutation_data_read_completed(COORDINATOR_STATS_CATEGORY, "completed mutation data reads", "number of mutation data read requests that completed")
, mutation_data_read_errors(COORDINATOR_STATS_CATEGORY, "mutation data read errors", "number of mutation data read requests that failed") {}
inline uint64_t& storage_proxy::split_stats::get_ep_stat(gms::inet_address ep) {
if (is_me(ep)) {
@@ -456,115 +459,81 @@ inline uint64_t& storage_proxy::split_stats::get_ep_stat(gms::inet_address ep) {
// if this is the first time we see an endpoint from this DC - add a
// corresponding collectd metric
if (_dc_stats.find(dc) == _dc_stats.end()) {
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(_category
, scollectd::per_cpu_plugin_instance
, "total_operations", _description_prefix + sstring(" (external Nodes in DC: ") + dc + sstring(")"))
, scollectd::make_typed(scollectd::data_type::DERIVE, [this, dc] { return _dc_stats[dc].val; })
));
namespace sm = seastar::metrics;
_metrics->add_group(_category, {
sm::make_derive("total_operations", seastar::format("{} (external Nodes in DC: {})", _short_description_prefix, dc), [this, dc] { return _dc_stats[dc].val; },
sm::description(seastar::format("{} when communicating with external Nodes in DC {}", _long_description_prefix, dc)))
});
}
return _dc_stats[dc].val;
}
storage_proxy::~storage_proxy() {}
storage_proxy::storage_proxy(distributed<database>& db) : _db(db) {
_collectd_registrations = std::make_unique<scollectd::registrations>(scollectd::registrations({
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "queue_length", "foreground writes")
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return _stats.writes - _stats.background_writes; })
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "queue_length", "background writes")
, scollectd::make_typed(scollectd::data_type::GAUGE, _stats.background_writes)
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "queue_length", "throttled writes")
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return _throttled_writes.size(); })
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "total_operations", "throttled writes")
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats.throttled_writes)
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "bytes", "queued write bytes")
, scollectd::make_typed(scollectd::data_type::GAUGE, _stats.queued_write_bytes)
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "bytes", "background write bytes")
, scollectd::make_typed(scollectd::data_type::GAUGE, _stats.background_write_bytes)
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "queue_length", "reads")
, scollectd::make_typed(scollectd::data_type::GAUGE, _stats.reads)
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "queue_length", "background reads")
, scollectd::make_typed(scollectd::data_type::GAUGE, _stats.background_reads)
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "total_operations", "read retries")
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats.read_retries)
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "total_operations", "global_read_repairs_canceled_due_to_concurrent_write")
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats.global_read_repairs_canceled_due_to_concurrent_write)
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "total_operations", "write timeouts")
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats.write_timeouts._count)
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "total_operations", "write unavailable")
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats.write_unavailables._count)
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "total_operations", "read timeouts")
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats.read_timeouts._count)
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "total_operations", "read unavailable")
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats.read_unavailables._count)
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "total_operations", "range slice timeouts")
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats.range_slice_timeouts._count)
),
scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "total_operations", "range slice unavailable")
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats.range_slice_unavailables._count)
),
scollectd::add_polled_metric(scollectd::type_instance_id(REPLICA_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "total_operations", "received mutations")
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats.received_mutations)
),
scollectd::add_polled_metric(scollectd::type_instance_id(REPLICA_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "total_operations", "forwarded mutations")
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats.forwarded_mutations)
),
scollectd::add_polled_metric(scollectd::type_instance_id(REPLICA_STATS_CATEGORY
, scollectd::per_cpu_plugin_instance
, "total_operations", "forwarding errors")
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats.forwarding_errors)
),
}));
namespace sm = seastar::metrics;
_metrics = sm::create_metric_groups();
_metrics->add_group(COORDINATOR_STATS_CATEGORY, {
sm::make_gauge("queue_length", "foreground writes", [this] { return _stats.writes - _stats.background_writes; },
sm::description("number of currently pending forground write requests")),
sm::make_gauge("queue_length", "background writes", [this] { return _stats.background_writes; },
sm::description("number of currently pending background write requests")),
sm::make_gauge("queue_length", "throttled writes", [this] { return _throttled_writes.size(); },
sm::description("number of currently throttled write requests")),
sm::make_derive("total_operations", "throttled writes", [this] { return _stats.throttled_writes; },
sm::description("number of throttled write requests")),
sm::make_gauge("bytes", "queued write bytes", [this] { return _stats.queued_write_bytes; },
sm::description("number of bytes in pending write requests")),
sm::make_gauge("bytes", "background write bytes", [this] { return _stats.background_write_bytes; },
sm::description("number of bytes in pending background write requests")),
sm::make_gauge("queue_length", "reads", [this] { return _stats.reads; },
sm::description("number of currently pending read requests")),
sm::make_gauge("queue_length", "background reads", [this] { return _stats.background_reads; },
sm::description("number of currently pending background read requests")),
sm::make_derive("total_operations", "read retries", [this] { return _stats.read_retries; },
sm::description("number of read retry attempts")),
sm::make_derive("total_operations", "canceled read repairs", [this] { return _stats.global_read_repairs_canceled_due_to_concurrent_write; },
sm::description("number of global read repairs canceled due to a concurrent write")),
sm::make_derive("total_operations", "write timeouts", [this] { return _stats.write_timeouts._count; },
sm::description("number of write request failed due to a timeout")),
sm::make_derive("total_operations", "write unavailable", [this] { return _stats.write_unavailables._count; },
sm::description("number write requests failed due to an \"unavailable\" error")),
sm::make_derive("total_operations", "read timeouts", [this] { return _stats.read_timeouts._count; },
sm::description("number of read request failed due to a timeout")),
sm::make_derive("total_operations", "read unavailable", [this] { return _stats.read_unavailables._count; },
sm::description("number read requests failed due to an \"unavailable\" error")),
sm::make_derive("total_operations", "range timeouts", [this] { return _stats.range_slice_timeouts._count; },
sm::description("number of range read operations failed due to a timeout")),
sm::make_derive("total_operations", "range unavailable", [this] { return _stats.range_slice_unavailables._count; },
sm::description("number of range read operations failed due to an \"unavailable\" error")),
});
_metrics->add_group(REPLICA_STATS_CATEGORY, {
sm::make_derive("total_operations", "received mutations", _stats.received_mutations,
sm::description("number of mutations received by a replica Node")),
sm::make_derive("total_operations", "forwarded mutations", _stats.forwarded_mutations,
sm::description("number of mutations forwarded to other replica Nodes")),
sm::make_derive("total_operations", "forwarding errors", _stats.forwarding_errors,
sm::description("number of errors during forwarding mutations to other replica Nodes")),
});
}
storage_proxy::rh_entry::rh_entry(shared_ptr<abstract_write_response_handler>&& h, std::function<void()>&& cb) : handler(std::move(h)), expire_timer(std::move(cb)) {}

View File

@@ -51,6 +51,7 @@
#include "utils/histogram.hh"
#include "utils/estimated_histogram.hh"
#include "tracing/trace_state.hh"
#include <seastar/core/metrics_registration.hh>
namespace compat {
@@ -102,18 +103,20 @@ public:
// counters of operations performed on external Nodes aggregated per Nodes' DCs
std::unordered_map<sstring, stats_counter> _dc_stats;
// collectd registrations container
std::vector<scollectd::registration> _collectd_regs;
seastar::metrics::metric_groups _metrics;
// a prefix string that will be used for a collecd counters' description
sstring _description_prefix;
sstring _short_description_prefix;
sstring _long_description_prefix;
// a statistics category, e.g. "client" or "replica"
sstring _category;
public:
/**
* @param category a statistics category, e.g. "client" or "replica"
* @param description_prefix a collectd description prefix
* @param short_description_prefix a short description prefix
* @param long_description_prefix a long description prefix
*/
split_stats(const sstring& category, const sstring& description_prefix);
split_stats(const sstring& category, const sstring& short_description_prefix, const sstring& long_description_prefix);
/**
* Get a reference to the statistics counter corresponding to the given
@@ -206,7 +209,7 @@ private:
// for read repair chance calculation
std::default_random_engine _urandom;
std::uniform_real_distribution<> _read_repair_chance = std::uniform_real_distribution<>(0,1);
std::unique_ptr<scollectd::registrations> _collectd_registrations;
seastar::metrics::metric_groups _metrics;
private:
void uninit_messaging_service();
future<foreign_ptr<lw_shared_ptr<query::result>>> query_singular(lw_shared_ptr<query::read_command> cmd, std::vector<query::partition_range>&& partition_ranges, db::consistency_level cl, tracing::trace_state_ptr trace_state);