diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index e984547584..70dc49596e 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1302,6 +1302,9 @@ public: void add_wait_targets(size_t targets_count) { _targets_count += targets_count; } + bool is_completed() { + return _digest_results.size() == _targets_count; + } }; class data_read_resolver : public abstract_read_resolver { @@ -1595,11 +1598,13 @@ public: using abstract_read_executor::abstract_read_executor; virtual future<> make_requests(digest_resolver_ptr resolver) { _speculate_timer.set_callback([this, resolver] { - resolver->add_wait_targets(1); // we send one more request so wait for it too - future<> f = resolver->has_data() ? - make_digest_requests(resolver, _targets.end() - 1, _targets.end()) : - make_data_requests(resolver, _targets.end() - 1, _targets.end()); - f.finally([exec = shared_from_this()]{}); + if (!resolver->is_completed()) { // at the time the callback runs request may be completed already + resolver->add_wait_targets(1); // we send one more request so wait for it too + future<> f = resolver->has_data() ? + make_digest_requests(resolver, _targets.end() - 1, _targets.end()) : + make_data_requests(resolver, _targets.end() - 1, _targets.end()); + f.finally([exec = shared_from_this()]{}); + } }); // FIXME: the timeout should come from previous latency statistics for a partition auto timeout = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(get_local_storage_proxy().get_db().local().get_config().read_request_timeout_in_ms()/2);