storage_proxy: remove a feedback loop from the speculative retry latency metric

To handle a read request from a client, the coordinator node must send
data and digest requests to replicas, reconcile the obtained results
(by merging the obtained mutations and comparing digests), and possibly
send more requests to replicas if the digests turned out to be different
in order to perform read repair and preserve consistency of observed reads.

In contrast to writes, where coordinators send their mutation write requests
to all replicas in the replica set, for reads the coordinators send
their requests only to as many replicas as is required to achieve
the desired CL.

For example consider RF=3 and a CL=QUORUM read. Then the coordinator sends
its request to a subset of 2 nodes out of the 3 possible replicas. The
choice of the 2-node subset is random; the distribution used for the
random roll is affected by certain things such as the "cache hitrate"
metric. The details are not that relevant for this discussion.

If not all of the the initially chosen replicas
answer within a certain time period, the coordinator may send an
additional request to one more replica, hoping that this replica helps
achieving the desired CL so the entire client request succeeds. This
mechanism is called "speculative retry" and is enabled by default.

This time period - call it `T` - is chosen based on keyspace
configuration. The default value is "99.0PERCENTILE", which means that
`T` is roughly equal to the 99th percentile of the latency distribution
of previous requests (or at least the most recent requests; the
algorithm uses an exponential decay strategy to make old request less
relevant for the metric). The latencies used are the durations of whole
coordinator read requests: each such duration measurement starts before
the first replica request is sent and ends after the last replica
request is answered, among the replica requests whose results were used
for the reconciled result returned to the client (there may be more
requests sent later "in the background" - they don't affect the client
result and are not taken into account for the latency measurement).

This strategy, however, gives an undesired effect which appears
when a significant part of all requests require a speculative retry to
succeed. To explain this effect it's best to consider a scenario which
takes this to the extreme - where *all* requests require a speculative retry.

Consider RF=3 and CL=QUORUM so each read request initially uses 2
replicas. Let {A, B, C} be the set of replicas. We run a uniformly
distributed read workload.

Initially the cluster operates normally. Roughly 1/3 of all requests go
to replicas {A, B}, 1/3 go to {A, C}, and 1/3 go to {B, C}. The 99th
percentile of read request latencies is 50ms. Suppose that the average
round-trip latency between a coordinator and any replica is 10ms.

Suddenly replica C is hard-killed: non-graceful shutdown, e.g. power
outage. This means that other nodes are initially not aware that C is down,
they must wait for the failure detector to convict C as unavailable
which happens after a configurable amount of time. The current default
is 20s, meaning that by default coordinators will still attempt to send
requests to C for 20s after it is hard-killed.

During this period the following happens:
- About 2/3 of all requests - the ones which were routed to {A, C} and
  {B, C} - do not finish within 50ms because C does not answer. For
  these requests to finish, the coordinator performs a speculative retry
  to the third replica which finishes after ~10ms (the average round-trip
  latency). Thus the entire request, from the coordinator's POV, takes ~60ms.
- Eventually (very quickly in fact - assuming there are many concurrent
  requests) the P99 latency rises to 60ms.
- Furthermore, the requests which initially use {A, C} and {B, C} start
  taking more than 2/3 of all requests because they are stuck in the foreground
  longer than the {A, B} requests (since their latencies are higher).
- These requests do not finish within 60ms. Thus coordinators perform
  speculative retries. Thus they finish after ~70ms.
- Eventually the P99 latency rises to 70ms.
- These bad requests take an even longer portion of all requests.
- These requests do not finish within 70ms. They finish after ~80ms.
- Eventually the P99 latency rises to 80ms.
- And so on.

In metrics, we observe the following:
- Latencies rise roughly linearly. They rise until they hit a certain limit;
  this limit comes from the fact that `T` is upper-bounded by the
  read request timeout parameter divided by 2. Thus if the read request
  timeout is `5s` and P99 latencies are `3s`, `T` will be `2.5s`, not `3s`.
  Thus eventually all requests will take about `2.5s + 10ms` to finish
  (`2.5s` until speculative retry happens, `10ms` for the last round-trip),
  unless the node is marked as DOWN before we reach that limit.
- Throughput decreases roughly proportionally to the y = 1/x function, as
  expected from Little's law.

Everything goes back to normal when nodes mark C as DOWN, which happens
after ~20s by default as explained above. Then coordinators start
routing all requests to {A, B} only.

This does not happen for graceful shutdowns, where C announces to the
cluster that it's shutting down before shutting down, causing other
nodes to mark it as DOWN almost immediately.

The root cause of the issue is a feedback loop in the metric used to
calculate `T`: we perform a speculative retry after `T` -> P99 request
latencies rise above `T + 10ms` -> `T` rises above `T + 10ms` -> etc.

We fix the problem by changing the measurements used for calculating
`T`. Instead of measuring the entire coordinator read latency, we
measure each replica request separately and take the maximum over these
measurements. We only take into account the measurements for requests
that actually contributed to the request's result.

The previous statistic would also measure failed requests latencies. Now we
measure only latencies of successful replica requests. Indeed this makes
sense for the speculative retry use case; the idea behind speculative retry
is that we assume that requests usually succeed within a certain time
period, and we should perform the retry if they take longer than that.
To measure this time period, taking failed requests into account doesn't
make much sense.

In the scenario above, for a request that initially goes to {A, C}, the
following would happen after applying the fix:
- We send the requests to A and C.
- After ~10ms A responds. We record the ~10ms measurement.
- After ~50ms we perform speculative retry, sending a request to B.
- After ~10ms B responds. We record the ~10ms measurement.

The maximum over recorded measurements is ~10ms, not ~60ms.
The feedback loop is removed.

