table: query, mutation_query: close querier when done

Make sure to close the querier and subsequently its reader before
destroying it (unless it was moved).

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2021-02-10 13:33:33 +02:00
parent 8b8c721431
commit 320cb67b08
2 changed files with 19 additions and 1 deletions

View File

@@ -1943,12 +1943,21 @@ table::query(schema_ptr s,
: query::data_querier(as_mutation_source(), s, class_config.semaphore.make_permit(s.get(), "data-query"), range, qs.cmd.slice,
service::get_local_sstable_query_read_priority(), trace_state);
std::exception_ptr ex;
try {
co_await q.consume_page(query_result_builder(*s, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp, timeout,
class_config.max_memory_for_unlimited_query);
if (q.are_limits_reached() || qs.builder.is_short_read()) {
cache_ctx.insert(std::move(q), std::move(trace_state));
}
} catch (...) {
ex = std::current_exception();
}
co_await q.close();
if (ex) {
std::rethrow_exception(std::move(ex));
}
}
co_return make_lw_shared<query::result>(qs.builder.build());
@@ -1973,14 +1982,21 @@ table::mutation_query(schema_ptr s,
: query::mutation_querier(as_mutation_source(), s, class_config.semaphore.make_permit(s.get(), "mutation-query"), range, cmd.slice,
service::get_local_sstable_query_read_priority(), trace_state);
std::exception_ptr ex;
try {
auto rrb = reconcilable_result_builder(*s, cmd.slice, std::move(accounter));
auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, timeout, class_config.max_memory_for_unlimited_query);
if (q.are_limits_reached() || r.is_short_read()) {
cache_ctx.insert(std::move(q), std::move(trace_state));
}
co_await q.close();
co_return r;
} catch (...) {
ex = std::current_exception();
}
co_await q.close();
std::rethrow_exception(std::move(ex));
}
mutation_source

View File

@@ -90,6 +90,7 @@ static reconcilable_result mutation_query(schema_ptr s, const mutation_source& s
uint64_t row_limit, uint32_t partition_limit, gc_clock::time_point query_time) {
auto querier = query::mutation_querier(source, s, tests::make_permit(), range, slice, service::get_local_sstable_query_read_priority(), {});
auto close_querier = deferred_close(querier);
auto rrb = reconcilable_result_builder(*s, slice, make_accounter());
return querier.consume_page(std::move(rrb), row_limit, partition_limit, query_time, db::no_timeout,
query::max_result_size(std::numeric_limits<uint64_t>::max())).get();
@@ -538,6 +539,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
static void data_query(schema_ptr s, const mutation_source& source, const dht::partition_range& range, const query::partition_slice& slice,
query::result::builder& builder) {
auto querier = query::data_querier(source, s, tests::make_permit(), range, slice, service::get_local_sstable_query_read_priority(), {});
auto close_querier = deferred_close(querier);
auto qrb = query_result_builder(*s, builder);
querier.consume_page(std::move(qrb), std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), db::no_timeout,
query::max_result_size(std::numeric_limits<uint64_t>::max())).get();