From 9e85921006804bbd3aadd5b1ea49a5ae27c28fa1 Mon Sep 17 00:00:00 2001
From: Kamil Braun
Date: Tue, 1 Jun 2021 17:46:29 +0200
Subject: [PATCH] storage_proxy: remove a feedback loop from the speculative
 retry latency metric

To handle a read request from a client, the coordinator node must send
data and digest requests to replicas, reconcile the obtained results (by
merging the obtained mutations and comparing digests), and, if the
digests turn out to be different, possibly send more requests to
replicas in order to perform read repair and preserve consistency of
observed reads.

In contrast to writes, where coordinators send their mutation write
requests to all replicas in the replica set, for reads the coordinators
send their requests only to as many replicas as are required to achieve
the desired CL.

For example, consider RF=3 and a CL=QUORUM read. The coordinator then
sends its request to a subset of 2 nodes out of the 3 possible replicas.
The choice of the 2-node subset is random; the distribution used for the
random roll is affected by certain things such as the "cache hitrate"
metric. The details are not that relevant for this discussion.

If not all of the initially chosen replicas answer within a certain time
period, the coordinator may send an additional request to one more
replica, hoping that this replica helps achieve the desired CL so the
entire client request succeeds. This mechanism is called "speculative
retry" and is enabled by default.

This time period - call it `T` - is chosen based on keyspace
configuration. The default value is "99.0PERCENTILE", which means that
`T` is roughly equal to the 99th percentile of the latency distribution
of previous requests (or at least the most recent requests; the
algorithm uses an exponential decay strategy to make old requests less
relevant for the metric).
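
To make the meaning of `T` concrete, here is a minimal sketch, assuming a
plain nearest-rank percentile over a fixed window of recent latencies. It
deliberately ignores the exponential decay mentioned above, and the name
`percentile_threshold` is hypothetical; it is not part of storage_proxy.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <vector>

using ms = std::chrono::milliseconds;

// Hypothetical helper (not from the patch): nearest-rank percentile over
// a window of recent request latencies. `percent` is a whole number in
// the range 1..100. No exponential decay is applied here.
inline ms percentile_threshold(std::vector<ms> recent, unsigned percent) {
    assert(!recent.empty());
    assert(percent >= 1 && percent <= 100);
    std::sort(recent.begin(), recent.end());
    // Nearest rank: the smallest sample such that at least `percent`
    // percent of all samples are less than or equal to it. Integer
    // arithmetic (rounding up) avoids floating-point edge cases.
    std::size_t rank = (static_cast<std::size_t>(percent) * recent.size() + 99) / 100;
    return recent[rank - 1];
}
```

In this simplified model a speculative retry would be sent once a read has
been outstanding for longer than `percentile_threshold(recent, 99)`.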
The latencies used are the durations of whole coordinator read requests:
each such duration measurement starts before the first replica request
is sent and ends after the last replica request is answered, among the
replica requests whose results were used for the reconciled result
returned to the client (there may be more requests sent later "in the
background" - they don't affect the client result and are not taken into
account for the latency measurement).

This strategy, however, has an undesired effect which appears when a
significant part of all requests require a speculative retry to succeed.
To explain this effect it's best to consider a scenario which takes this
to the extreme - where *all* requests require a speculative retry.

Consider RF=3 and CL=QUORUM so each read request initially uses 2
replicas. Let {A, B, C} be the set of replicas. We run a uniformly
distributed read workload.

Initially the cluster operates normally. Roughly 1/3 of all requests go
to replicas {A, B}, 1/3 go to {A, C}, and 1/3 go to {B, C}. The 99th
percentile of read request latencies is 50ms. Suppose that the average
round-trip latency between a coordinator and any replica is 10ms.

Suddenly replica C is hard-killed: non-graceful shutdown, e.g. power
outage. This means that other nodes are initially not aware that C is
down; they must wait for the failure detector to convict C as
unavailable, which happens after a configurable amount of time. The
current default is 20s, meaning that by default coordinators will still
attempt to send requests to C for 20s after it is hard-killed.

During this period the following happens:
- About 2/3 of all requests - the ones which were routed to {A, C} and
  {B, C} - do not finish within 50ms because C does not answer. For
  these requests to finish, the coordinator performs a speculative retry
  to the third replica, which finishes after ~10ms (the average
  round-trip latency). Thus the entire request, from the coordinator's
  POV, takes ~60ms.
- Eventually (very quickly in fact - assuming there are many concurrent
  requests) the P99 latency rises to 60ms.
- Furthermore, the requests which initially use {A, C} and {B, C} start
  making up more than 2/3 of all requests because they are stuck in the
  foreground longer than the {A, B} requests (since their latencies are
  higher).
- These requests do not finish within 60ms. Thus coordinators perform
  speculative retries. Thus they finish after ~70ms.
- Eventually the P99 latency rises to 70ms.
- These bad requests make up an even larger portion of all requests.
- These requests do not finish within 70ms. They finish after ~80ms.
- Eventually the P99 latency rises to 80ms.
- And so on.

In metrics, we observe the following:
- Latencies rise roughly linearly. They rise until they hit a certain
  limit; this limit comes from the fact that `T` is upper-bounded by the
  read request timeout parameter divided by 2. Thus if the read request
  timeout is `5s` and P99 latencies are `3s`, `T` will be `2.5s`, not
  `3s`. Thus eventually all requests will take about `2.5s + 10ms` to
  finish (`2.5s` until speculative retry happens, `10ms` for the last
  round-trip), unless the node is marked as DOWN before we reach that
  limit.
- Throughput decreases roughly in proportion to the function y = 1/x
  (x being the latency), as expected from Little's law.

Everything goes back to normal when nodes mark C as DOWN, which happens
after ~20s by default as explained above. Then coordinators start
routing all requests to {A, B} only.

This does not happen for graceful shutdowns, where C announces to the
cluster that it's shutting down before shutting down, causing other
nodes to mark it as DOWN almost immediately.

The root cause of the issue is a feedback loop in the metric used to
calculate `T`: we perform a speculative retry after `T` -> P99 request
latencies rise to about `T + 10ms` -> `T` is recomputed as roughly the
old `T + 10ms` -> etc.

We fix the problem by changing the measurements used for calculating
`T`.
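
As a rough illustration of the feedback loop described above (not of the
fix), the growth of `T` can be modelled with a few lines of arithmetic.
This is a sketch under simplifying assumptions: every request needs a
speculative retry, the P99 converges fully between steps, and the function
name `simulate_threshold_growth` is hypothetical.

```cpp
#include <algorithm>
#include <chrono>
#include <vector>

using ms = std::chrono::milliseconds;

// Simplified model (assumption, not the real metric): each observed
// request latency is the current T plus one round trip, and the next T
// equals that observed P99, capped at half the read request timeout.
inline std::vector<ms> simulate_threshold_growth(ms initial_t, ms round_trip,
                                                 ms read_timeout, int steps) {
    std::vector<ms> history;
    const ms cap = read_timeout / 2;
    ms t = std::min(initial_t, cap);
    for (int i = 0; i < steps; ++i) {
        history.push_back(t);
        // Retry fires after T, the retried replica answers one round trip later.
        ms observed_p99 = t + round_trip;
        t = std::min(observed_p99, cap);
    }
    return history;
}
```

With `T` starting at 50ms, a 10ms round trip and a 5s read timeout, the
model yields 50ms, 60ms, 70ms, 80ms and so on, and stops growing at 2.5s.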
Instead of measuring the entire coordinator read latency, we measure
each replica request separately and take the maximum over these
measurements. We only take into account the measurements for requests
that actually contributed to the request's result.

The previous statistic would also measure the latencies of failed
requests. Now we measure only latencies of successful replica requests.
Indeed this makes sense for the speculative retry use case; the idea
behind speculative retry is that we assume that requests usually succeed
within a certain time period, and we should perform the retry if they
take longer than that. To measure this time period, taking failed
requests into account doesn't make much sense.

In the scenario above, for a request that initially goes to {A, C}, the
following would happen after applying the fix:
- We send the requests to A and C.
- After ~10ms A responds. We record the ~10ms measurement.
- After ~50ms we perform speculative retry, sending a request to B.
- After ~10ms B responds. We record the ~10ms measurement.
The maximum over recorded measurements is ~10ms, not ~60ms.

The feedback loop is removed.

Experiments show that the solution is effective: in scenarios like the
one above, after C is killed, latencies only rise slightly by a constant
amount and then maintain their level, as expected. Throughput also drops
by a constant amount and maintains its level instead of continuously
dropping with an asymptote at 0.

Fixes #3746.
Fixes #7342.
Closes #8783
---
 service/storage_proxy.cc | 48 ++++++++++++++++++++++++++++++----------
 1 file changed, 36 insertions(+), 12 deletions(-)

diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index 7ed065172e..2f66c3db2e 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -3402,7 +3402,10 @@ protected:
     using targets_iterator = inet_address_vector_replica_set::iterator;
     using digest_resolver_ptr = ::shared_ptr;
     using data_resolver_ptr = ::shared_ptr;
+    // Clock type for measuring timeouts.
     using clock_type = storage_proxy::clock_type;
+    // Clock type for measuring latencies.
+    using latency_clock = utils::latency_counter::clock;

     schema_ptr _schema;
     shared_ptr _proxy;
@@ -3499,13 +3502,14 @@ private:

 protected:
     future<> make_mutation_data_requests(lw_shared_ptr cmd, data_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) {
-        return parallel_for_each(begin, end, [this, &cmd, resolver = std::move(resolver), timeout] (gms::inet_address ep) {
-            return make_mutation_data_request(cmd, ep, timeout).then_wrapped([this, resolver, ep] (future>, cache_temperature>> f) {
+        return parallel_for_each(begin, end, [this, &cmd, resolver = std::move(resolver), timeout, start = latency_clock::now()] (gms::inet_address ep) {
+            return make_mutation_data_request(cmd, ep, timeout).then_wrapped([this, resolver, ep, start] (future>, cache_temperature>> f) {
                 try {
                     auto v = f.get0();
                     _cf->set_hit_rate(ep, std::get<1>(v));
                     resolver->add_mutate_data(ep, std::get<0>(std::move(v)));
                     ++_proxy->get_stats().mutation_data_read_completed.get_ep_stat(ep);
+                    register_request_latency(latency_clock::now() - start);
                 } catch(...) {
                     ++_proxy->get_stats().mutation_data_read_errors.get_ep_stat(ep);
                     resolver->error(ep, std::current_exception());
@@ -3514,14 +3518,15 @@ protected:
         });
     }
     future<> make_data_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout, bool want_digest) {
-        return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, want_digest] (gms::inet_address ep) {
-            return make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep] (future>, cache_temperature>> f) {
+        return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, want_digest, start = latency_clock::now()] (gms::inet_address ep) {
+            return make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start] (future>, cache_temperature>> f) {
                 try {
                     auto v = f.get0();
                     _cf->set_hit_rate(ep, std::get<1>(v));
                     resolver->add_data(ep, std::get<0>(std::move(v)));
                     ++_proxy->get_stats().data_read_completed.get_ep_stat(ep);
                     _used_targets.push_back(ep);
+                    register_request_latency(latency_clock::now() - start);
                 } catch(...) {
                     ++_proxy->get_stats().data_read_errors.get_ep_stat(ep);
                     resolver->error(ep, std::current_exception());
@@ -3530,14 +3535,15 @@ protected:
         });
     }
     future<> make_digest_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) {
-        return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout] (gms::inet_address ep) {
-            return make_digest_request(ep, timeout).then_wrapped([this, resolver, ep] (future> f) {
+        return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, start = latency_clock::now()] (gms::inet_address ep) {
+            return make_digest_request(ep, timeout).then_wrapped([this, resolver, ep, start] (future> f) {
                 try {
                     auto v = f.get0();
                     _cf->set_hit_rate(ep, std::get<2>(v));
                     resolver->add_digest(ep, std::get<0>(v), std::get<1>(v));
                     ++_proxy->get_stats().digest_read_completed.get_ep_stat(ep);
                     _used_targets.push_back(ep);
+                    register_request_latency(latency_clock::now() - start);
                 } catch(...) {
                     ++_proxy->get_stats().digest_read_errors.get_ep_stat(ep);
                     resolver->error(ep, std::current_exception());
@@ -3729,6 +3735,23 @@ public:
     lw_shared_ptr& get_cf() {
         return _cf;
     }
+
+    // Maximum latency of a successful request made to a replica (over all requests that finished up to this point).
+    // Example usage: gathering latency statistics for deciding on invoking speculative retries.
+    std::optional<latency_clock::duration> max_request_latency() const {
+        if (_max_request_latency == NO_LATENCY) {
+            return std::nullopt;
+        }
+        return _max_request_latency;
+    }
+
+private:
+    void register_request_latency(latency_clock::duration d) {
+        _max_request_latency = std::max(_max_request_latency, d);
+    }
+
+    static constexpr latency_clock::duration NO_LATENCY{-1};
+    latency_clock::duration _max_request_latency{NO_LATENCY};
 };

 class never_speculating_read_executor : public abstract_read_executor {
@@ -3994,16 +4017,17 @@ storage_proxy::query_singular(lw_shared_ptr cmd,
                 std::pair<::shared_ptr, dht::token_range>& executor_and_token_range) {
         auto& rex = std::get<0>(executor_and_token_range);
         auto& token_range = std::get<1>(executor_and_token_range);
-        utils::latency_counter lc;
-        lc.start();
-        return rex->execute(timeout).then_wrapped([p = std::move(p), lc, rex, used_replicas, token_range = std::move(token_range), tmptr] (
+        return rex->execute(timeout).then_wrapped([p = std::move(p), rex, used_replicas, token_range = std::move(token_range), tmptr] (
                     future>> f) mutable {
             if (!f.failed()) {
                 used_replicas->emplace(std::move(token_range), endpoints_to_replica_ids(*tmptr, rex->used_targets()));
+
+                auto latency = rex->max_request_latency();
+                if (latency) {
+                    rex->get_cf()->add_coordinator_read_latency(*latency);
+                }
             }
-            if (lc.is_start()) {
-                rex->get_cf()->add_coordinator_read_latency(lc.stop().latency());
-            }
+
             return std::move(f);
         });
     }, std::move(merger));
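
The measurement introduced by this patch can also be sketched in isolation.
The following is a standalone illustration, not the storage_proxy code: the
class `max_latency_tracker` and the function `scenario_a_c_then_b` are
hypothetical names, and it uses `std::optional` for the "no measurement yet"
state where the patch uses the `NO_LATENCY` sentinel.

```cpp
#include <algorithm>
#include <chrono>
#include <optional>

using ms = std::chrono::milliseconds;

// Hypothetical standalone tracker: remembers the largest latency among
// successful replica requests. Failed requests are simply never
// registered, so they cannot inflate the statistic.
class max_latency_tracker {
public:
    void register_success(ms d) {
        if (_max) {
            _max = std::max(*_max, d);
        } else {
            _max = d;
        }
    }
    // Empty until at least one successful request has been registered.
    std::optional<ms> max_latency() const { return _max; }
private:
    std::optional<ms> _max;
};

// The {A, C} scenario from the commit message, after the fix.
inline std::optional<ms> scenario_a_c_then_b() {
    max_latency_tracker tracker;
    tracker.register_success(ms(10)); // A answers ~10ms after its request was sent.
    // C never answers, so nothing is registered for it.
    tracker.register_success(ms(10)); // B answers ~10ms after the speculative retry was sent.
    return tracker.max_latency();
}
```

In this sketch the recorded value for the scenario is 10ms, because each
replica request is timed from its own send time rather than from the start
of the whole coordinator read.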