Revert "Merge 'cql3: select_statement: coroutinize indexed_table_select_statement::do_execute_base_query()' from Avi Kivity"
This reverts commit df8e1da8b2, reversing changes made to 4ff204c028. The reverted commit causes a crash in debug mode on aarch64 (likely a coroutine miscompile). Fixes #11809.
This commit is contained in:
@@ -655,58 +655,68 @@ indexed_table_select_statement::do_execute_base_query(
|
||||
auto cmd = prepare_command_for_base_query(qp, options, state, now, bool(paging_state));
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
|
||||
|
||||
query::result_merger merger(cmd->get_row_limit(), query::max_partitions);
|
||||
std::vector<primary_key> keys = std::move(primary_keys);
|
||||
std::vector<primary_key>::iterator key_it(keys.begin());
|
||||
size_t previous_result_size = 0;
|
||||
size_t next_iteration_size = 0;
|
||||
struct base_query_state {
|
||||
query::result_merger merger;
|
||||
std::vector<primary_key> primary_keys;
|
||||
std::vector<primary_key>::iterator current_primary_key;
|
||||
size_t previous_result_size = 0;
|
||||
size_t next_iteration_size = 0;
|
||||
base_query_state(uint64_t row_limit, std::vector<primary_key>&& keys)
|
||||
: merger(row_limit, query::max_partitions)
|
||||
, primary_keys(std::move(keys))
|
||||
, current_primary_key(primary_keys.begin())
|
||||
{}
|
||||
base_query_state(base_query_state&&) = default;
|
||||
base_query_state(const base_query_state&) = delete;
|
||||
};
|
||||
|
||||
base_query_state query_state{cmd->get_row_limit(), std::move(primary_keys)};
|
||||
const bool is_paged = bool(paging_state);
|
||||
while (key_it != keys.end()) {
|
||||
// Starting with 1 key, we check if the result was a short read, and if not,
|
||||
// we continue exponentially, asking for 2x more key than before
|
||||
auto already_done = std::distance(keys.begin(), key_it);
|
||||
// If the previous result already provided 1MB worth of data,
|
||||
// stop increasing the number of fetched partitions
|
||||
if (previous_result_size < query::result_memory_limiter::maximum_result_size) {
|
||||
next_iteration_size = already_done + 1;
|
||||
}
|
||||
next_iteration_size = std::min<size_t>({next_iteration_size, keys.size() - already_done, max_base_table_query_concurrency});
|
||||
auto key_it_end = key_it + next_iteration_size;
|
||||
auto command = ::make_lw_shared<query::read_command>(*cmd);
|
||||
|
||||
query::result_merger oneshot_merger(cmd->get_row_limit(), query::max_partitions);
|
||||
coordinator_result<foreign_ptr<lw_shared_ptr<query::result>>> rresult = co_await utils::result_map_reduce(key_it, key_it_end, coroutine::lambda([&] (auto& key)
|
||||
-> future<coordinator_result<foreign_ptr<lw_shared_ptr<query::result>>>> {
|
||||
return do_with(std::move(query_state), [this, is_paged, &qp, &state, &options, cmd, timeout] (auto&& query_state) {
|
||||
auto &merger = query_state.merger;
|
||||
auto &keys = query_state.primary_keys;
|
||||
auto &key_it = query_state.current_primary_key;
|
||||
auto &previous_result_size = query_state.previous_result_size;
|
||||
auto &next_iteration_size = query_state.next_iteration_size;
|
||||
return utils::result_repeat([this, is_paged, &previous_result_size, &next_iteration_size, &keys, &key_it, &merger, &qp, &state, &options, cmd, timeout]() {
|
||||
// Starting with 1 key, we check if the result was a short read, and if not,
|
||||
// we continue exponentially, asking for 2x more key than before
|
||||
auto already_done = std::distance(keys.begin(), key_it);
|
||||
// If the previous result already provided 1MB worth of data,
|
||||
// stop increasing the number of fetched partitions
|
||||
if (previous_result_size < query::result_memory_limiter::maximum_result_size) {
|
||||
next_iteration_size = already_done + 1;
|
||||
}
|
||||
next_iteration_size = std::min<size_t>({next_iteration_size, keys.size() - already_done, max_base_table_query_concurrency});
|
||||
auto key_it_end = key_it + next_iteration_size;
|
||||
auto command = ::make_lw_shared<query::read_command>(*cmd);
|
||||
// for each partition, read just one clustering row (TODO: can
|
||||
// get all needed rows of one partition at once.)
|
||||
command->slice._row_ranges.clear();
|
||||
if (key.clustering) {
|
||||
command->slice._row_ranges.push_back(query::clustering_range::make_singular(key.clustering));
|
||||
}
|
||||
coordinator_result<service::storage_proxy::coordinator_query_result> rqr
|
||||
= co_await qp.proxy().query_result(_schema, command, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()});
|
||||
if (!rqr.has_value()) {
|
||||
co_return std::move(rqr).as_failure();
|
||||
}
|
||||
co_return std::move(rqr.value().query_result);
|
||||
}), std::move(oneshot_merger));
|
||||
if (!rresult.has_value()) {
|
||||
co_return std::move(rresult).as_failure();
|
||||
}
|
||||
auto& result = rresult.value();
|
||||
auto is_short_read = result->is_short_read();
|
||||
// Results larger than 1MB should be shipped to the client immediately
|
||||
const bool page_limit_reached = is_paged && result->buf().size() >= query::result_memory_limiter::maximum_result_size;
|
||||
previous_result_size = result->buf().size();
|
||||
merger(std::move(result));
|
||||
key_it = key_it_end;
|
||||
if (is_short_read || page_limit_reached) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
co_return value_type(merger.get(), std::move(cmd));
|
||||
|
||||
query::result_merger oneshot_merger(cmd->get_row_limit(), query::max_partitions);
|
||||
return utils::result_map_reduce(key_it, key_it_end, [this, &qp, &state, &options, cmd, timeout] (auto& key) {
|
||||
auto command = ::make_lw_shared<query::read_command>(*cmd);
|
||||
// for each partition, read just one clustering row (TODO: can
|
||||
// get all needed rows of one partition at once.)
|
||||
command->slice._row_ranges.clear();
|
||||
if (key.clustering) {
|
||||
command->slice._row_ranges.push_back(query::clustering_range::make_singular(key.clustering));
|
||||
}
|
||||
return qp.proxy().query_result(_schema, command, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
|
||||
.then(utils::result_wrap([] (service::storage_proxy::coordinator_query_result qr) -> coordinator_result<foreign_ptr<lw_shared_ptr<query::result>>> {
|
||||
return std::move(qr.query_result);
|
||||
}));
|
||||
}, std::move(oneshot_merger)).then(utils::result_wrap([is_paged, &previous_result_size, &key_it, key_it_end = std::move(key_it_end), &keys, &merger] (foreign_ptr<lw_shared_ptr<query::result>> result) -> coordinator_result<stop_iteration> {
|
||||
auto is_short_read = result->is_short_read();
|
||||
// Results larger than 1MB should be shipped to the client immediately
|
||||
const bool page_limit_reached = is_paged && result->buf().size() >= query::result_memory_limiter::maximum_result_size;
|
||||
previous_result_size = result->buf().size();
|
||||
merger(std::move(result));
|
||||
key_it = key_it_end;
|
||||
return stop_iteration(is_short_read || key_it == keys.end() || page_limit_reached);
|
||||
}));
|
||||
}).then(utils::result_wrap([&merger, cmd] () mutable {
|
||||
return make_ready_future<coordinator_result<value_type>>(value_type(merger.get(), std::move(cmd)));
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::messages::result_message>>
|
||||
|
||||
Reference in New Issue
Block a user