diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index 1dc837ef31..8ddc2b6d62 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -655,68 +655,58 @@ indexed_table_select_statement::do_execute_base_query( auto cmd = prepare_command_for_base_query(qp, options, state, now, bool(paging_state)); auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options); - struct base_query_state { - query::result_merger merger; - std::vector primary_keys; - std::vector::iterator current_primary_key; - size_t previous_result_size = 0; - size_t next_iteration_size = 0; - base_query_state(uint64_t row_limit, std::vector&& keys) - : merger(row_limit, query::max_partitions) - , primary_keys(std::move(keys)) - , current_primary_key(primary_keys.begin()) - {} - base_query_state(base_query_state&&) = default; - base_query_state(const base_query_state&) = delete; - }; + query::result_merger merger(cmd->get_row_limit(), query::max_partitions); + std::vector keys = std::move(primary_keys); + std::vector::iterator key_it(keys.begin()); + size_t previous_result_size = 0; + size_t next_iteration_size = 0; - base_query_state query_state{cmd->get_row_limit(), std::move(primary_keys)}; const bool is_paged = bool(paging_state); - return do_with(std::move(query_state), [this, is_paged, &qp, &state, &options, cmd, timeout] (auto&& query_state) { - auto &merger = query_state.merger; - auto &keys = query_state.primary_keys; - auto &key_it = query_state.current_primary_key; - auto &previous_result_size = query_state.previous_result_size; - auto &next_iteration_size = query_state.next_iteration_size; - return utils::result_repeat([this, is_paged, &previous_result_size, &next_iteration_size, &keys, &key_it, &merger, &qp, &state, &options, cmd, timeout]() { - // Starting with 1 key, we check if the result was a short read, and if not, - // we continue exponentially, asking for 2x more key than before - auto already_done = std::distance(keys.begin(), key_it); - // If the previous result already provided 1MB worth of data, - // stop increasing the number of fetched partitions - if (previous_result_size < query::result_memory_limiter::maximum_result_size) { - next_iteration_size = already_done + 1; - } - next_iteration_size = std::min({next_iteration_size, keys.size() - already_done, max_base_table_query_concurrency}); - auto key_it_end = key_it + next_iteration_size; - auto command = ::make_lw_shared(*cmd); + while (key_it != keys.end()) { + // Starting with 1 key, we check if the result was a short read, and if not, + // we continue exponentially, asking for 2x more key than before + auto already_done = std::distance(keys.begin(), key_it); + // If the previous result already provided 1MB worth of data, + // stop increasing the number of fetched partitions + if (previous_result_size < query::result_memory_limiter::maximum_result_size) { + next_iteration_size = already_done + 1; + } + next_iteration_size = std::min({next_iteration_size, keys.size() - already_done, max_base_table_query_concurrency}); + auto key_it_end = key_it + next_iteration_size; + auto command = ::make_lw_shared(*cmd); - query::result_merger oneshot_merger(cmd->get_row_limit(), query::max_partitions); - return utils::result_map_reduce(key_it, key_it_end, [this, &qp, &state, &options, cmd, timeout] (auto& key) { - auto command = ::make_lw_shared(*cmd); - // for each partition, read just one clustering row (TODO: can - // get all needed rows of one partition at once.) - command->slice._row_ranges.clear(); - if (key.clustering) { - command->slice._row_ranges.push_back(query::clustering_range::make_singular(key.clustering)); - } - return qp.proxy().query_result(_schema, command, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()}) - .then(utils::result_wrap([] (service::storage_proxy::coordinator_query_result qr) -> coordinator_result>> { - return std::move(qr.query_result); - })); - }, std::move(oneshot_merger)).then(utils::result_wrap([is_paged, &previous_result_size, &key_it, key_it_end = std::move(key_it_end), &keys, &merger] (foreign_ptr> result) -> coordinator_result { - auto is_short_read = result->is_short_read(); - // Results larger than 1MB should be shipped to the client immediately - const bool page_limit_reached = is_paged && result->buf().size() >= query::result_memory_limiter::maximum_result_size; - previous_result_size = result->buf().size(); - merger(std::move(result)); - key_it = key_it_end; - return stop_iteration(is_short_read || key_it == keys.end() || page_limit_reached); - })); - }).then(utils::result_wrap([&merger, cmd] () mutable { - return make_ready_future>(value_type(merger.get(), std::move(cmd))); - })); - }); + query::result_merger oneshot_merger(cmd->get_row_limit(), query::max_partitions); + coordinator_result>> rresult = co_await utils::result_map_reduce(key_it, key_it_end, coroutine::lambda([&] (auto& key) + -> future>>> { + auto command = ::make_lw_shared(*cmd); + // for each partition, read just one clustering row (TODO: can + // get all needed rows of one partition at once.) + command->slice._row_ranges.clear(); + if (key.clustering) { + command->slice._row_ranges.push_back(query::clustering_range::make_singular(key.clustering)); + } + coordinator_result rqr + = co_await qp.proxy().query_result(_schema, command, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()}); + if (!rqr.has_value()) { + co_return std::move(rqr).as_failure(); + } + co_return std::move(rqr.value().query_result); + }), std::move(oneshot_merger)); + if (!rresult.has_value()) { + co_return std::move(rresult).as_failure(); + } + auto& result = rresult.value(); + auto is_short_read = result->is_short_read(); + // Results larger than 1MB should be shipped to the client immediately + const bool page_limit_reached = is_paged && result->buf().size() >= query::result_memory_limiter::maximum_result_size; + previous_result_size = result->buf().size(); + merger(std::move(result)); + key_it = key_it_end; + if (is_short_read || page_limit_reached) { + break; + } + } + co_return value_type(merger.get(), std::move(cmd)); } future>