diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 2f66c3db2e..bdf4ea5eed 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -3450,7 +3450,7 @@ public: return _used_targets; } -private: +protected: future>, cache_temperature>> make_mutation_data_request(lw_shared_ptr cmd, gms::inet_address ep, clock_type::time_point timeout) { ++_proxy->get_stats().mutation_data_read_attempts.get_ep_stat(ep); if (fbu::is_me(ep)) { @@ -3499,11 +3499,11 @@ private: }); } } - -protected: - future<> make_mutation_data_requests(lw_shared_ptr cmd, data_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) { - return parallel_for_each(begin, end, [this, &cmd, resolver = std::move(resolver), timeout, start = latency_clock::now()] (gms::inet_address ep) { - return make_mutation_data_request(cmd, ep, timeout).then_wrapped([this, resolver, ep, start] (future>, cache_temperature>> f) { + void make_mutation_data_requests(lw_shared_ptr cmd, data_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) { + auto start = latency_clock::now(); + for (const gms::inet_address& ep : boost::make_iterator_range(begin, end)) { + // Waited on indirectly, shared_from_this keeps `this` alive + (void)make_mutation_data_request(cmd, ep, timeout).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future>, cache_temperature>> f) { try { auto v = f.get0(); _cf->set_hit_rate(ep, std::get<1>(v)); @@ -3515,11 +3515,13 @@ protected: resolver->error(ep, std::current_exception()); } }); - }); + } } - future<> make_data_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout, bool want_digest) { - return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, want_digest, start = latency_clock::now()] (gms::inet_address ep) { - return make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start] (future>, cache_temperature>> f) { + void make_data_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout, bool want_digest) { + auto start = latency_clock::now(); + for (const gms::inet_address& ep : boost::make_iterator_range(begin, end)) { + // Waited on indirectly, shared_from_this keeps `this` alive + (void)make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future>, cache_temperature>> f) { try { auto v = f.get0(); _cf->set_hit_rate(ep, std::get<1>(v)); @@ -3532,11 +3534,13 @@ protected: resolver->error(ep, std::current_exception()); } }); - }); + } } - future<> make_digest_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) { - return parallel_for_each(begin, end, [this, resolver = std::move(resolver), timeout, start = latency_clock::now()] (gms::inet_address ep) { - return make_digest_request(ep, timeout).then_wrapped([this, resolver, ep, start] (future> f) { + void make_digest_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end, clock_type::time_point timeout) { + auto start = latency_clock::now(); + for (const gms::inet_address& ep : boost::make_iterator_range(begin, end)) { + // Waited on indirectly, shared_from_this keeps `this` alive + (void)make_digest_request(ep, timeout).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future> f) { try { auto v = f.get0(); _cf->set_hit_rate(ep, std::get<2>(v)); @@ -3549,14 +3553,13 @@ protected: resolver->error(ep, std::current_exception()); } }); - }); + } } - virtual future<> make_requests(digest_resolver_ptr resolver, clock_type::time_point timeout) { + virtual void make_requests(digest_resolver_ptr resolver, clock_type::time_point timeout) { resolver->add_wait_targets(_targets.size()); auto want_digest = _targets.size() > 1; - auto f_data = futurize_invoke([&] { return make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest); }); - auto f_digest = futurize_invoke([&] { return make_digest_requests(resolver, _targets.begin() + 1, _targets.end(), timeout); }); - return when_all_succeed(std::move(f_data), std::move(f_digest)).discard_result().handle_exception([] (auto&&) { }); + make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest); + make_digest_requests(resolver, _targets.begin() + 1, _targets.end(), timeout); } virtual void got_cl() {} uint64_t original_row_limit() const { @@ -3575,7 +3578,7 @@ protected: auto exec = shared_from_this(); // Waited on indirectly. - (void)make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout).finally([exec]{}); + make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout); // Waited on indirectly. (void)data_resolver->done().then_wrapped([this, exec, data_resolver, cmd = std::move(cmd), cl, timeout] (future<> f) { @@ -3673,10 +3676,7 @@ public: db::is_datacenter_local(_cl) ? db::count_local_endpoints(_targets): _targets.size(), timeout); auto exec = shared_from_this(); - // Waited on indirectly. - (void)make_requests(digest_resolver, timeout).finally([exec]() { - // hold on to executor until all queries are complete - }); + make_requests(digest_resolver, timeout); // Waited on indirectly. (void)digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future f) mutable { @@ -3767,12 +3767,12 @@ public: class always_speculating_read_executor : public abstract_read_executor { public: using abstract_read_executor::abstract_read_executor; - virtual future<> make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) { + virtual void make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) { resolver->add_wait_targets(_targets.size()); // FIXME: consider disabling for CL=*ONE bool want_digest = true; - return when_all(make_data_requests(resolver, _targets.begin(), _targets.begin() + 2, timeout, want_digest), - make_digest_requests(resolver, _targets.begin() + 2, _targets.end(), timeout)).discard_result(); + make_data_requests(resolver, _targets.begin(), _targets.begin() + 2, timeout, want_digest); + make_digest_requests(resolver, _targets.begin() + 2, _targets.end(), timeout); } }; @@ -3781,7 +3781,7 @@ class speculating_read_executor : public abstract_read_executor { timer _speculate_timer; public: using abstract_read_executor::abstract_read_executor; - virtual future<> make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) { + virtual void make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) { _speculate_timer.set_callback([this, resolver, timeout] { if (!resolver->is_completed()) { // at the time the callback runs request may be completed already resolver->add_wait_targets(1); // we send one more request so wait for it too @@ -3789,14 +3789,13 @@ public: auto send_request = [&] (bool has_data) { if (has_data) { _proxy->get_stats().speculative_digest_reads++; - return make_digest_requests(resolver, _targets.end() - 1, _targets.end(), timeout); + make_digest_requests(resolver, _targets.end() - 1, _targets.end(), timeout); } else { _proxy->get_stats().speculative_data_reads++; - return make_data_requests(resolver, _targets.end() - 1, _targets.end(), timeout, true); + make_data_requests(resolver, _targets.end() - 1, _targets.end(), timeout, true); } }; - // Waited on indirectly. - (void)send_request(resolver->has_data()).finally([exec = shared_from_this()]{}); + send_request(resolver->has_data()); } }); auto& sr = _schema->speculative_retry(); @@ -3814,13 +3813,13 @@ public: // We're hitting additional targets for read repair. Since our "extra" replica is the least- // preferred by the snitch, we do an extra data read to start with against a replica more // likely to reply; better to let RR fail than the entire query. - return when_all(make_data_requests(resolver, _targets.begin(), _targets.begin() + 2, timeout, want_digest), - make_digest_requests(resolver, _targets.begin() + 2, _targets.end(), timeout)).discard_result(); + make_data_requests(resolver, _targets.begin(), _targets.begin() + 2, timeout, want_digest); + make_digest_requests(resolver, _targets.begin() + 2, _targets.end(), timeout); } else { // not doing read repair; all replies are important, so it doesn't matter which nodes we // perform data reads against vs digest. - return when_all(make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest), - make_digest_requests(resolver, _targets.begin() + 1, _targets.end() - 1, timeout)).discard_result(); + make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest); + make_digest_requests(resolver, _targets.begin() + 1, _targets.end() - 1, timeout); } } virtual void got_cl() override {