diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 980ede3e77..56b96ee0d0 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -421,30 +421,33 @@ storage_proxy::response_id_type storage_proxy::create_write_response_handler(key return register_response_handler(std::move(h)); } -storage_proxy::split_stats::split_stats(const sstring& category, const sstring& description_prefix) - : _description_prefix(description_prefix) +storage_proxy::split_stats::split_stats(const sstring& category, const sstring& short_description_prefix, const sstring& long_description_prefix) + : _short_description_prefix(short_description_prefix) + , _long_description_prefix(long_description_prefix) , _category(category) { // register a local Node counter to begin with... - _collectd_regs.push_back( - scollectd::add_polled_metric(scollectd::type_instance_id(_category - , scollectd::per_cpu_plugin_instance - , "total_operations", _description_prefix + sstring(" (local Node)")) - , scollectd::make_typed(scollectd::data_type::DERIVE, _local.val))); + namespace sm = seastar::metrics; + + _metrics = sm::create_metric_groups(); + _metrics->add_group(_category, { + sm::make_derive("total_operations", _short_description_prefix + sstring(" (local Node)"), [this] { return _local.val; }, + sm::description(_long_description_prefix + "on a local Node")) + }); } storage_proxy::stats::stats() - : writes_attempts(COORDINATOR_STATS_CATEGORY, "total write attempts") - , writes_errors(COORDINATOR_STATS_CATEGORY, "write errors") - , read_repair_write_attempts(COORDINATOR_STATS_CATEGORY, "read repair write attempts") - , data_read_attempts(COORDINATOR_STATS_CATEGORY, "data reads") - , data_read_completed(COORDINATOR_STATS_CATEGORY, "completed data reads") - , data_read_errors(COORDINATOR_STATS_CATEGORY, "data read errors") - , digest_read_attempts(COORDINATOR_STATS_CATEGORY, "digest reads") - , digest_read_completed(COORDINATOR_STATS_CATEGORY, "completed digest reads") - , digest_read_errors(COORDINATOR_STATS_CATEGORY, "digest read errors") - , mutation_data_read_attempts(COORDINATOR_STATS_CATEGORY, "mutation data reads") - , mutation_data_read_completed(COORDINATOR_STATS_CATEGORY, "completed mutation data reads") - , mutation_data_read_errors(COORDINATOR_STATS_CATEGORY, "mutation data read errors") {} + : writes_attempts(COORDINATOR_STATS_CATEGORY, "total write attempts", "total number of write requests") + , writes_errors(COORDINATOR_STATS_CATEGORY, "write errors", "number of write requests that failed") + , read_repair_write_attempts(COORDINATOR_STATS_CATEGORY, "read repair write attempts", "number of write operations in a read repair context") + , data_read_attempts(COORDINATOR_STATS_CATEGORY, "data reads", "number of data read requests") + , data_read_completed(COORDINATOR_STATS_CATEGORY, "completed data reads", "number of data read requests that completed") + , data_read_errors(COORDINATOR_STATS_CATEGORY, "data read errors", "number of data read requests that failed") + , digest_read_attempts(COORDINATOR_STATS_CATEGORY, "digest reads", "number of digest read requests") + , digest_read_completed(COORDINATOR_STATS_CATEGORY, "completed digest reads", "number of digest read requests that completed") + , digest_read_errors(COORDINATOR_STATS_CATEGORY, "digest read errors", "number of digest read requests that failed") + , mutation_data_read_attempts(COORDINATOR_STATS_CATEGORY, "mutation data reads", "number of mutation data read requests") + , mutation_data_read_completed(COORDINATOR_STATS_CATEGORY, "completed mutation data reads", "number of mutation data read requests that completed") + , mutation_data_read_errors(COORDINATOR_STATS_CATEGORY, "mutation data read errors", "number of mutation data read requests that failed") {} inline uint64_t& storage_proxy::split_stats::get_ep_stat(gms::inet_address ep) { if (is_me(ep)) { @@ -456,115 +459,81 @@ inline uint64_t& storage_proxy::split_stats::get_ep_stat(gms::inet_address ep) { // if this is the first time we see an endpoint from this DC - add a // corresponding collectd metric if (_dc_stats.find(dc) == _dc_stats.end()) { - _collectd_regs.push_back( - scollectd::add_polled_metric(scollectd::type_instance_id(_category - , scollectd::per_cpu_plugin_instance - , "total_operations", _description_prefix + sstring(" (external Nodes in DC: ") + dc + sstring(")")) - , scollectd::make_typed(scollectd::data_type::DERIVE, [this, dc] { return _dc_stats[dc].val; }) - )); + namespace sm = seastar::metrics; + + _metrics->add_group(_category, { + sm::make_derive("total_operations", seastar::format("{} (external Nodes in DC: {})", _short_description_prefix, dc), [this, dc] { return _dc_stats[dc].val; }, + sm::description(seastar::format("{} when communicating with external Nodes in DC {}", _long_description_prefix, dc))) + }); } return _dc_stats[dc].val; } storage_proxy::~storage_proxy() {} storage_proxy::storage_proxy(distributed& db) : _db(db) { - _collectd_registrations = std::make_unique(scollectd::registrations({ - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "queue_length", "foreground writes") - , scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return _stats.writes - _stats.background_writes; }) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "queue_length", "background writes") - , scollectd::make_typed(scollectd::data_type::GAUGE, _stats.background_writes) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "queue_length", "throttled writes") - , scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return _throttled_writes.size(); }) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "total_operations", "throttled writes") - , scollectd::make_typed(scollectd::data_type::DERIVE, _stats.throttled_writes) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "bytes", "queued write bytes") - , scollectd::make_typed(scollectd::data_type::GAUGE, _stats.queued_write_bytes) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "bytes", "background write bytes") - , scollectd::make_typed(scollectd::data_type::GAUGE, _stats.background_write_bytes) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "queue_length", "reads") - , scollectd::make_typed(scollectd::data_type::GAUGE, _stats.reads) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "queue_length", "background reads") - , scollectd::make_typed(scollectd::data_type::GAUGE, _stats.background_reads) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "total_operations", "read retries") - , scollectd::make_typed(scollectd::data_type::DERIVE, _stats.read_retries) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "total_operations", "global_read_repairs_canceled_due_to_concurrent_write") - , scollectd::make_typed(scollectd::data_type::DERIVE, _stats.global_read_repairs_canceled_due_to_concurrent_write) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "total_operations", "write timeouts") - , scollectd::make_typed(scollectd::data_type::DERIVE, _stats.write_timeouts._count) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "total_operations", "write unavailable") - , scollectd::make_typed(scollectd::data_type::DERIVE, _stats.write_unavailables._count) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "total_operations", "read timeouts") - , scollectd::make_typed(scollectd::data_type::DERIVE, _stats.read_timeouts._count) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "total_operations", "read unavailable") - , scollectd::make_typed(scollectd::data_type::DERIVE, _stats.read_unavailables._count) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "total_operations", "range slice timeouts") - , scollectd::make_typed(scollectd::data_type::DERIVE, _stats.range_slice_timeouts._count) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(COORDINATOR_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "total_operations", "range slice unavailable") - , scollectd::make_typed(scollectd::data_type::DERIVE, _stats.range_slice_unavailables._count) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(REPLICA_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "total_operations", "received mutations") - , scollectd::make_typed(scollectd::data_type::DERIVE, _stats.received_mutations) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(REPLICA_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "total_operations", "forwarded mutations") - , scollectd::make_typed(scollectd::data_type::DERIVE, _stats.forwarded_mutations) - ), - scollectd::add_polled_metric(scollectd::type_instance_id(REPLICA_STATS_CATEGORY - , scollectd::per_cpu_plugin_instance - , "total_operations", "forwarding errors") - , scollectd::make_typed(scollectd::data_type::DERIVE, _stats.forwarding_errors) - ), - })); + namespace sm = seastar::metrics; + + _metrics = sm::create_metric_groups(); + _metrics->add_group(COORDINATOR_STATS_CATEGORY, { + sm::make_gauge("queue_length", "foreground writes", [this] { return _stats.writes - _stats.background_writes; }, + sm::description("number of currently pending forground write requests")), + + sm::make_gauge("queue_length", "background writes", [this] { return _stats.background_writes; }, + sm::description("number of currently pending background write requests")), + + sm::make_gauge("queue_length", "throttled writes", [this] { return _throttled_writes.size(); }, + sm::description("number of currently throttled write requests")), + + sm::make_derive("total_operations", "throttled writes", [this] { return _stats.throttled_writes; }, + sm::description("number of throttled write requests")), + + sm::make_gauge("bytes", "queued write bytes", [this] { return _stats.queued_write_bytes; }, + sm::description("number of bytes in pending write requests")), + + sm::make_gauge("bytes", "background write bytes", [this] { return _stats.background_write_bytes; }, + sm::description("number of bytes in pending background write requests")), + + sm::make_gauge("queue_length", "reads", [this] { return _stats.reads; }, + sm::description("number of currently pending read requests")), + + sm::make_gauge("queue_length", "background reads", [this] { return _stats.background_reads; }, + sm::description("number of currently pending background read requests")), + + sm::make_derive("total_operations", "read retries", [this] { return _stats.read_retries; }, + sm::description("number of read retry attempts")), + + sm::make_derive("total_operations", "canceled read repairs", [this] { return _stats.global_read_repairs_canceled_due_to_concurrent_write; }, + sm::description("number of global read repairs canceled due to a concurrent write")), + + sm::make_derive("total_operations", "write timeouts", [this] { return _stats.write_timeouts._count; }, + sm::description("number of write request failed due to a timeout")), + + sm::make_derive("total_operations", "write unavailable", [this] { return _stats.write_unavailables._count; }, + sm::description("number write requests failed due to an \"unavailable\" error")), + + sm::make_derive("total_operations", "read timeouts", [this] { return _stats.read_timeouts._count; }, + sm::description("number of read request failed due to a timeout")), + + sm::make_derive("total_operations", "read unavailable", [this] { return _stats.read_unavailables._count; }, + sm::description("number read requests failed due to an \"unavailable\" error")), + + sm::make_derive("total_operations", "range timeouts", [this] { return _stats.range_slice_timeouts._count; }, + sm::description("number of range read operations failed due to a timeout")), + + sm::make_derive("total_operations", "range unavailable", [this] { return _stats.range_slice_unavailables._count; }, + sm::description("number of range read operations failed due to an \"unavailable\" error")), + }); + + _metrics->add_group(REPLICA_STATS_CATEGORY, { + sm::make_derive("total_operations", "received mutations", _stats.received_mutations, + sm::description("number of mutations received by a replica Node")), + + sm::make_derive("total_operations", "forwarded mutations", _stats.forwarded_mutations, + sm::description("number of mutations forwarded to other replica Nodes")), + + sm::make_derive("total_operations", "forwarding errors", _stats.forwarding_errors, + sm::description("number of errors during forwarding mutations to other replica Nodes")), + }); } storage_proxy::rh_entry::rh_entry(shared_ptr&& h, std::function&& cb) : handler(std::move(h)), expire_timer(std::move(cb)) {} diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index c23c5e3036..662152c97e 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -51,6 +51,7 @@ #include "utils/histogram.hh" #include "utils/estimated_histogram.hh" #include "tracing/trace_state.hh" +#include namespace compat { @@ -102,18 +103,20 @@ public: // counters of operations performed on external Nodes aggregated per Nodes' DCs std::unordered_map _dc_stats; // collectd registrations container - std::vector _collectd_regs; + seastar::metrics::metric_groups _metrics; // a prefix string that will be used for a collecd counters' description - sstring _description_prefix; + sstring _short_description_prefix; + sstring _long_description_prefix; // a statistics category, e.g. "client" or "replica" sstring _category; public: /** * @param category a statistics category, e.g. "client" or "replica" - * @param description_prefix a collectd description prefix + * @param short_description_prefix a short description prefix + * @param long_description_prefix a long description prefix */ - split_stats(const sstring& category, const sstring& description_prefix); + split_stats(const sstring& category, const sstring& short_description_prefix, const sstring& long_description_prefix); /** * Get a reference to the statistics counter corresponding to the given @@ -206,7 +209,7 @@ private: // for read repair chance calculation std::default_random_engine _urandom; std::uniform_real_distribution<> _read_repair_chance = std::uniform_real_distribution<>(0,1); - std::unique_ptr _collectd_registrations; + seastar::metrics::metric_groups _metrics; private: void uninit_messaging_service(); future>> query_singular(lw_shared_ptr cmd, std::vector&& partition_ranges, db::consistency_level cl, tracing::trace_state_ptr trace_state);