metrics: introduce a metric for non-local reads

A read which arrived to a non-replica and had to be forwarded to a
replica by the coordinator is accounted in an own metric,
reads_coordinator_outside_replica_set.
Most often such read is produced by a driver which is unaware of
token distribution on the ring.

If a read was forwarded to another replica due to heat weighted
load balancing or query preference set by the user, it's not accounted
in the metric.

In case of a multi-partition read (a query using IN statement,
e.g. x in (1, 2, 3)), if any of the keys is read from a
non-local node the read is accounted as a non-local.
The rationale behind it is that if the user tries to be careful and send
IN queries only to the same vnode, they are rewarded with the counter
staying at zero, while if they send multi-partition IN queries without
any precautions, they will see the metric go up which gives them a
starting point for investigating performance problems.

Closes #4338
This commit is contained in:
Konstantin Osipov
2019-05-17 20:46:01 +03:00
parent da1d1b74da
commit 56f3bda4c7
3 changed files with 27 additions and 4 deletions

View File

@@ -820,6 +820,9 @@ storage_proxy::storage_proxy(distributed<database>& db, storage_proxy::config cf
sm::make_total_operations("writes_coordinator_outside_replica_set", _stats.writes_coordinator_outside_replica_set,
sm::description("number of CQL write requests which arrived to a non-replica and had to be forwarded to a replica")),
sm::make_total_operations("reads_coordinator_outside_replica_set", _stats.reads_coordinator_outside_replica_set,
sm::description("number of CQL read requests which arrived to a non-replica and had to be forwarded to a replica")),
sm::make_queue_length("current_throttled_writes", [this] { return _throttled_writes.size(); },
sm::description("number of currently throttled write requests")),
@@ -2711,13 +2714,20 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s
db::consistency_level cl,
db::read_repair_decision repair_decision,
tracing::trace_state_ptr trace_state,
const std::vector<gms::inet_address>& preferred_endpoints) {
const std::vector<gms::inet_address>& preferred_endpoints,
bool& is_read_non_local) {
const dht::token& token = pr.start()->value().token();
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
speculative_retry::type retry_type = schema->speculative_retry().get_type();
gms::inet_address extra_replica;
std::vector<gms::inet_address> all_replicas = get_live_sorted_endpoints(ks, token);
// Check for a non-local read before heat-weighted load balancing
// reordering of endpoints happens. The local endpoint, if
// present, is always first in the list, as get_live_sorted_endpoints()
// orders the list by proximity to the local endpoint.
is_read_non_local |= all_replicas.front() != utils::fb_utilities::get_broadcast_address();
auto cf = _db.local().find_column_family(schema).shared_from_this();
std::vector<gms::inet_address> target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision,
retry_type == speculative_retry::type::NONE ? nullptr : &extra_replica,
@@ -2831,6 +2841,10 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd,
db::read_repair_decision repair_decision = query_options.read_repair_decision
? *query_options.read_repair_decision : new_read_repair_decision(*schema);
// Update reads_coordinator_outside_replica_set once per request,
// not once per partition.
bool is_read_non_local = false;
for (auto&& pr: partition_ranges) {
if (!pr.is_singular()) {
throw std::runtime_error("mixed singular and non singular range are not supported");
@@ -2841,8 +2855,13 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd,
const auto replicas = it == query_options.preferred_replicas.end()
? std::vector<gms::inet_address>{} : replica_ids_to_endpoints(it->second);
exec.emplace_back(get_read_executor(cmd, schema, std::move(pr), cl, repair_decision, query_options.trace_state, replicas),
std::move(token_range));
auto read_executor = get_read_executor(cmd, schema, std::move(pr), cl, repair_decision,
query_options.trace_state, replicas, is_read_non_local);
exec.emplace_back(read_executor, std::move(token_range));
}
if (is_read_non_local) {
_stats.reads_coordinator_outside_replica_set++;
}
query::result_merger merger(cmd->row_limit, cmd->partition_limit);

View File

@@ -254,7 +254,8 @@ private:
db::consistency_level cl,
db::read_repair_decision repair_decision,
tracing::trace_state_ptr trace_state,
const std::vector<gms::inet_address>& preferred_endpoints);
const std::vector<gms::inet_address>& preferred_endpoints,
bool& is_bounced_read);
future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature> query_result_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
query::result_options opts,
tracing::trace_state_ptr trace_state,

View File

@@ -96,6 +96,9 @@ struct write_stats {
// A CQL write query arrived to a non-replica node and was
// forwarded by a coordinator to a replica
uint64_t writes_coordinator_outside_replica_set = 0;
// A CQL read query arrived to a non-replica node and was
// forwarded by a coordinator to a replica
uint64_t reads_coordinator_outside_replica_set = 0;
uint64_t background_writes = 0; // client no longer waits for the write
uint64_t background_write_bytes = 0;
uint64_t queued_write_bytes = 0;