Experiments show that the solution is effective: in scenarios like
above, after C is killed, latencies only rise slightly by a constant
amount and then maintain their level, as expected. Throughput also drops
by a constant amount and maintains its level instead of continuously
dropping with an asymptote at 0.

Fixes #3746.
Fixes #7342.

Closes #8783
This commit is contained in:
Kamil Braun
2021-06-01 17:46:29 +02:00
committed by Avi Kivity
parent d6f3a62c13
commit 9e85921006

View File

@@ -3402,7 +3402,10 @@ protected:
using targets_iterator = inet_address_vector_replica_set::iterator;
using digest_resolver_ptr = ::shared_ptr<digest_read_resolver>;
using data_resolver_ptr = ::shared_ptr<data_read_resolver>;
// Clock type for measuring timeouts.
using clock_type = storage_proxy::clock_type;
// Clock type for measuring latencies.
using latency_clock = utils::latency_counter::clock;
schema_ptr _schema;
shared_ptr<storage_proxy> _proxy;
@@ -3499,13 +3502,14 @@ private:
protected:
future<> make_mutation_data_requests(lw_shared_ptr<query::read_command> cmd, data_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) {
return parallel_for_each(begin, end, [this, &cmd, resolver = std::move(resolver), timeout] (gms::inet_address ep) {
return make_mutation_data_request(cmd, ep, timeout).then_wrapped([this, resolver, ep] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> f) {
return parallel_for_each(begin, end, [this, &cmd, resolver = std::move(resolver), timeout, start = latency_clock::now()] (gms::inet_address ep) {
return make_mutation_data_request(cmd, ep, timeout).then_wrapped([this, resolver, ep, start] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> f) {
try {
auto v = f.get0();
_cf->set_hit_rate(ep, std::get<1>(v));
resolver->add_mutate_data(ep, std::get<0>(std::move(v)));
++_proxy->get_stats().mutation_data_read_completed.get_ep_stat(ep);
register_request_latency(latency_clock::now() - start);
} catch(...) {
++_proxy->get_stats().mutation_data_read_errors.get_ep_stat(ep);
resolver->error(ep, std::current_exception());
@@ -3514,14 +3518,15 @@ protected:
});
}
future<> make_data_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout, bool want_digest) {
return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, want_digest] (gms::inet_address ep) {
return make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> f) {
return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, want_digest, start = latency_clock::now()] (gms::inet_address ep) {
return make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> f) {
try {
auto v = f.get0();
_cf->set_hit_rate(ep, std::get<1>(v));
resolver->add_data(ep, std::get<0>(std::move(v)));
++_proxy->get_stats().data_read_completed.get_ep_stat(ep);
_used_targets.push_back(ep);
register_request_latency(latency_clock::now() - start);
} catch(...) {
++_proxy->get_stats().data_read_errors.get_ep_stat(ep);
resolver->error(ep, std::current_exception());
@@ -3530,14 +3535,15 @@ protected:
});
}
future<> make_digest_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) {
return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout] (gms::inet_address ep) {
return make_digest_request(ep, timeout).then_wrapped([this, resolver, ep] (future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature>> f) {
return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, start = latency_clock::now()] (gms::inet_address ep) {
return make_digest_request(ep, timeout).then_wrapped([this, resolver, ep, start] (future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature>> f) {
try {
auto v = f.get0();
_cf->set_hit_rate(ep, std::get<2>(v));
resolver->add_digest(ep, std::get<0>(v), std::get<1>(v));
++_proxy->get_stats().digest_read_completed.get_ep_stat(ep);
_used_targets.push_back(ep);
register_request_latency(latency_clock::now() - start);
} catch(...) {
++_proxy->get_stats().digest_read_errors.get_ep_stat(ep);
resolver->error(ep, std::current_exception());
@@ -3729,6 +3735,23 @@ public:
lw_shared_ptr<column_family>& get_cf() {
return _cf;
}
// Maximum latency of a successful request made to a replica (over all requests that finished up to this point).
// Example usage: gathering latency statistics for deciding on invoking speculative retries.
std::optional<latency_clock::duration> max_request_latency() const {
if (_max_request_latency == NO_LATENCY) {
return std::nullopt;
}
return _max_request_latency;
}
private:
void register_request_latency(latency_clock::duration d) {
_max_request_latency = std::max(_max_request_latency, d);
}
static constexpr latency_clock::duration NO_LATENCY{-1};
latency_clock::duration _max_request_latency{NO_LATENCY};
};
class never_speculating_read_executor : public abstract_read_executor {
@@ -3994,16 +4017,17 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd,
std::pair<::shared_ptr<abstract_read_executor>, dht::token_range>& executor_and_token_range) {
auto& rex = std::get<0>(executor_and_token_range);
auto& token_range = std::get<1>(executor_and_token_range);
utils::latency_counter lc;
lc.start();
return rex->execute(timeout).then_wrapped([p = std::move(p), lc, rex, used_replicas, token_range = std::move(token_range), tmptr] (
return rex->execute(timeout).then_wrapped([p = std::move(p), rex, used_replicas, token_range = std::move(token_range), tmptr] (
future<foreign_ptr<lw_shared_ptr<query::result>>> f) mutable {
if (!f.failed()) {
used_replicas->emplace(std::move(token_range), endpoints_to_replica_ids(*tmptr, rex->used_targets()));
auto latency = rex->max_request_latency();
if (latency) {
rex->get_cf()->add_coordinator_read_latency(*latency);
}
}
if (lc.is_start()) {
rex->get_cf()->add_coordinator_read_latency(lc.stop().latency());
}
return std::move(f);
});
}, std::move(merger));