From 10e75bc363bd87a0864ae033edfe6d5ae61af3be Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 10 Jun 2021 17:47:57 +0300 Subject: [PATCH] storage_proxy: remove excess continuations around abstract_read_executor::make_requests() abstract_read_executor::make_requests() calls make_{data,digest}_request(), which loop over endpoints in a parallel_for_each(), then collects the result of the parallel_for_each()es with when_all_succeed(), then a handle_execption() (or discard_result() in related callers). The caller of make_requests then attaches a finally() block to keep `this` alive, and discards the remaining future. So, a lot of continuations are generated to merge the results, all in order to keep a reference count alive. Remove those excess continuations by having individual make_*_request() variants elevate the reference count themselves. They all already have a continuation to uncorporate the result into the executor, all they need is an extra shared_from_this() call. The parallel_for_each() loops are converted to regular for loops. Note even a local request that hits cache ends up with a non-ready future due to an execution_stage for replica access, so these continuations generate reactor tasks. perf_simple_query reports: before: median 203905.19 tps ( 87.1 allocs/op, 20.1 tasks/op, 50860 insns/op) after: median 214646.89 tps ( 81.1 allocs/op, 15.1 tasks/op, 48604 insns/op) --- service/storage_proxy.cc | 71 ++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 36 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 2f66c3db2e..bdf4ea5eed 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -3450,7 +3450,7 @@ public: return _used_targets; } -private: +protected: future>, cache_temperature>> make_mutation_data_request(lw_shared_ptr cmd, gms::inet_address ep, clock_type::time_point timeout) { ++_proxy->get_stats().mutation_data_read_attempts.get_ep_stat(ep); if (fbu::is_me(ep)) { @@ -3499,11 +3499,11 @@ private: }); } } - -protected: - future<> make_mutation_data_requests(lw_shared_ptr cmd, data_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) { - return parallel_for_each(begin, end, [this, &cmd, resolver = std::move(resolver), timeout, start = latency_clock::now()] (gms::inet_address ep) { - return make_mutation_data_request(cmd, ep, timeout).then_wrapped([this, resolver, ep, start] (future>, cache_temperature>> f) { + void make_mutation_data_requests(lw_shared_ptr cmd, data_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) { + auto start = latency_clock::now(); + for (const gms::inet_address& ep : boost::make_iterator_range(begin, end)) { + // Waited on indirectly, shared_from_this keeps `this` alive + (void)make_mutation_data_request(cmd, ep, timeout).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future>, cache_temperature>> f) { try { auto v = f.get0(); _cf->set_hit_rate(ep, std::get<1>(v)); @@ -3515,11 +3515,13 @@ protected: resolver->error(ep, std::current_exception()); } }); - }); + } } - future<> make_data_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout, bool want_digest) { - return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, want_digest, start = latency_clock::now()] (gms::inet_address ep) { - return make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start] (future>, cache_temperature>> f) { + void make_data_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout, bool want_digest) { + auto start = latency_clock::now(); + for (const gms::inet_address& ep : boost::make_iterator_range(begin, end)) { + // Waited on indirectly, shared_from_this keeps `this` alive + (void)make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future>, cache_temperature>> f) { try { auto v = f.get0(); _cf->set_hit_rate(ep, std::get<1>(v)); @@ -3532,11 +3534,13 @@ protected: resolver->error(ep, std::current_exception()); } }); - }); + } } - future<> make_digest_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) { - return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, start = latency_clock::now()] (gms::inet_address ep) { - return make_digest_request(ep, timeout).then_wrapped([this, resolver, ep, start] (future> f) { + void make_digest_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) { + auto start = latency_clock::now(); + for (const gms::inet_address& ep : boost::make_iterator_range(begin, end)) { + // Waited on indirectly, shared_from_this keeps `this` alive + (void)make_digest_request(ep, timeout).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future> f) { try { auto v = f.get0(); _cf->set_hit_rate(ep, std::get<2>(v)); @@ -3549,14 +3553,13 @@ protected: resolver->error(ep, std::current_exception()); } }); - }); + } } - virtual future<> make_requests(digest_resolver_ptr resolver, clock_type::time_point timeout) { + virtual void make_requests(digest_resolver_ptr resolver, clock_type::time_point timeout) { resolver->add_wait_targets(_targets.size()); auto want_digest = _targets.size() > 1; - auto f_data = futurize_invoke([&] { return make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest); }); - auto f_digest = futurize_invoke([&] { return make_digest_requests(resolver, _targets.begin() + 1, _targets.end(), timeout); }); - return when_all_succeed(std::move(f_data), std::move(f_digest)).discard_result().handle_exception([] (auto&&) { }); + make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest); + make_digest_requests(resolver, _targets.begin() + 1, _targets.end(), timeout); } virtual void got_cl() {} uint64_t original_row_limit() const { @@ -3575,7 +3578,7 @@ protected: auto exec = shared_from_this(); // Waited on indirectly. - (void)make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout).finally([exec]{}); + make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout); // Waited on indirectly. (void)data_resolver->done().then_wrapped([this, exec, data_resolver, cmd = std::move(cmd), cl, timeout] (future<> f) { @@ -3673,10 +3676,7 @@ public: db::is_datacenter_local(_cl) ? db::count_local_endpoints(_targets): _targets.size(), timeout); auto exec = shared_from_this(); - // Waited on indirectly. - (void)make_requests(digest_resolver, timeout).finally([exec]() { - // hold on to executor until all queries are complete - }); + make_requests(digest_resolver, timeout); // Waited on indirectly. (void)digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future f) mutable { @@ -3767,12 +3767,12 @@ public: class always_speculating_read_executor : public abstract_read_executor { public: using abstract_read_executor::abstract_read_executor; - virtual future<> make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) { + virtual void make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) { resolver->add_wait_targets(_targets.size()); // FIXME: consider disabling for CL=*ONE bool want_digest = true; - return when_all(make_data_requests(resolver, _targets.begin(), _targets.begin() + 2, timeout, want_digest), - make_digest_requests(resolver, _targets.begin() + 2, _targets.end(), timeout)).discard_result(); + make_data_requests(resolver, _targets.begin(), _targets.begin() + 2, timeout, want_digest); + make_digest_requests(resolver, _targets.begin() + 2, _targets.end(), timeout); } }; @@ -3781,7 +3781,7 @@ class speculating_read_executor : public abstract_read_executor { timer _speculate_timer; public: using abstract_read_executor::abstract_read_executor; - virtual future<> make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) { + virtual void make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) { _speculate_timer.set_callback([this, resolver, timeout] { if (!resolver->is_completed()) { // at the time the callback runs request may be completed already resolver->add_wait_targets(1); // we send one more request so wait for it too @@ -3789,14 +3789,13 @@ public: auto send_request = [&] (bool has_data) { if (has_data) { _proxy->get_stats().speculative_digest_reads++; - return make_digest_requests(resolver, _targets.end() - 1, _targets.end(), timeout); + make_digest_requests(resolver, _targets.end() - 1, _targets.end(), timeout); } else { _proxy->get_stats().speculative_data_reads++; - return make_data_requests(resolver, _targets.end() - 1, _targets.end(), timeout, true); + make_data_requests(resolver, _targets.end() - 1, _targets.end(), timeout, true); } }; - // Waited on indirectly. - (void)send_request(resolver->has_data()).finally([exec = shared_from_this()]{}); + send_request(resolver->has_data()); } }); auto& sr = _schema->speculative_retry(); @@ -3814,13 +3813,13 @@ public: // We're hitting additional targets for read repair. Since our "extra" replica is the least- // preferred by the snitch, we do an extra data read to start with against a replica more // likely to reply; better to let RR fail than the entire query. - return when_all(make_data_requests(resolver, _targets.begin(), _targets.begin() + 2, timeout, want_digest), - make_digest_requests(resolver, _targets.begin() + 2, _targets.end(), timeout)).discard_result(); + make_data_requests(resolver, _targets.begin(), _targets.begin() + 2, timeout, want_digest); + make_digest_requests(resolver, _targets.begin() + 2, _targets.end(), timeout); } else { // not doing read repair; all replies are important, so it doesn't matter which nodes we // perform data reads against vs digest. - return when_all(make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest), - make_digest_requests(resolver, _targets.begin() + 1, _targets.end() - 1, timeout)).discard_result(); + make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest); + make_digest_requests(resolver, _targets.begin() + 1, _targets.end() - 1, timeout); } } virtual void got_cl() override {