diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 7ed065172e..2f66c3db2e 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -3402,7 +3402,10 @@ protected: using targets_iterator = inet_address_vector_replica_set::iterator; using digest_resolver_ptr = ::shared_ptr; using data_resolver_ptr = ::shared_ptr; + // Clock type for measuring timeouts. using clock_type = storage_proxy::clock_type; + // Clock type for measuring latencies. + using latency_clock = utils::latency_counter::clock; schema_ptr _schema; shared_ptr _proxy; @@ -3499,13 +3502,14 @@ private: protected: future<> make_mutation_data_requests(lw_shared_ptr cmd, data_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) { - return parallel_for_each(begin, end, [this, &cmd, resolver = std::move(resolver), timeout] (gms::inet_address ep) { - return make_mutation_data_request(cmd, ep, timeout).then_wrapped([this, resolver, ep] (future>, cache_temperature>> f) { + return parallel_for_each(begin, end, [this, &cmd, resolver = std::move(resolver), timeout, start = latency_clock::now()] (gms::inet_address ep) { + return make_mutation_data_request(cmd, ep, timeout).then_wrapped([this, resolver, ep, start] (future>, cache_temperature>> f) { try { auto v = f.get0(); _cf->set_hit_rate(ep, std::get<1>(v)); resolver->add_mutate_data(ep, std::get<0>(std::move(v))); ++_proxy->get_stats().mutation_data_read_completed.get_ep_stat(ep); + register_request_latency(latency_clock::now() - start); } catch(...) { ++_proxy->get_stats().mutation_data_read_errors.get_ep_stat(ep); resolver->error(ep, std::current_exception()); @@ -3514,14 +3518,15 @@ protected: }); } future<> make_data_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout, bool want_digest) { - return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, want_digest] (gms::inet_address ep) { - return make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep] (future>, cache_temperature>> f) { + return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, want_digest, start = latency_clock::now()] (gms::inet_address ep) { + return make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start] (future>, cache_temperature>> f) { try { auto v = f.get0(); _cf->set_hit_rate(ep, std::get<1>(v)); resolver->add_data(ep, std::get<0>(std::move(v))); ++_proxy->get_stats().data_read_completed.get_ep_stat(ep); _used_targets.push_back(ep); + register_request_latency(latency_clock::now() - start); } catch(...) { ++_proxy->get_stats().data_read_errors.get_ep_stat(ep); resolver->error(ep, std::current_exception()); @@ -3530,14 +3535,15 @@ protected: }); } future<> make_digest_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) { - return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout] (gms::inet_address ep) { - return make_digest_request(ep, timeout).then_wrapped([this, resolver, ep] (future> f) { + return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, start = latency_clock::now()] (gms::inet_address ep) { + return make_digest_request(ep, timeout).then_wrapped([this, resolver, ep, start] (future> f) { try { auto v = f.get0(); _cf->set_hit_rate(ep, std::get<2>(v)); resolver->add_digest(ep, std::get<0>(v), std::get<1>(v)); ++_proxy->get_stats().digest_read_completed.get_ep_stat(ep); _used_targets.push_back(ep); + register_request_latency(latency_clock::now() - start); } catch(...) { ++_proxy->get_stats().digest_read_errors.get_ep_stat(ep); resolver->error(ep, std::current_exception()); @@ -3729,6 +3735,23 @@ public: lw_shared_ptr& get_cf() { return _cf; } + + // Maximum latency of a successful request made to a replica (over all requests that finished up to this point). + // Example usage: gathering latency statistics for deciding on invoking speculative retries. + std::optional max_request_latency() const { + if (_max_request_latency == NO_LATENCY) { + return std::nullopt; + } + return _max_request_latency; + } + +private: + void register_request_latency(latency_clock::duration d) { + _max_request_latency = std::max(_max_request_latency, d); + } + + static constexpr latency_clock::duration NO_LATENCY{-1}; + latency_clock::duration _max_request_latency{NO_LATENCY}; }; class never_speculating_read_executor : public abstract_read_executor { @@ -3994,16 +4017,17 @@ storage_proxy::query_singular(lw_shared_ptr cmd, std::pair<::shared_ptr, dht::token_range>& executor_and_token_range) { auto& rex = std::get<0>(executor_and_token_range); auto& token_range = std::get<1>(executor_and_token_range); - utils::latency_counter lc; - lc.start(); - return rex->execute(timeout).then_wrapped([p = std::move(p), lc, rex, used_replicas, token_range = std::move(token_range), tmptr] ( + return rex->execute(timeout).then_wrapped([p = std::move(p), rex, used_replicas, token_range = std::move(token_range), tmptr] ( future>> f) mutable { if (!f.failed()) { used_replicas->emplace(std::move(token_range), endpoints_to_replica_ids(*tmptr, rex->used_targets())); + + auto latency = rex->max_request_latency(); + if (latency) { + rex->get_cf()->add_coordinator_read_latency(*latency); + } } - if (lc.is_start()) { - rex->get_cf()->add_coordinator_read_latency(lc.stop().latency()); - } + return std::move(f); }); }, std::move(merger));