diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index 8ddc2b6d62..1dc837ef31 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -655,58 +655,68 @@ indexed_table_select_statement::do_execute_base_query( auto cmd = prepare_command_for_base_query(qp, options, state, now, bool(paging_state)); auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options); - query::result_merger merger(cmd->get_row_limit(), query::max_partitions); - std::vector<primary_key> keys = std::move(primary_keys); - std::vector<primary_key>::iterator key_it(keys.begin()); - size_t previous_result_size = 0; - size_t next_iteration_size = 0; + struct base_query_state { + query::result_merger merger; + std::vector<primary_key> primary_keys; + std::vector<primary_key>::iterator current_primary_key; + size_t previous_result_size = 0; + size_t next_iteration_size = 0; + base_query_state(uint64_t row_limit, std::vector<primary_key>&& keys) + : merger(row_limit, query::max_partitions) + , primary_keys(std::move(keys)) + , current_primary_key(primary_keys.begin()) + {} + base_query_state(base_query_state&&) = default; + base_query_state(const base_query_state&) = delete; + }; + base_query_state query_state{cmd->get_row_limit(), std::move(primary_keys)}; const bool is_paged = bool(paging_state); - while (key_it != keys.end()) { - // Starting with 1 key, we check if the result was a short read, and if not, - // we continue exponentially, asking for 2x more key than before - auto already_done = std::distance(keys.begin(), key_it); - // If the previous result already provided 1MB worth of data, - // stop increasing the number of fetched partitions - if (previous_result_size < query::result_memory_limiter::maximum_result_size) { - next_iteration_size = already_done + 1; - } - next_iteration_size = std::min({next_iteration_size, keys.size() - already_done, max_base_table_query_concurrency}); - auto key_it_end = key_it + next_iteration_size; - auto command = 
::make_lw_shared<query::read_command>(*cmd); - - query::result_merger oneshot_merger(cmd->get_row_limit(), query::max_partitions); - coordinator_result<foreign_ptr<lw_shared_ptr<query::result>>> rresult = co_await utils::result_map_reduce(key_it, key_it_end, coroutine::lambda([&] (auto& key) - -> future<coordinator_result<foreign_ptr<lw_shared_ptr<query::result>>>> { + return do_with(std::move(query_state), [this, is_paged, &qp, &state, &options, cmd, timeout] (auto&& query_state) { + auto &merger = query_state.merger; + auto &keys = query_state.primary_keys; + auto &key_it = query_state.current_primary_key; + auto &previous_result_size = query_state.previous_result_size; + auto &next_iteration_size = query_state.next_iteration_size; + return utils::result_repeat([this, is_paged, &previous_result_size, &next_iteration_size, &keys, &key_it, &merger, &qp, &state, &options, cmd, timeout]() { + // Starting with 1 key, we check if the result was a short read, and if not, + // we continue exponentially, asking for 2x more key than before + auto already_done = std::distance(keys.begin(), key_it); + // If the previous result already provided 1MB worth of data, + // stop increasing the number of fetched partitions + if (previous_result_size < query::result_memory_limiter::maximum_result_size) { + next_iteration_size = already_done + 1; + } + next_iteration_size = std::min({next_iteration_size, keys.size() - already_done, max_base_table_query_concurrency}); + auto key_it_end = key_it + next_iteration_size; auto command = ::make_lw_shared<query::read_command>(*cmd); - // for each partition, read just one clustering row (TODO: can - // get all needed rows of one partition at once.) 
- command->slice._row_ranges.clear(); - if (key.clustering) { - command->slice._row_ranges.push_back(query::clustering_range::make_singular(key.clustering)); - } - coordinator_result<service::storage_proxy::coordinator_query_result> rqr - = co_await qp.proxy().query_result(_schema, command, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()}); - if (!rqr.has_value()) { - co_return std::move(rqr).as_failure(); - } - co_return std::move(rqr.value().query_result); - }), std::move(oneshot_merger)); - if (!rresult.has_value()) { - co_return std::move(rresult).as_failure(); - } - auto& result = rresult.value(); - auto is_short_read = result->is_short_read(); - // Results larger than 1MB should be shipped to the client immediately - const bool page_limit_reached = is_paged && result->buf().size() >= query::result_memory_limiter::maximum_result_size; - previous_result_size = result->buf().size(); - merger(std::move(result)); - key_it = key_it_end; - if (is_short_read || page_limit_reached) { - break; - } - } - co_return value_type(merger.get(), std::move(cmd)); + + query::result_merger oneshot_merger(cmd->get_row_limit(), query::max_partitions); + return utils::result_map_reduce(key_it, key_it_end, [this, &qp, &state, &options, cmd, timeout] (auto& key) { + auto command = ::make_lw_shared<query::read_command>(*cmd); + // for each partition, read just one clustering row (TODO: can + // get all needed rows of one partition at once.) 
+ command->slice._row_ranges.clear(); + if (key.clustering) { + command->slice._row_ranges.push_back(query::clustering_range::make_singular(key.clustering)); + } + return qp.proxy().query_result(_schema, command, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()}) + .then(utils::result_wrap([] (service::storage_proxy::coordinator_query_result qr) -> coordinator_result<foreign_ptr<lw_shared_ptr<query::result>>> { + return std::move(qr.query_result); + })); + }, std::move(oneshot_merger)).then(utils::result_wrap([is_paged, &previous_result_size, &key_it, key_it_end = std::move(key_it_end), &keys, &merger] (foreign_ptr<lw_shared_ptr<query::result>> result) -> coordinator_result<stop_iteration> { + auto is_short_read = result->is_short_read(); + // Results larger than 1MB should be shipped to the client immediately + const bool page_limit_reached = is_paged && result->buf().size() >= query::result_memory_limiter::maximum_result_size; + previous_result_size = result->buf().size(); + merger(std::move(result)); + key_it = key_it_end; + return stop_iteration(is_short_read || key_it == keys.end() || page_limit_reached); + })); + }).then(utils::result_wrap([&merger, cmd] () mutable { + return make_ready_future<coordinator_result<value_type>>(value_type(merger.get(), std::move(cmd))); + })); + }); } future>