select_statement: return exceptions as errors in execute_without_checking_exception_message

Modifies the remaining logic of execute_without... (apart from the
do_execute call) so that the result-aware versions of storage_proxy's
methods are called and failed results are converted to
result_message::exception.
This commit is contained in:
Piotr Dulikowski
2022-02-17 01:20:43 +01:00
parent 5106c60cd0
commit c5bcfee28f

View File

@@ -757,25 +757,25 @@ select_statement::execute_without_checking_exception_message(query_processor& qp
return do_with(std::forward<dht::partition_range_vector>(partition_ranges), [this, &qp, &state, &options, cmd, timeout](auto& prs) {
assert(cmd->partition_limit == query::max_partitions);
query::result_merger merger(cmd->get_row_limit() * prs.size(), query::max_partitions);
return map_reduce(prs.begin(), prs.end(), [this, &qp, &state, &options, cmd, timeout] (auto& pr) {
return utils::result_map_reduce(prs.begin(), prs.end(), [this, &qp, &state, &options, cmd, timeout] (auto& pr) {
dht::partition_range_vector prange { pr };
auto command = ::make_lw_shared<query::read_command>(*cmd);
return qp.proxy().query(_schema,
return qp.proxy().query_result(_schema,
command,
std::move(prange),
options.get_consistency(),
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()}).then([] (service::storage_proxy::coordinator_query_result qr) {
return std::move(qr.query_result);
});
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()}).then(utils::result_wrap([] (service::storage_proxy::coordinator_query_result qr) {
return make_ready_future<coordinator_result<foreign_ptr<lw_shared_ptr<query::result>>>>(std::move(qr.query_result));
}));
}, std::move(merger));
}).then([this, &options, now, cmd] (auto result) {
}).then(wrap_result_to_error_message([this, &options, now, cmd] (auto result) {
return this->process_results(std::move(result), cmd, options, now);
});
}));
} else {
return qp.proxy().query(_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
.then([this, &options, now, cmd] (service::storage_proxy::coordinator_query_result qr) {
return qp.proxy().query_result(_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
.then(wrap_result_to_error_message([this, &options, now, cmd] (service::storage_proxy::coordinator_query_result qr) {
return this->process_results(std::move(qr.query_result), cmd, options, now);
});
}));
}
}