mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 11:10:40 +00:00
storage_proxy: remove excess continuations around abstract_read_executor::make_requests()
abstract_read_executor::make_requests() calls make_{data,digest}_request(),
which loop over endpoints in a parallel_for_each(), then collects the
result of the parallel_for_each()es with when_all_succeed(), then
a handle_execption() (or discard_result() in related callers). The
caller of make_requests then attaches a finally() block to keep `this`
alive, and discards the remaining future.
So, a lot of continuations are generated to merge the results, all in
order to keep a reference count alive.
Remove those excess continuations by having individual make_*_request()
variants elevate the reference count themselves. They all already have
a continuation to uncorporate the result into the executor, all they need
is an extra shared_from_this() call. The parallel_for_each() loops
are converted to regular for loops.
Note even a local request that hits cache ends up with a non-ready future
due to an execution_stage for replica access, so these continuations
generate reactor tasks.
perf_simple_query reports:
before: median 203905.19 tps ( 87.1 allocs/op, 20.1 tasks/op, 50860 insns/op)
after: median 214646.89 tps ( 81.1 allocs/op, 15.1 tasks/op, 48604 insns/op)
This commit is contained in:
@@ -3450,7 +3450,7 @@ public:
|
||||
return _used_targets;
|
||||
}
|
||||
|
||||
private:
|
||||
protected:
|
||||
future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> make_mutation_data_request(lw_shared_ptr<query::read_command> cmd, gms::inet_address ep, clock_type::time_point timeout) {
|
||||
++_proxy->get_stats().mutation_data_read_attempts.get_ep_stat(ep);
|
||||
if (fbu::is_me(ep)) {
|
||||
@@ -3499,11 +3499,11 @@ private:
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
future<> make_mutation_data_requests(lw_shared_ptr<query::read_command> cmd, data_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) {
|
||||
return parallel_for_each(begin, end, [this, &cmd, resolver = std::move(resolver), timeout, start = latency_clock::now()] (gms::inet_address ep) {
|
||||
return make_mutation_data_request(cmd, ep, timeout).then_wrapped([this, resolver, ep, start] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> f) {
|
||||
void make_mutation_data_requests(lw_shared_ptr<query::read_command> cmd, data_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) {
|
||||
auto start = latency_clock::now();
|
||||
for (const gms::inet_address& ep : boost::make_iterator_range(begin, end)) {
|
||||
// Waited on indirectly, shared_from_this keeps `this` alive
|
||||
(void)make_mutation_data_request(cmd, ep, timeout).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> f) {
|
||||
try {
|
||||
auto v = f.get0();
|
||||
_cf->set_hit_rate(ep, std::get<1>(v));
|
||||
@@ -3515,11 +3515,13 @@ protected:
|
||||
resolver->error(ep, std::current_exception());
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
future<> make_data_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout, bool want_digest) {
|
||||
return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, want_digest, start = latency_clock::now()] (gms::inet_address ep) {
|
||||
return make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> f) {
|
||||
void make_data_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout, bool want_digest) {
|
||||
auto start = latency_clock::now();
|
||||
for (const gms::inet_address& ep : boost::make_iterator_range(begin, end)) {
|
||||
// Waited on indirectly, shared_from_this keeps `this` alive
|
||||
(void)make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> f) {
|
||||
try {
|
||||
auto v = f.get0();
|
||||
_cf->set_hit_rate(ep, std::get<1>(v));
|
||||
@@ -3532,11 +3534,13 @@ protected:
|
||||
resolver->error(ep, std::current_exception());
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
future<> make_digest_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) {
|
||||
return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, start = latency_clock::now()] (gms::inet_address ep) {
|
||||
return make_digest_request(ep, timeout).then_wrapped([this, resolver, ep, start] (future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature>> f) {
|
||||
void make_digest_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) {
|
||||
auto start = latency_clock::now();
|
||||
for (const gms::inet_address& ep : boost::make_iterator_range(begin, end)) {
|
||||
// Waited on indirectly, shared_from_this keeps `this` alive
|
||||
(void)make_digest_request(ep, timeout).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature>> f) {
|
||||
try {
|
||||
auto v = f.get0();
|
||||
_cf->set_hit_rate(ep, std::get<2>(v));
|
||||
@@ -3549,14 +3553,13 @@ protected:
|
||||
resolver->error(ep, std::current_exception());
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
virtual future<> make_requests(digest_resolver_ptr resolver, clock_type::time_point timeout) {
|
||||
virtual void make_requests(digest_resolver_ptr resolver, clock_type::time_point timeout) {
|
||||
resolver->add_wait_targets(_targets.size());
|
||||
auto want_digest = _targets.size() > 1;
|
||||
auto f_data = futurize_invoke([&] { return make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest); });
|
||||
auto f_digest = futurize_invoke([&] { return make_digest_requests(resolver, _targets.begin() + 1, _targets.end(), timeout); });
|
||||
return when_all_succeed(std::move(f_data), std::move(f_digest)).discard_result().handle_exception([] (auto&&) { });
|
||||
make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest);
|
||||
make_digest_requests(resolver, _targets.begin() + 1, _targets.end(), timeout);
|
||||
}
|
||||
virtual void got_cl() {}
|
||||
uint64_t original_row_limit() const {
|
||||
@@ -3575,7 +3578,7 @@ protected:
|
||||
auto exec = shared_from_this();
|
||||
|
||||
// Waited on indirectly.
|
||||
(void)make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout).finally([exec]{});
|
||||
make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout);
|
||||
|
||||
// Waited on indirectly.
|
||||
(void)data_resolver->done().then_wrapped([this, exec, data_resolver, cmd = std::move(cmd), cl, timeout] (future<> f) {
|
||||
@@ -3673,10 +3676,7 @@ public:
|
||||
db::is_datacenter_local(_cl) ? db::count_local_endpoints(_targets): _targets.size(), timeout);
|
||||
auto exec = shared_from_this();
|
||||
|
||||
// Waited on indirectly.
|
||||
(void)make_requests(digest_resolver, timeout).finally([exec]() {
|
||||
// hold on to executor until all queries are complete
|
||||
});
|
||||
make_requests(digest_resolver, timeout);
|
||||
|
||||
// Waited on indirectly.
|
||||
(void)digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future<digest_read_result> f) mutable {
|
||||
@@ -3767,12 +3767,12 @@ public:
|
||||
class always_speculating_read_executor : public abstract_read_executor {
|
||||
public:
|
||||
using abstract_read_executor::abstract_read_executor;
|
||||
virtual future<> make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) {
|
||||
virtual void make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) {
|
||||
resolver->add_wait_targets(_targets.size());
|
||||
// FIXME: consider disabling for CL=*ONE
|
||||
bool want_digest = true;
|
||||
return when_all(make_data_requests(resolver, _targets.begin(), _targets.begin() + 2, timeout, want_digest),
|
||||
make_digest_requests(resolver, _targets.begin() + 2, _targets.end(), timeout)).discard_result();
|
||||
make_data_requests(resolver, _targets.begin(), _targets.begin() + 2, timeout, want_digest);
|
||||
make_digest_requests(resolver, _targets.begin() + 2, _targets.end(), timeout);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -3781,7 +3781,7 @@ class speculating_read_executor : public abstract_read_executor {
|
||||
timer<storage_proxy::clock_type> _speculate_timer;
|
||||
public:
|
||||
using abstract_read_executor::abstract_read_executor;
|
||||
virtual future<> make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) {
|
||||
virtual void make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) {
|
||||
_speculate_timer.set_callback([this, resolver, timeout] {
|
||||
if (!resolver->is_completed()) { // at the time the callback runs request may be completed already
|
||||
resolver->add_wait_targets(1); // we send one more request so wait for it too
|
||||
@@ -3789,14 +3789,13 @@ public:
|
||||
auto send_request = [&] (bool has_data) {
|
||||
if (has_data) {
|
||||
_proxy->get_stats().speculative_digest_reads++;
|
||||
return make_digest_requests(resolver, _targets.end() - 1, _targets.end(), timeout);
|
||||
make_digest_requests(resolver, _targets.end() - 1, _targets.end(), timeout);
|
||||
} else {
|
||||
_proxy->get_stats().speculative_data_reads++;
|
||||
return make_data_requests(resolver, _targets.end() - 1, _targets.end(), timeout, true);
|
||||
make_data_requests(resolver, _targets.end() - 1, _targets.end(), timeout, true);
|
||||
}
|
||||
};
|
||||
// Waited on indirectly.
|
||||
(void)send_request(resolver->has_data()).finally([exec = shared_from_this()]{});
|
||||
send_request(resolver->has_data());
|
||||
}
|
||||
});
|
||||
auto& sr = _schema->speculative_retry();
|
||||
@@ -3814,13 +3813,13 @@ public:
|
||||
// We're hitting additional targets for read repair. Since our "extra" replica is the least-
|
||||
// preferred by the snitch, we do an extra data read to start with against a replica more
|
||||
// likely to reply; better to let RR fail than the entire query.
|
||||
return when_all(make_data_requests(resolver, _targets.begin(), _targets.begin() + 2, timeout, want_digest),
|
||||
make_digest_requests(resolver, _targets.begin() + 2, _targets.end(), timeout)).discard_result();
|
||||
make_data_requests(resolver, _targets.begin(), _targets.begin() + 2, timeout, want_digest);
|
||||
make_digest_requests(resolver, _targets.begin() + 2, _targets.end(), timeout);
|
||||
} else {
|
||||
// not doing read repair; all replies are important, so it doesn't matter which nodes we
|
||||
// perform data reads against vs digest.
|
||||
return when_all(make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest),
|
||||
make_digest_requests(resolver, _targets.begin() + 1, _targets.end() - 1, timeout)).discard_result();
|
||||
make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest);
|
||||
make_digest_requests(resolver, _targets.begin() + 1, _targets.end() - 1, timeout);
|
||||
}
|
||||
}
|
||||
virtual void got_cl() override {
|
||||
|
||||
Reference in New Issue
Block a user