diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index c88f42a3bd..cb2d11748f 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -820,6 +820,9 @@ storage_proxy::storage_proxy(distributed& db, storage_proxy::config cf sm::make_total_operations("writes_coordinator_outside_replica_set", _stats.writes_coordinator_outside_replica_set, sm::description("number of CQL write requests which arrived to a non-replica and had to be forwarded to a replica")), + sm::make_total_operations("reads_coordinator_outside_replica_set", _stats.reads_coordinator_outside_replica_set, + sm::description("number of CQL read requests which arrived to a non-replica and had to be forwarded to a replica")), + sm::make_queue_length("current_throttled_writes", [this] { return _throttled_writes.size(); }, sm::description("number of currently throttled write requests")), @@ -2711,13 +2714,20 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s db::consistency_level cl, db::read_repair_decision repair_decision, tracing::trace_state_ptr trace_state, - const std::vector& preferred_endpoints) { + const std::vector& preferred_endpoints, + bool& is_read_non_local) { const dht::token& token = pr.start()->value().token(); keyspace& ks = _db.local().find_keyspace(schema->ks_name()); speculative_retry::type retry_type = schema->speculative_retry().get_type(); gms::inet_address extra_replica; std::vector all_replicas = get_live_sorted_endpoints(ks, token); + // Check for a non-local read before heat-weighted load balancing + // reordering of endpoints happens. The local endpoint, if + // present, is always first in the list, as get_live_sorted_endpoints() + // orders the list by proximity to the local endpoint. + is_read_non_local |= all_replicas.front() != utils::fb_utilities::get_broadcast_address(); + auto cf = _db.local().find_column_family(schema).shared_from_this(); std::vector target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision, retry_type == speculative_retry::type::NONE ? nullptr : &extra_replica, @@ -2831,6 +2841,10 @@ storage_proxy::query_singular(lw_shared_ptr cmd, db::read_repair_decision repair_decision = query_options.read_repair_decision ? *query_options.read_repair_decision : new_read_repair_decision(*schema); + // Update reads_coordinator_outside_replica_set once per request, + // not once per partition. + bool is_read_non_local = false; + for (auto&& pr: partition_ranges) { if (!pr.is_singular()) { throw std::runtime_error("mixed singular and non singular range are not supported"); @@ -2841,8 +2855,13 @@ storage_proxy::query_singular(lw_shared_ptr cmd, const auto replicas = it == query_options.preferred_replicas.end() ? std::vector{} : replica_ids_to_endpoints(it->second); - exec.emplace_back(get_read_executor(cmd, schema, std::move(pr), cl, repair_decision, query_options.trace_state, replicas), - std::move(token_range)); + auto read_executor = get_read_executor(cmd, schema, std::move(pr), cl, repair_decision, + query_options.trace_state, replicas, is_read_non_local); + + exec.emplace_back(read_executor, std::move(token_range)); + } + if (is_read_non_local) { + _stats.reads_coordinator_outside_replica_set++; } query::result_merger merger(cmd->row_limit, cmd->partition_limit); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index dae6eef4f4..06c01a73a3 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -254,7 +254,8 @@ private: db::consistency_level cl, db::read_repair_decision repair_decision, tracing::trace_state_ptr trace_state, - const std::vector& preferred_endpoints); + const std::vector& preferred_endpoints, + bool& is_bounced_read); future>, cache_temperature> query_result_local(schema_ptr, lw_shared_ptr cmd, const dht::partition_range& pr, query::result_options opts, tracing::trace_state_ptr trace_state, diff --git a/service/storage_proxy_stats.hh b/service/storage_proxy_stats.hh index 91907d1964..904d590f72 100644 --- a/service/storage_proxy_stats.hh +++ b/service/storage_proxy_stats.hh @@ -96,6 +96,9 @@ struct write_stats { // A CQL write query arrived to a non-replica node and was // forwarded by a coordinator to a replica uint64_t writes_coordinator_outside_replica_set = 0; + // A CQL read query arrived to a non-replica node and was + // forwarded by a coordinator to a replica + uint64_t reads_coordinator_outside_replica_set = 0; uint64_t background_writes = 0; // client no longer waits for the write uint64_t background_write_bytes = 0; uint64_t queued_write_bytes = 0;