diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index 12c35f0896..ccf3e513aa 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -757,25 +757,25 @@ select_statement::execute_without_checking_exception_message(query_processor& qp return do_with(std::forward(partition_ranges), [this, &qp, &state, &options, cmd, timeout](auto& prs) { assert(cmd->partition_limit == query::max_partitions); query::result_merger merger(cmd->get_row_limit() * prs.size(), query::max_partitions); - return map_reduce(prs.begin(), prs.end(), [this, &qp, &state, &options, cmd, timeout] (auto& pr) { + return utils::result_map_reduce(prs.begin(), prs.end(), [this, &qp, &state, &options, cmd, timeout] (auto& pr) { dht::partition_range_vector prange { pr }; auto command = ::make_lw_shared(*cmd); - return qp.proxy().query(_schema, + return qp.proxy().query_result(_schema, command, std::move(prange), options.get_consistency(), - {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()}).then([] (service::storage_proxy::coordinator_query_result qr) { - return std::move(qr.query_result); - }); + {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()}).then(utils::result_wrap([] (service::storage_proxy::coordinator_query_result qr) { + return make_ready_future>>>(std::move(qr.query_result)); + })); }, std::move(merger)); - }).then([this, &options, now, cmd] (auto result) { + }).then(wrap_result_to_error_message([this, &options, now, cmd] (auto result) { return this->process_results(std::move(result), cmd, options, now); - }); + })); } else { - return qp.proxy().query(_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()}) - .then([this, &options, now, cmd] (service::storage_proxy::coordinator_query_result qr) { + return qp.proxy().query_result(_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()}) + .then(wrap_result_to_error_message([this, &options, now, cmd] (service::storage_proxy::coordinator_query_result qr) { return this->process_results(std::move(qr.query_result), cmd, options, now); - }); + })); } }